Ⅲ.Re: Scrapy Architecture Overview
Architecture Diagram
(Diagram: the ENGINE sits at the center, exchanging Requests, Responses and Items with the SCHEDULER, the DOWNLOADER via the DOWNLOADER_MIDDLEWARES, the SPIDERS via the SPIDER_MIDDLEWARES, and the ITEM PIPELINES.)
Execution Flow
1. The ENGINE obtains the seed requests from the SPIDERS (via start_urls or the start_requests method).

# scrapy/crawler.py#CrawlerRunner
def crawl(self, crawler_or_spidercls, *args, **kwargs):
    if isinstance(crawler_or_spidercls, Spider):
        raise ValueError(
            'The crawler_or_spidercls argument cannot be a spider object, '
            'it must be a spider class (or a Crawler object)')
    # Create the crawler; the spider is resolved by name or class from crawler_or_spidercls
    crawler = self.create_crawler(crawler_or_spidercls)
    return self._crawl(crawler, *args, **kwargs)

def _crawl(self, crawler, *args, **kwargs):
    self.crawlers.add(crawler)
    # Kick off the crawl task
    d = crawler.crawl(*args, **kwargs)
    self._active.add(d)

    def _done(result):
        self.crawlers.discard(crawler)
        self._active.discard(d)
        self.bootstrap_failed |= not getattr(crawler, 'spider', None)
        return result

    return d.addBoth(_done)
P.S. Terminology: a crawler is a crawl task; a spider is the user-written crawling logic module.

# scrapy/crawler.py#Crawler
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    if self.crawling:
        raise RuntimeError("Crawling already taking place")
    self.crawling = True
    try:
        # Create the spider and the engine
        self.spider = self._create_spider(*args, **kwargs)
        self.engine = self._create_engine()
        # Iterate the spider's start_requests() to obtain the seed requests
        start_requests = iter(self.spider.start_requests())
        # Hand start_requests to the engine for scheduling
        yield self.engine.open_spider(self.spider, start_requests)
        yield defer.maybeDeferred(self.engine.start)
    except Exception:
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        raise
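To make the crawler/spider distinction concrete, here is a minimal sketch of driving a spider class through CrawlerProcess (a CrawlerRunner subclass that also manages the Twisted reactor). QuotesSpider and its URL are placeholder names of my own, not taken from the source above.

# Illustrative sketch: running a spider class through CrawlerProcess.
# QuotesSpider and its URL are hypothetical examples.
import scrapy
from scrapy.crawler import CrawlerProcess

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com/"]

    def parse(self, response):
        for quote in response.css("div.quote span.text::text"):
            yield {"text": quote.get()}

process = CrawlerProcess(settings={"LOG_LEVEL": "INFO"})
process.crawl(QuotesSpider)   # creates a Crawler (the crawl task) from the spider class
process.start()               # starts the reactor; blocks until all crawls finish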
2. The ENGINE schedules the Requests in the SCHEDULER and asks for the next Requests to crawl.
3. The SCHEDULER returns the next Requests to the ENGINE.
4. The ENGINE sends the Requests to the DOWNLOADER, passing them through the DOWNLOADER_MIDDLEWARES (process_request) on the way.
5. The DOWNLOADER performs the Requests; the resulting Responses travel back through the DOWNLOADER_MIDDLEWARES (process_response) to the ENGINE.
6. The ENGINE sends the Responses through the SPIDER_MIDDLEWARES (process_spider_input) to the SPIDERS for processing.
7. The SPIDERS, following the user-written business logic, return Items or new Requests through the SPIDER_MIDDLEWARES (process_spider_output) to the ENGINE.
8. Depending on whether it received Items or Requests, the ENGINE sends them to the ITEM PIPELINES or to the SCHEDULER.
9. From step 3 onward the process repeats, until there are no more Requests left for the ENGINE to schedule. A minimal spider that exercises this whole loop is sketched below.
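A minimal, self-contained sketch of a spider that drives every step above: start_requests feeds step 1, and the parse callback yields both Items (routed to the ITEM PIPELINES in step 8) and follow-up Requests (routed back to the SCHEDULER). The spider name and URLs are illustrative placeholders, not part of the article.

# Illustrative spider: exercises the full data flow described above.
# The name and URLs are hypothetical placeholders.
import scrapy

class BooksSpider(scrapy.Spider):
    name = "books"

    def start_requests(self):
        # Step 1: seed Requests handed to the ENGINE
        yield scrapy.Request("https://books.toscrape.com/", callback=self.parse)

    def parse(self, response):
        # Step 7: the spider turns a Response into Items and new Requests
        for title in response.css("h3 a::attr(title)").getall():
            yield {"title": title}  # goes to the ITEM PIPELINES (step 8)
        next_page = response.css("li.next a::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)  # back to the SCHEDULER (step 8)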
Core Classes
# scrapy/core/engine.py#ExecutionEngine
@inlineCallbacks
def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
    if self.slot is not None:
        raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
    logger.info("Spider opened", extra={'spider': spider})
    # The callable to be scheduled later; here it prepares the next request
    nextcall = CallLaterOnce(self._next_request)
    # Create the scheduler
    scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
    # start_requests after flowing through the spider middlewares' process_start_requests
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # Build a Slot around start_requests, nextcall and the scheduler. The Slot can be seen as the
    # lifecycle of the requests: it tracks the engine's running state and the requests in flight.
    self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.spider = spider
    if hasattr(scheduler, "open"):
        # Open the scheduler
        yield scheduler.open(spider)
    # Open the scraper
    yield self.scraper.open_spider(spider)
    # Open the stats collector
    self.crawler.stats.open_spider(spider)
    # Send (and log) the spider_opened signal
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # Schedule nextcall
    self.slot.nextcall.schedule()
    # Heartbeat interval: LoopingCall.start(5) has now=True by default, so the first call fires immediately
    self.slot.heartbeat.start(5)
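CallLaterOnce is the small helper that keeps the engine loop ticking without piling up reactor calls: schedule() only registers the wrapped function once until it has actually run. The following is a simplified paraphrase of scrapy.utils.reactor.CallLaterOnce, not a verbatim copy of the upstream source.

# Simplified paraphrase of scrapy.utils.reactor.CallLaterOnce (not the exact upstream code).
from twisted.internet import reactor

class CallLaterOnceSketch:
    """Schedule a function with reactor.callLater, but never more than once at a time."""

    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw = func, a, kw
        self._call = None

    def schedule(self, delay=0):
        # Repeated schedule() calls are no-ops while a call is already pending,
        # which is why the engine can call nextcall.schedule() liberally.
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)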
_next_request
# scrapy/core/engine.py#ExecutionEngine _next_request(self)
def _next_request(self) -> None:
    if self.slot is None:
        return
    assert self.spider is not None  # typing
    if self.paused:
        return None
    # _next_request_from_scheduler: pull the next request from the scheduler and hand it to the engine
    while not self._needs_backout() and self._next_request_from_scheduler() is not None:
        pass
    if self.slot.start_requests is not None and not self._needs_backout():
        try:
            request = next(self.slot.start_requests)
        except StopIteration:
            self.slot.start_requests = None
        except Exception:
            self.slot.start_requests = None
            logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': self.spider})
        else:
            self.crawl(request)
    if self.spider_is_idle() and self.slot.close_if_idle:
        self._spider_idle()
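_needs_backout tells the loop to stop pulling new requests when the downloader or the scraper is already saturated. The thresholds behind that check are ordinary settings; the setting names below are real Scrapy settings, but the values are only illustrative, not defaults asserted from the source above.

# settings.py — illustrative values for the knobs that influence the backout check
CONCURRENT_REQUESTS = 32                   # downloader backs out once this many requests are in flight
CONCURRENT_REQUESTS_PER_DOMAIN = 8         # per-domain cap enforced by the downloader slots
SCRAPER_SLOT_MAX_ACTIVE_SIZE = 5_000_000   # bytes of response data the scraper buffers before backing out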
# scrapy/core/engine.py#ExecutionEngine _next_request_from_scheduler(self)
def _next_request_from_scheduler(self) -> Optional[Deferred]:
    assert self.slot is not None  # typing
    assert self.spider is not None  # typing
    request = self.slot.scheduler.next_request()
    if request is None:
        return None
    # _download: download the request
    d = self._download(request, self.spider)
    # _handle_downloader_output: handle the downloader's output
    d.addBoth(self._handle_downloader_output, request)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    d.addBoth(lambda _: self.slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    slot = self.slot
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    return d
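The addBoth/addErrback pairs above follow a common Twisted pattern: addBoth runs whether the previous step succeeded or failed, while each addErrback only catches errors raised by the steps chained before it. A tiny standalone Deferred example (all names are mine) showing that ordering:

# Standalone Twisted example of the addBoth/addErrback chaining pattern used above.
from twisted.internet import defer

def demo():
    d = defer.Deferred()
    # Runs whether the previous result is a value or a Failure.
    d.addBoth(lambda result: print("step 1 saw:", result) or result)
    # Only runs if something *before* this point failed.
    d.addErrback(lambda failure: print("handled:", failure.value) or None)
    # Runs again regardless, after any earlier error has been handled above.
    d.addBoth(lambda result: print("step 2 saw:", result) or result)
    return d

d = demo()
d.errback(RuntimeError("boom"))  # step 1 sees a Failure, the errback handles it, step 2 sees None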
_download
# scrapy/core/engine.py#ExecutionEngine _download(self, request, spider)
def _download(self, request: Request, spider: Spider) -> Deferred:
    assert self.slot is not None  # typing
    self.slot.add_request(request)

    def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
        if not isinstance(result, (Response, Request)):
            raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {result!r}")
        if isinstance(result, Response):
            if result.request is None:
                result.request = request
            logkws = self.logformatter.crawled(result.request, result, spider)
            if logkws is not None:
                logger.log(*logformatter_adapter(logkws), extra={"spider": spider})
            self.signals.send_catch_log(
                signal=signals.response_received,
                response=result,
                request=result.request,
                spider=spider,
            )
        return result

    def _on_complete(_):
        self.slot.nextcall.schedule()
        return _

    # Invoke the downloader configured in settings.py: DOWNLOADER = 'scrapy.core.downloader.Downloader'
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
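The response_received signal sent here is what extensions can hook into. A minimal sketch of an extension that listens for it (the class name and stat key are my own; it would be enabled through the EXTENSIONS setting):

# Illustrative extension hooking the response_received signal sent by _download.
# The class name and the stat key are hypothetical.
from scrapy import signals

class ResponseLogger:
    def __init__(self, crawler):
        self.crawler = crawler
        crawler.signals.connect(self.response_received, signal=signals.response_received)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def response_received(self, response, request, spider):
        # Called every time the engine announces a freshly downloaded response.
        self.crawler.stats.inc_value("responselogger/received")
        spider.logger.debug(f"got {response.status} for {request.url}")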
_handle_downloader_output
# scrapy/core/engine.py#ExecutionEngine _handle_downloader_output(self, result, request)
def _handle_downloader_output(
    self, result: Union[Request, Response, Failure], request: Request
) -> Optional[Deferred]:
    assert self.spider is not None  # typing
    if not isinstance(result, (Request, Response, Failure)):
        raise TypeError(f"Incorrect type: expected Request, Response or Failure, got {type(result)}: {result!r}")
    # downloader middleware can return requests (for example, redirects)
    # If the result is a Request, schedule it for further crawling
    if isinstance(result, Request):
        self.crawl(result)
        return None
    # enqueue_scrape
    d = self.scraper.enqueue_scrape(result, request, self.spider)
    d.addErrback(
        lambda f: logger.error(
            "Error while enqueuing downloader output",
            exc_info=failure_to_exc_info(f),
            extra={'spider': self.spider},
        )
    )
    return d
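The Request branch above is exactly how redirect-style middlewares work: a downloader middleware's process_response may return a Request instead of a Response, and the engine then re-crawls it rather than scraping it. A minimal sketch of such a middleware (the class name and meta key are mine; it would be registered under DOWNLOADER_MIDDLEWARES in settings.py):

# Illustrative downloader middleware whose process_response returns a Request,
# which is the case _handle_downloader_output re-crawls. The class name is hypothetical.
class RetryOn503Middleware:
    def process_request(self, request, spider):
        # Returning None lets the request continue down the middleware chain.
        return None

    def process_response(self, request, response, spider):
        if response.status == 503 and not request.meta.get("retried_503"):
            meta = dict(request.meta, retried_503=True)
            # replace() builds a new Request; dont_filter bypasses the dupefilter for the retry
            return request.replace(meta=meta, dont_filter=True)
        return response  # otherwise keep flowing toward the spider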
# scrapy/core/scraper.py#Scraper enqueue_scrape(self, result, request, spider)
def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    if self.slot is None:
        raise RuntimeError("Scraper slot not assigned")
    dfd = self.slot.add_response_request(result, request)

    def finish_scraping(_):
        self.slot.finish_response(result, request)
        self._check_if_closing(spider)
        self._scrape_next(spider)
        return _

    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    # _scrape_next
    self._scrape_next(spider)
    return dfd
# scrapy/core/scraper.py#Scraper _scrape_next(self, spider)
def _scrape_next(self, spider: Spider) -> None:
    assert self.slot is not None  # typing
    while self.slot.queue:
        response, request, deferred = self.slot.next_response_request_deferred()
        # _scrape
        self._scrape(response, request, spider).chainDeferred(deferred)
# scrapy/core/scraper.py#Scraper _scrape(self, result, request, spider)
def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    """
    Handle the downloaded response or failure through the spider callback/errback
    """
    if not isinstance(result, (Response, Failure)):
        raise TypeError(f"Incorrect type: expected Response or Failure, got {type(result)}: {result!r}")
    # _scrape2
    dfd = self._scrape2(result, request, spider)  # returns spider's processed output
    dfd.addErrback(self.handle_spider_error, request, result, spider)
    dfd.addCallback(self.handle_spider_output, request, result, spider)
    return dfd
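handle_spider_output is what ultimately forwards the Items yielded by the spider to the item pipelines (step 8 of the flow). For reference, a minimal pipeline on the receiving end looks like this (the class name and the 'title' field are illustrative; it would be registered through ITEM_PIPELINES in settings.py):

# Illustrative item pipeline on the receiving end of handle_spider_output.
# The class name and the 'title' field are hypothetical examples.
from scrapy.exceptions import DropItem

class TitleValidationPipeline:
    def process_item(self, item, spider):
        # Each Item (or dict) yielded by the spider passes through here.
        if not item.get("title"):
            raise DropItem("missing title")
        item["title"] = item["title"].strip()
        return item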
# scrapy/core/scraper.py#Scraper _scrape2(self, result, request, spider)
def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    """
    Handle the different cases of request's result being a Response or a Failure
    """
    if isinstance(result, Response):
        # scrape_response: run the spider middlewares' process_spider_input / process_spider_output
        return self.spidermw.scrape_response(self.call_spider, result, request, spider)
    else:  # result is a Failure
        # call_spider: the failure path (errback)
        dfd = self.call_spider(result, request, spider)
        return dfd.addErrback(self._log_download_errors, result, request, spider)
scrape_response
# scrapy/core/spidermw.py#SpiderMiddlewareManager scrape_response(self, scrape_func, response, request, spider)
def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request,
                    spider: Spider) -> Deferred:

    def process_callback_output(result: Iterable) -> MutableChain:
        return self._process_callback_output(response, spider, result)

    def process_spider_exception(_failure: Failure) -> Union[Failure, MutableChain]:
        return self._process_spider_exception(response, spider, _failure)

    dfd = mustbe_deferred(self._process_spider_input, scrape_func, response, request, spider)
    dfd.addCallbacks(callback=process_callback_output, errback=process_spider_exception)
    return dfd
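scrape_response is where the hooks of a custom spider middleware are invoked: process_spider_input before the spider callback, process_spider_output over whatever the callback yielded. A minimal sketch of such a middleware (the class name is mine; it would be enabled via SPIDER_MIDDLEWARES):

# Illustrative spider middleware showing the two hooks driven by scrape_response.
# The class name is hypothetical.
class AnnotateDepthMiddleware:
    def process_spider_input(self, response, spider):
        # Runs before the response reaches the spider callback.
        # Returning None continues processing; raising an exception triggers the errback path.
        return None

    def process_spider_output(self, response, result, spider):
        # Runs over whatever the spider callback yielded (Items and Requests).
        for obj in result:
            if isinstance(obj, dict):
                obj["depth"] = response.meta.get("depth", 0)
            yield obj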
call_spider
# scrapy/core/scraper.py#Scraper call_spider(self, result, request, spider)
def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    if isinstance(result, Response):
        if getattr(result, "request", None) is None:
            result.request = request
        # Call the user-defined callback, or the default parse(self, response, **kwargs) inherited from Spider
        callback = result.request.callback or spider._parse
        warn_on_generator_with_return_value(spider, callback)
        dfd = defer_succeed(result)
        dfd.addCallbacks(callback=callback, callbackKeywords=result.request.cb_kwargs)
    else:  # result is a Failure
        result.request = request
        # The failure path: call the request's errback
        warn_on_generator_with_return_value(spider, request.errback)
        dfd = defer_fail(result)
        dfd.addErrback(request.errback)
    return dfd.addCallback(iterate_spider_output)
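From the spider author's point of view, call_spider is what dispatches to the callback, cb_kwargs and errback declared on a Request. A small illustrative spider showing all three (the spider name and URL are placeholders of mine):

# Illustrative spider showing the callback / cb_kwargs / errback that call_spider dispatches to.
# The spider name and URL are hypothetical.
import scrapy
from twisted.python.failure import Failure

class DetailSpider(scrapy.Spider):
    name = "detail"

    def start_requests(self):
        yield scrapy.Request(
            "https://example.com/item/1",
            callback=self.parse_item,        # success path: call_spider's Response branch
            cb_kwargs={"category": "demo"},  # forwarded as keyword arguments to the callback
            errback=self.on_error,           # failure path: call_spider's Failure branch
        )

    def parse_item(self, response, category):
        yield {"url": response.url, "category": category}

    def on_error(self, failure: Failure):
        self.logger.warning(f"request failed: {failure.value!r}")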