Ⅲ.Re: Scrapy Architecture Overview

Architecture Diagram

[Figure: Scrapy architecture diagram]

Execution Flow

  1. The ENGINE obtains the seed links from the SPIDERS (via the start_urls attribute or the start_requests method)

    # scrapy/crawler.py#CrawlerRunner
    
    def crawl(self, crawler_or_spidercls, *args, **kwargs):
    
        if isinstance(crawler_or_spidercls, Spider):
            raise ValueError(
                'The crawler_or_spidercls argument cannot be a spider object, '
                'it must be a spider class (or a Crawler object)')
        # Create the crawler; from crawler_or_spidercls, resolve the spider by name or class
        crawler = self.create_crawler(crawler_or_spidercls)
        return self._crawl(crawler, *args, **kwargs)
    
    def _crawl(self, crawler, *args, **kwargs):
        self.crawlers.add(crawler)
        # Load the crawl task
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            self.bootstrap_failed |= not getattr(crawler, 'spider', None)
            return result
        return d.addBoth(_done)

    P.S. Terminology: a crawler is a crawl task; a spider is the module containing the scraping logic you write.

    # scrapy/crawler.py#Crawler
    
    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        if self.crawling:
            raise RuntimeError("Crawling already taking place")
        self.crawling = True
        try:
            # Create the spider and the engine
            self.spider = self._create_spider(*args, **kwargs)
            self.engine = self._create_engine()
            # Iterate the spider's start_requests() to obtain the seed links
            start_requests = iter(self.spider.start_requests())
            # The engine takes start_requests and schedules them
            yield self.engine.open_spider(self.spider, start_requests)
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()
            raise
  2. The ENGINE schedules the Requests in the SCHEDULER and asks for the next Requests to crawl

  3. The SCHEDULER returns the next Requests to the ENGINE

  4. The ENGINE passes the Requests through each layer of the DOWNLOADER_MIDDLEWARES (process_request) and hands them to the DOWNLOADER

  5. The DOWNLOADER performs the Requests and obtains Responses, which pass back through each layer of the DOWNLOADER_MIDDLEWARES (process_response) to the ENGINE

  6. The ENGINE sends the Responses through the SPIDER_MIDDLEWARES (process_spider_input) to the SPIDERS for processing

  7. The SPIDERS, following the business logic you wrote, yield Items or new Requests, which pass through the SPIDER_MIDDLEWARES (process_spider_output) back to the ENGINE

  8. Depending on whether it received Items or Requests, the ENGINE routes them to the ITEM PIPELINES or the SCHEDULER respectively

  9. From step 3 onward the cycle repeats, ending only when the ENGINE has no more schedulable Requests (see the end-to-end sketch below)
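
A minimal, self-contained sketch of this loop (the spider below targets the public demo site quotes.toscrape.com; the class name and CSS selectors are illustrative, not part of Scrapy):

import scrapy
from scrapy.crawler import CrawlerProcess

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com/"]  # step 1: seed links

    def parse(self, response):
        # Step 7: yield Items ...
        for quote in response.css("div.quote"):
            yield {"text": quote.css("span.text::text").get()}
        # ... or new Requests, which the ENGINE sends back to the SCHEDULER (step 8)
        next_page = response.css("li.next a::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

if __name__ == "__main__":
    process = CrawlerProcess()   # wraps CrawlerRunner plus the Twisted reactor
    process.crawl(QuotesSpider)  # enters CrawlerRunner.crawl from step 1
    process.start()              # runs until no Requests remain (step 9)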

Core Classes

# scrapy/core/engine.py#ExecutionEngine

@inlineCallbacks
def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
    if self.slot is not None:
        raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
    logger.info("Spider opened", extra={'spider': spider})
    # The callable the engine will schedule repeatedly; here it prepares the next request
    nextcall = CallLaterOnce(self._next_request)
    # Create the scheduler
    scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
    # start_requests after flowing through the spider middlewares' process_start_requests
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # Build a Slot around the scheduler and nextcall; the Slot can be seen as the requests' lifecycle container, tracking the engine's running state and the requests in flight
    self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.spider = spider
    if hasattr(scheduler, "open"):
        # Open the scheduler
        yield scheduler.open(spider)
    # Open the scraper
    yield self.scraper.open_spider(spider)
    # Open the stats collector
    self.crawler.stats.open_spider(spider)
    # Send the spider_opened signal
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # Schedule nextcall
    self.slot.nextcall.schedule()
    # Start the 5-second heartbeat; the now parameter defaults to True, so it also fires immediately
    self.slot.heartbeat.start(5)
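
nextcall deserves a note: CallLaterOnce coalesces scheduling, so no matter how many times schedule() is called within one reactor turn, _next_request runs only once. A simplified sketch, paraphrased from scrapy/utils/reactor.py (the real class has a few more details):

from twisted.internet import reactor

class CallLaterOnce:
    """Call func in the next reactor iteration, at most once per schedule cycle."""

    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw = func, a, kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:  # coalesce repeated schedule() calls
            self._call = reactor.callLater(delay, self)

    def __call__(self):
        self._call = None  # allow the next schedule() to fire again
        return self._func(*self._a, **self._kw)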

_next_request

# scrapy/core/engine.py#ExecutionEngine  _next_request(self)

def _next_request(self) -> None:
    if self.slot is None:
        return
    assert self.spider is not None  # typing
    if self.paused:
        return None
    # _next_request_from_scheduler: pull the next request from the scheduler and hand it to the engine
    while not self._needs_backout() and self._next_request_from_scheduler() is not None:
        pass
    if self.slot.start_requests is not None and not self._needs_backout():
        try:
            request = next(self.slot.start_requests)
        except StopIteration:
            self.slot.start_requests = None
        except Exception:
            self.slot.start_requests = None
            logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': self.spider})
        else:
            self.crawl(request)
    if self.spider_is_idle() and self.slot.close_if_idle:
        self._spider_idle()
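
The while loop keeps draining the scheduler until _needs_backout() signals backpressure. In the same ExecutionEngine that check looks roughly like this (abridged):

def _needs_backout(self) -> bool:
    return (
        not self.running
        or bool(self.slot.closing)            # the spider is shutting down
        or self.downloader.needs_backout()    # too many concurrent downloads
        or self.scraper.slot.needs_backout()  # too many responses queued for scraping
    )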

# scrapy/core/engine.py#ExecutionEngine  _next_request_from_scheduler(self)

def _next_request_from_scheduler(self) -> Optional[Deferred]:
    assert self.slot is not None  # typing
    assert self.spider is not None  # typing
    request = self.slot.scheduler.next_request()
    if request is None:
        return None
    # _download: download the request
    d = self._download(request, self.spider)
    # _handle_downloader_output: handle the downloader's output
    d.addBoth(self._handle_downloader_output, request)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    d.addBoth(lambda _: self.slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    slot = self.slot
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': self.spider}))
    return d
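
The addBoth / addErrback chain above is plain Twisted Deferred plumbing: addBoth runs on success or failure, while addErrback fires only when an earlier step failed. A tiny standalone demo of the pattern (requires only Twisted):

from twisted.internet import defer

def handle(result):
    print("always runs:", result)
    return result  # pass the value down the chain

d = defer.Deferred()
d.addBoth(handle)  # like the cleanup/scheduling steps above
d.addErrback(lambda f: print("only on failure:", f))
d.callback("fake response")  # fires the chain; the errback is skipped here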

_download

# scrapy/core/engine.py#ExecutionEngine  _download(self, request, spider)

def _download(self, request: Request, spider: Spider) -> Deferred:
    assert self.slot is not None  # typing
    self.slot.add_request(request)
    def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
        if not isinstance(result, (Response, Request)):
            raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {res
        if isinstance(result, Response):
            if result.request is None:
                result.request = request
            logkws = self.logformatter.crawled(result.request, result, spider)
            if logkws is not None:
                logger.log(*logformatter_adapter(logkws), extra={"spider": spider})
            self.signals.send_catch_log(
                signal=signals.response_received,
                response=result,
                request=result.request,
                spider=spider,
            )
        return result
    def _on_complete(_):
        self.slot.nextcall.schedule()
        return _
    # Invoke the downloader configured in settings.py: DOWNLOADER = 'scrapy.core.downloader.Downloader'
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
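
Note that _on_success fires the response_received signal, which any extension can subscribe to. A minimal sketch of such a listener (the class name ResponseLogger is illustrative; the signal and the from_crawler hook are Scrapy's own):

from scrapy import signals

class ResponseLogger:
    """Extension that listens for the response_received signal sent by _on_success."""

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.on_response, signal=signals.response_received)
        return ext

    def on_response(self, response, request, spider):
        spider.logger.info(f"received {response.status} for {request.url}")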

_handle_downloader_output

# scrapy/core/engine.py#ExecutionEngine  _handle_downloader_output(self, result, request)

def _handle_downloader_output(
    self, result: Union[Request, Response, Failure], request: Request
) -> Optional[Deferred]:
    assert self.spider is not None  # typing
    if not isinstance(result, (Request, Response, Failure)):
        raise TypeError(f"Incorrect type: expected Request, Response or Failure, got {type(result)}: {result!r}")
    # downloader middleware can return requests (for example, redirects)
    # If the result is a Request (e.g. a redirect), schedule it for crawling again
    if isinstance(result, Request):
        self.crawl(result)
        return None
    # enqueue_scrape
    d = self.scraper.enqueue_scrape(result, request, self.spider)
    d.addErrback(
        lambda f: logger.error(
            "Error while enqueuing downloader output",
            exc_info=failure_to_exc_info(f),
            extra={'spider': self.spider},
        )
    )
    return d
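
This Request branch is exactly what redirect handling relies on: a downloader middleware may return a Request from process_response, and the engine will crawl it instead of scraping the response. A minimal hypothetical middleware (RetryOnceMiddleware is illustrative, not part of Scrapy):

class RetryOnceMiddleware:
    """Return a Request from process_response to hit the isinstance(result, Request) branch above."""

    def process_response(self, request, response, spider):
        if response.status == 503 and not request.meta.get("retried"):
            retry = request.replace(dont_filter=True)  # bypass the dupefilter for the retry
            retry.meta["retried"] = True
            return retry  # the engine calls self.crawl(retry)
        return response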

# scrapy/core/scraper.py#Scraper  enqueue_scrape(self, result, request, spider)

def enqueue_scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    if self.slot is None:
        raise RuntimeError("Scraper slot not assigned")
    dfd = self.slot.add_response_request(result, request)
    def finish_scraping(_):
        self.slot.finish_response(result, request)
        self._check_if_closing(spider)
        self._scrape_next(spider)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    # _scrape_next
    self._scrape_next(spider)
    return dfd

# scrapy/core/scraper.py#Scraper  _scrape_next(self, spider)

def _scrape_next(self, spider: Spider) -> None:
    assert self.slot is not None  # typing
    while self.slot.queue:
        response, request, deferred = self.slot.next_response_request_deferred()
        # _scrape
        self._scrape(response, request, spider).chainDeferred(deferred)

# scrapy/core/scraper.py#Scraper  _scrape(self, result, request, spider)

def _scrape(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    """
    Handle the downloaded response or failure through the spider callback/errback
    """
    if not isinstance(result, (Response, Failure)):
        raise TypeError(f"Incorrect type: expected Response or Failure, got {type(result)}: {result!r}")
    # _scrape2
    dfd = self._scrape2(result, request, spider)  # returns spider's processed output
    dfd.addErrback(self.handle_spider_error, request, result, spider)
    dfd.addCallback(self.handle_spider_output, request, result, spider)
    return dfd

# scrapy/core/scraper.py#Scraper  _scrape2(self, result, request, spider)

def _scrape2(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    """
    Handle the different cases of request's result been a Response or a Failure
    """
    if isinstance(result, Response):
        # scrape_response: runs the spider middlewares' process_spider_input / process_spider_output around the spider callback
        return self.spidermw.scrape_response(self.call_spider, result, request, spider)
    else:  # result is a Failure
        # call_spider: route the failure to the errback
        dfd = self.call_spider(result, request, spider)
        return dfd.addErrback(self._log_download_errors, result, request, spider)

scrape_response

# scrapy/core/spidermw.py#SpiderMiddlewareManager  scrape_response(self, scrape_func, response, request, spider)

def scrape_response(self, scrape_func: ScrapeFunc, response: Response, request: Request,
                    spider: Spider) -> Deferred:
    def process_callback_output(result: Iterable) -> MutableChain:
        return self._process_callback_output(response, spider, result)
    def process_spider_exception(_failure: Failure) -> Union[Failure, MutableChain]:
        return self._process_spider_exception(response, spider, _failure)
    dfd = mustbe_deferred(self._process_spider_input, scrape_func, response, request, spider)
    dfd.addCallbacks(callback=process_callback_output, errback=process_spider_exception)
    return dfd
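
scrape_response is what drives your spider middlewares. A minimal sketch of one (DropNonHtmlMiddleware is illustrative; the two hook names are Scrapy's):

from scrapy.exceptions import IgnoreRequest

class DropNonHtmlMiddleware:
    def process_spider_input(self, response, spider):
        # Runs before the spider callback; raising here diverts the response to the error path
        if b"text/html" not in response.headers.get(b"Content-Type", b""):
            raise IgnoreRequest("not an HTML response")

    def process_spider_output(self, response, result, spider):
        # Runs over whatever the spider callback yielded
        yield from result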

call_spider

# scrapy/core/scraper.py#Scraper  call_spider(self, result, request, spider)

def call_spider(self, result: Union[Response, Failure], request: Request, spider: Spider) -> Deferred:
    if isinstance(result, Response):
        if getattr(result, "request", None) is None:
            result.request = request
        # Invoke the user-defined callback, or the default parse(self, response, **kwargs) inherited from Spider
        callback = result.request.callback or spider._parse
        warn_on_generator_with_return_value(spider, callback)
        dfd = defer_succeed(result)
        dfd.addCallbacks(callback=callback, callbackKeywords=result.request.cb_kwargs)
    else:  # result is a Failure
        result.request = request
        # Errback for the failure case
        warn_on_generator_with_return_value(spider, request.errback)
        dfd = defer_fail(result)
        dfd.addErrback(request.errback)
    # Normalize whatever the callback/errback returned into an iterable of results
    return dfd.addCallback(iterate_spider_output)
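
In short, call_spider routes a Response to your callback and a Failure to your errback. A minimal spider exercising both paths (ExampleSpider and on_error are illustrative names):

import scrapy

class ExampleSpider(scrapy.Spider):
    name = "example"

    def start_requests(self):
        yield scrapy.Request(
            "https://example.com",
            callback=self.parse_page,  # call_spider runs this on a Response
            errback=self.on_error,     # ...and this on a Failure
        )

    def parse_page(self, response):
        yield {"title": response.css("title::text").get()}

    def on_error(self, failure):
        self.logger.error(repr(failure))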
