您的案例程序我看不懂,运行后连接不上,并提示错误。
首先需要引入以下模块:
from pytdx.exhq import *
from pytdx.hq import *
一、标准行情 API 方法列表
from pytdx.hq import *
# Usage fragment: create a standard-quotes client and connect it via isConnected().
# NOTE(review): indentation was lost in this excerpt and the except/finally
# clause that must close the `try` is not shown.
# Both handles start unset; api_exhq is not assigned again in this fragment.
api_hq=None
api_exhq=None
try:
api_hq = TdxHq_API()
# The value returned by isConnected() is used as a context manager and rebound
# to api_hq.
# NOTE(review): isConnected() returns False when no server accepts the
# connection, and False cannot be used in a `with` statement -- check the
# result before entering `with`.
# NOTE(review): `self` implies this fragment lives inside a method -- confirm.
with self.isConnected(api_hq,type='hq',verbose=True) as api_hq:
# Placeholder for the calls made on the connected client.
...
其中 isConnected() 是自定义的方法。
def isConnected(self, api, type='hq', verbose=False):
    """Connect ``api`` to the first reachable server in the pool for ``type``.

    Servers are tried in pool order; the first one for which
    ``api.connect`` returns a truthy value wins.

    Args:
        self: Not used by this function; kept because callers invoke it
            as a method.
        api: Client object exposing ``connect(ip, port)``.
        type: Pool selector. ``'hq'`` uses ``IPPOOL.hq_address`` and
            ``'exhq'`` uses ``IPPOOL.exhq_address``. (The name shadows the
            builtin but is part of the public signature.)
        verbose: When true, log the successful connection.

    Returns:
        The truthy value returned by ``api.connect`` for the first server
        that accepts the connection, or ``False`` when ``type`` is not a
        known pool or no server in the pool could be reached. Callers
        that use the result in a ``with`` statement must check for
        ``False`` first.
    """
    if type == 'hq':
        pool = IPPOOL.hq_address
    elif type == 'exhq':
        pool = IPPOOL.exhq_address
    else:
        # An unknown selector used to leave pool=None and crash with an
        # opaque TypeError on len(None); report it as a failed connection.
        log.error('ERROR: unknown server type %r' % (type,))
        return False

    for address in pool:
        # Each pool entry carries the IP at index 1 and the port at index 2.
        host, port = address[1], address[2]
        res = api.connect(host, port)
        if res:
            if verbose:
                log.info('Success connecting %s server %s:%s' % (type, host, port))
            return res
        log.error('ERROR: failed connecting %s server %s:%s' % (type, host, port))
    return False
address 是行情服务器地址池中的一项,其中 address[1] 是 IP 地址,address[2] 是端口(port)。
有微信吗,能详细咨询您吗?
1 个回答
from pytdx.hq import TdxHq_API
from connect_utils import isConnected
# Usage fragment: create a standard-quotes client and connect it via isConnected().
# NOTE(review): indentation was lost in this excerpt and the except/finally
# clause that must close the `try` is not shown.
# Both handles start unset; api_exhq is not assigned again in this fragment.
api_hq=None
api_exhq=None
try:
api_hq = TdxHq_API()
# The value returned by isConnected() is used as a context manager and rebound
# to api_hq.
# NOTE(review): isConnected() returns False when no server accepts the
# connection, and False cannot be used in a `with` statement -- check the
# result before entering `with`.
# NOTE(review): isConnected is imported as a plain function above but called
# as self.isConnected here -- confirm which form is intended.
with self.isConnected(api_hq,type='hq',verbose=True) as api_hq:
# Example query, left commented out, so the `with` has no executable body as
# shown. NOTE(review): it refers to `api`, while the `with` binds `api_hq`.
# stock_data_day = api.to_df(api.get_security_bars(4, 1, '600026', 0, 800)) # returns a DataFrame
#print(stock_data_day)
#connect_utils.py
def isConnected(self, api, type='hq', verbose=False):
    """Connect ``api`` to the first reachable server in the pool for ``type``.

    Servers are tried in pool order; the first one for which
    ``api.connect`` returns a truthy value wins.

    Args:
        self: Not used by this function; kept because callers invoke it
            as a method.
        api: Client object exposing ``connect(ip, port)``.
        type: Pool selector. ``'hq'`` uses ``IPPOOL.hq_address`` and
            ``'exhq'`` uses ``IPPOOL.exhq_address``. (The name shadows the
            builtin but is part of the public signature.)
        verbose: When true, log the successful connection.

    Returns:
        The truthy value returned by ``api.connect`` for the first server
        that accepts the connection, or ``False`` when ``type`` is not a
        known pool or no server in the pool could be reached. Callers
        that use the result in a ``with`` statement must check for
        ``False`` first.
    """
    if type == 'hq':
        pool = IPPOOL.hq_address
    elif type == 'exhq':
        pool = IPPOOL.exhq_address
    else:
        # An unknown selector used to leave pool=None and crash with an
        # opaque TypeError on len(None); report it as a failed connection.
        log.error('ERROR: unknown server type %r' % (type,))
        return False

    for address in pool:
        # Each pool entry carries the IP at index 1 and the port at index 2.
        host, port = address[1], address[2]
        res = api.connect(host, port)
        if res:
            if verbose:
                log.info('Success connecting %s server %s:%s' % (type, host, port))
            return res
        log.error('ERROR: failed connecting %s server %s:%s' % (type, host, port))
    return False