您的案例程序看不懂,连接不上,提示错误

艾溪湖艾溪湖 · 2023-05-12 16:55
首先需要引入   from pytdx.exhq import * from pytdx.hq import * 一、标准行情api方法列表 from pytdx.hq import *     api_hq=None api_exhq=None try:     api_hq = TdxHq_API()     with self.isConnected(api_hq,type='hq',verbose=True) as api_hq:             ... 其中 isConnected() 是自定义的方法。      def isConnected(self,api,type='hq',verbose=False):        pool=None        if type=='hq':            pool=IPPOOL.hq_address        elif type=='exhq':            pool=IPPOOL.exhq_address                    for i in range(len(pool)):            address=pool[i]            res= api.connect(address[1],address[2])            if res:                if verbose:                    log.info('Success connecting %s server %s:%s'%(type,address[1],address[2]))                return res            else:                log.error('ERROR: failed connecting %s server %s:%s'%(type,address[1],address[2]))        return False     address是行情的IP地址和port。 有微信吗,能详细咨询您吗?
1 个回答
艾溪湖
艾溪湖
from pytdx.hq import TdxHq_API
from connect_utils import  isConnected

api_hq=None
api_exhq=None
try:
     api_hq = TdxHq_API()
     with self.isConnected(api_hq,type='hq',verbose=True) as api_hq:
#          stock_data_day = api.to_df(api.get_security_bars(4, 1, '600026', 0, 800)) # 返回DataFrame
#print(stock_data_day)


#connect_utils.py

def isConnected(self, api, type='hq', verbose=False):
    pool = None
    if type == 'hq':
        pool = IPPOOL.hq_address
    elif type == 'exhq':
        pool = IPPOOL.exhq_address

    for i in range(len(pool)):
        address = pool[i]
        res = api.connect(address[1], address[2])
        if res:
            if verbose:
                log.info('Success connecting %s server %s:%s' % (type, address[1], address[2]))
            return res
        else:
            log.error('ERROR: failed connecting %s server %s:%s' % (type, address[1], address[2]))
    return False



赞同
反对
评论
收藏
2023-05-12 16:57
前往发表回答