请教大伙一个问题,我调试了半天,发现 gen.return 返回异步结果后,程序不是接着 yield 的地方执行,而是又跳到 ioloop,tornado 初学,文档较少,希望大家能帮忙指出错误在哪,感激 部分程序如下
@gen.coroutine
def _fetch_and_extract(self, task):
self.processing_task_number += 1
self.processing_task_set.add(task)
if self.processing_task_number != self.processing_task_set.size():ioloop.IOLoop.instance().stop()
self.logger.info("%s: start to fetch and extract" % self.worker_name)
if check_task(task):
try:
spider = load_object(task["spider"])
except Exception, e:
handle_fail_task(task,"load %s object failed" % (task["spider"]),self.process_fail_task_queue, self.wait_for_process_task_queue)
self.logger.error("%s: load object failed.path:%s, exception:%s" % (self.worker_name, task["spider"], e))
else:
spider_object = spider(self.processed_url_set, self.wait_for_process_task_queue)
fetch_start_time = datetime.datetime.now()
resp = yield spider_object.fetch(task["request"])
self.logger.debug("got resp already ”)#已打印
#这里已经拿到异步返回的 resp 了,但是代码没有恢复接着往下走,而是直接回到 loop 去取 task 了,但是这时候 task 里空了,所以之后就一直提示任务空
#相当于只抓了美团 api 的 city 列表,后面 extract、再添加 task 都没有执行,我 debug 时候比较奇怪这点,按道理异步返回 resp 了应该接着之前代码的位置继续执行 不是么
if resp == None or resp.error != None:
handle_fail_task(task, "fetch request: %s failed,code:%d error:%s" % (task["request"], resp.code, resp.error), self.process_fail_task_queue, self.wait_for_process_task_queue)
self.logger.error("%s: fetch request: %s failed, error: %s" % (self.worker_name, task["request"], resp))
else:
content_length = resp.headers['Content-Length'] if resp.headers.has_key('Content-Length') else None
last_modified = resp.headers['Last-Modified'] if resp.headers.has_key("Last-Modified") else None
fetch_time = datetime.datetime.now() - fetch_start_time
extract_start_time = datetime.datetime.now()
status = yield spider_object.extract(resp, **dict(task["kwargs"])
1
mactec OP mark 下,如果有答案了再来更新
|
2
bluesky139 2017-08-30 08:18:03 +08:00
肉眼表示没看出什么问题,你在那句打印出来 log 的地方以下每隔一行打 log 出来看看呢,或者能否精简个可独立运行的简陋版本出来
|
3
mactec OP @bluesky139
''' class Worker(object): def __init__(self,str): self._name = str self._installed = False def install(self): if self._installed: print "%s: no need to start again.worker has been installed!" % self._name else: self._installed = True ioloop.IOLoop.instance().add_callback(self.test) print "%s: worker is installed" % self._name @gen.coroutine def fetch(self,url): req = HTTPRequest(url,connect_timeout=3,request_timeout=5) client = httpclient.AsyncHTTPClient() resp = yield gen.Task(client.fetch, req) raise gen.Return(resp) @gen.coroutine def exact(self,resp): yield gen.sleep(5) raise gen.Return(10) @gen.coroutine def test(self): if self._installed: ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test) str = self._name task_url = u'http://api.caipiao.163.com/missNumber_trend.html?gameEn=kuai3' resp = yield self.fetch(task_url) print "%s get resp already at %s" %(str,datetime.datetime.now()) staus = yield self.exact(resp) print "callback!!!%s status returned %d at %s" %(str,staus,datetime.datetime.now()) if __name__ == '__main__': arrs = ['aaa','bbb'] for arr in arrs: worker = Worker(arr) worker.install() ioloop.IOLoop.instance().start() ''' 谢谢,还在找原因 请问 ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test) 这个能让 test 方法定期执行么,测试结果这个间隔没有用 |