Ⅶ.Re:Scrapy 之 Downloader

Downloader

Downloader 负责获取网页数据,并将响应交给 Engine;换言之,响应最终会被交给 Spiders。

源码解析

scrapy/core/engine.py#ExecutionEngine _download(self, request: Request, spider: Spider) -> Deferred

# open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True) -->
# _next_request(self) -> None -->
# _next_request_from_scheduler(self) -> Optional[Deferred] -->
# _download(self, request: Request, spider: Spider) -> Deferred

def _download(self, request: Request, spider: Spider) -> Deferred:
    """Hand ``request`` to the downloader and return the download Deferred.

    The request is first registered on the engine slot. On success the
    result is type-checked; for a ``Response`` the originating request is
    filled in when missing, the crawl is logged and the
    ``response_received`` signal is sent. ``_on_complete`` is attached with
    ``addBoth`` and schedules the slot's next call.

    ``_on_success`` raises ``TypeError`` when the downloader result is
    neither a ``Response`` nor a ``Request``.
    """
    assert self.slot is not None  # typing
    self.slot.add_request(request)

    def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
        if isinstance(result, Response):
            # Make sure the response knows which request produced it.
            if result.request is None:
                result.request = request
            log_kwargs = self.logformatter.crawled(result.request, result, spider)
            if log_kwargs is not None:
                logger.log(*logformatter_adapter(log_kwargs), extra={"spider": spider})
            self.signals.send_catch_log(
                signal=signals.response_received,
                response=result,
                request=result.request,
                spider=spider,
            )
        elif not isinstance(result, Request):
            raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {result!r}")
        return result

    def _on_complete(outcome):
        # Let the engine pick up the next request, then pass the outcome on.
        self.slot.nextcall.schedule()
        return outcome

    # By default 'scrapy.core.downloader.Downloader' is used as the Downloader
    download_dfd = self.downloader.fetch(request, spider)
    download_dfd.addCallbacks(_on_success)
    download_dfd.addBoth(_on_complete)
    return download_dfd

scrapy/core/downloader/__init__.py#Downloader fetch(self, request, spider)

def fetch(self, request, spider):
    """Send ``request`` through the downloader middleware and return the Deferred.

    The request is tracked in ``self.active`` for as long as the download is
    in flight; it is removed again by a handler attached with ``addBoth``.
    """
    # ``active`` is a set that records the requests currently being downloaded
    self.active.add(request)

    def _forget_request(result):
        self.active.remove(request)
        return result

    # Run the Downloader Middlewares logic; ``_enqueue_request`` (which puts
    # the request on the request queue) is passed in as the download function.
    middleware_dfd = self.middleware.download(self._enqueue_request, request, spider)
    return middleware_dfd.addBoth(_forget_request)

scrapy/core/downloader/middleware.py#DownloaderMiddlewareManager(MiddlewareManager) download(self, download_func: Callable, request: Request, spider: Spider)

def download(self, download_func: Callable, request: Request, spider: Spider):
    """Implement the download logic and return a Deferred.

    Three inner coroutines are wired onto one Deferred:

    * ``process_request`` calls every registered ``process_request`` method in
      order. The first truthy ``Response``/``Request`` it gets is returned
      immediately; if none returns one, ``download_func`` is called.
    * ``process_exception`` (attached as errback) calls every registered
      ``process_exception`` method and returns the first truthy
      ``Response``/``Request``, or the original failure if there is none.
    * ``process_response`` (attached as callback afterwards) passes a
      ``Response`` through every registered ``process_response`` method and
      stops early as soon as a ``Request`` is produced.

    ``_InvalidOutput`` is raised when a middleware method returns a value of
    an unexpected type; ``TypeError`` is raised when ``process_response``
    receives ``None``.
    """
    @defer.inlineCallbacks
    def process_request(request: Request):
        for method in self.methods['process_request']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, spider=spider))
            if response is None:
                # This middleware has nothing to say; try the next one.
                continue
            if not isinstance(response, (Response, Request)):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return None, Response or "
                    f"Request, got {response.__class__.__name__}"
                )
            if response:
                return response
        downloaded = yield download_func(request=request, spider=spider)
        return downloaded

    @defer.inlineCallbacks
    def process_response(response: Union[Response, Request]):
        if response is None:
            raise TypeError("Received None in process_response")
        if isinstance(response, Request):
            # A Request is handed back untouched.
            return response
        for method in self.methods['process_response']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
            if isinstance(response, Request):
                return response
            if not isinstance(response, Response):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return Response or Request, "
                    f"got {type(response)}"
                )
        return response

    @defer.inlineCallbacks
    def process_exception(failure: Failure):
        error = failure.value
        for method in self.methods['process_exception']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, exception=error, spider=spider))
            if response is None:
                continue
            if not isinstance(response, (Response, Request)):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return None, Response or "
                    f"Request, got {type(response)}"
                )
            if response:
                return response
        return failure

    # The errback is attached before the callback; keep this order.
    chain = mustbe_deferred(process_request, request)
    chain.addErrback(process_exception)
    chain.addCallback(process_response)
    return chain

scrapy/core/downloader/__init__.py#Downloader _enqueue_request(self, request, spider)

def _enqueue_request(self, request, spider):
    """Queue ``request`` on its download slot and return its Deferred.

    The slot key is stored in ``request.meta`` under ``self.DOWNLOAD_SLOT``,
    the request is marked active on the slot, and the
    ``request_reached_downloader`` signal is sent. The returned Deferred is
    queued together with the request; a handler attached with ``addBoth``
    removes the request from ``slot.active`` again.
    """
    slot_key, slot = self._get_slot(request, spider)
    request.meta[self.DOWNLOAD_SLOT] = slot_key

    def _release(result):
        slot.active.remove(request)
        return result

    slot.active.add(request)
    self.signals.send_catch_log(
        signal=signals.request_reached_downloader,
        request=request,
        spider=spider,
    )
    fresh = defer.Deferred()
    pending = fresh.addBoth(_release)
    slot.queue.append((request, pending))
    # Process the queued requests according to the slot's policy
    self._process_queue(spider, slot)
    return pending

scrapy/core/downloader/__init__.py#Downloader _process_queue(self, spider, slot)

def _process_queue(self, spider, slot):
    """Start downloads for the requests queued on ``slot``.

    Nothing is done while a delayed call stored in ``slot.latercall`` is
    still active. When the slot reports a download delay that has not yet
    elapsed since ``slot.lastseen``, this method re-schedules itself through
    the reactor. Otherwise queued requests are downloaded while the slot
    has free transfer slots.
    """
    from twisted.internet import reactor

    scheduled = slot.latercall
    if scheduled and scheduled.active():
        # A delayed run of this method is already pending.
        return

    # Delay queue processing if a download_delay is configured
    now = time()
    delay = slot.download_delay()
    if delay:
        remaining = delay - now + slot.lastseen
        if remaining > 0:
            slot.latercall = reactor.callLater(remaining, self._process_queue, spider, slot)
            return

    # Process enqueued requests if there are free slots to transfer for this slot
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        request, queued_dfd = slot.queue.popleft()
        # Perform the actual download and feed its outcome to the queued Deferred
        download_dfd = self._download(slot, request, spider)
        download_dfd.chainDeferred(queued_dfd)
        # prevent burst if inter-request delays were configured
        if delay:
            self._process_queue(spider, slot)
            break
scrapy/core/downloader/__init__.py#Downloader _download(self, slot, request, spider)
def _download(self, slot, request, spider):
    """Start the download of ``request`` on ``slot`` and return its Deferred.

    The Deferred is created from ``self.handlers.download_request``. On
    success the ``response_downloaded`` signal is sent. Whether the download
    succeeds or fails, ``finish_transferring`` removes the request from
    ``slot.transferring``, re-runs ``_process_queue`` for the slot and sends
    the ``request_left_downloader`` signal.
    """
    # The order is very important for the following deferreds. Do not change!
    # 1. Create the download deferred
    # The concrete download logic is carried out by self.handlers.download_request
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)
    # 2. Notify response_downloaded listeners about the recent download
    # before querying queue for next request
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        # Pass the response on unchanged to the next callback.
        return response
    dfd.addCallback(_downloaded)
    # 3. After response arrives, remove the request from transferring
    # state to free up the transferring slot so it can be used by the
    # following requests (perhaps those which came from the downloader
    # middleware itself)
    # Mark the request as transferring now; finish_transferring undoes this.
    slot.transferring.add(request)
    def finish_transferring(_):
        slot.transferring.remove(request)
        # Re-run queue processing for this slot before signalling.
        self._process_queue(spider, slot)
        self.signals.send_catch_log(signal=signals.request_left_downloader,
                                    request=request,
                                    spider=spider)
        # Return the received result (or failure) untouched.
        return _
    return dfd.addBoth(finish_transferring)
Download Handlers 支持
# Download Handlers supported by default: each key maps to the dotted path
# of a handler class (the keys look like URL schemes).
DOWNLOAD_HANDLERS_BASE = {
    "data": "scrapy.core.downloader.handlers.datauri.DataURIDownloadHandler",
    "file": "scrapy.core.downloader.handlers.file.FileDownloadHandler",
    "http": "scrapy.core.downloader.handlers.http.HTTPDownloadHandler",
    "https": "scrapy.core.downloader.handlers.http.HTTPDownloadHandler",
    "s3": "scrapy.core.downloader.handlers.s3.S3DownloadHandler",
    "ftp": "scrapy.core.downloader.handlers.ftp.FTPDownloadHandler",
}

Ⅶ.Re:Scrapy 之 Downloader
https://元气码农少女酱.我爱你/d58e7a860216/
作者
元气码农少女酱
发布于
2023年5月2日
许可协议