Ⅶ. Re:Scrapy: Downloader
Downloader
The Downloader is responsible for fetching web page data and handing the response back to the Engine, which in other words means it ultimately ends up at the Spiders.
Source Code Walkthrough
scrapy/core/engine.py#ExecutionEngine _download(self, request: Request, spider: Spider) -> Deferred
# open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True) -->
# _next_request(self) -> None -->
# _next_request_from_scheduler(self) -> Optional[Deferred] -->
# _download(self, request: Request, spider: Spider) -> Deferred
def _download(self, request: Request, spider: Spider) -> Deferred:
    assert self.slot is not None  # typing
    self.slot.add_request(request)

    def _on_success(result: Union[Response, Request]) -> Union[Response, Request]:
        if not isinstance(result, (Response, Request)):
            raise TypeError(f"Incorrect type: expected Response or Request, got {type(result)}: {result!r}")
        if isinstance(result, Response):
            if result.request is None:
                result.request = request
            logkws = self.logformatter.crawled(result.request, result, spider)
            if logkws is not None:
                logger.log(*logformatter_adapter(logkws), extra={"spider": spider})
            self.signals.send_catch_log(
                signal=signals.response_received,
                response=result,
                request=result.request,
                spider=spider,
            )
        return result

    def _on_complete(_):
        self.slot.nextcall.schedule()
        return _

    # 'scrapy.core.downloader.Downloader' is the default Downloader
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
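The callback wiring at the end matters: addCallbacks(_on_success) attaches _on_success to the success path only, while addBoth(_on_complete) runs on success and failure alike, so the next engine tick is scheduled even when a download fails. A minimal standalone sketch of that ordering (plain Twisted, not Scrapy code):

from twisted.internet import defer

def _on_success(result):
    print('success:', result)
    return result

def _on_complete(result):
    # runs whether the deferred fired with a result or a Failure
    print('complete')
    return result

d = defer.Deferred()
d.addCallbacks(_on_success)  # success path only
d.addBoth(_on_complete)      # both paths
d.callback('response')       # prints 'success: response', then 'complete'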
scrapy/core/downloader/__init__.py#Downloader fetch(self, request, spider)
def fetch(self, request, spider):
    def _deactivate(response):
        self.active.remove(request)
        return response

    # 'active' is a set tracking the requests currently being downloaded
    self.active.add(request)
    # run the Downloader Middlewares, with _enqueue_request as the
    # final download function that queues the request
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    return dfd.addBoth(_deactivate)
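The active set maintained here also feeds the engine's backpressure check: in the same file, recent Scrapy versions implement it as roughly the following, with total_concurrency derived from the CONCURRENT_REQUESTS setting:

def needs_backout(self) -> bool:
    # the engine stops pulling requests from the scheduler while True
    return len(self.active) >= self.total_concurrency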
scrapy/core/downloader/middleware.py#DownloaderMiddlewareManager(MiddlewareManager) download(self, download_func: Callable, request: Request, spider: Spider)
# run the download through the middleware chain, returning a Deferred
def download(self, download_func: Callable, request: Request, spider: Spider):
    @defer.inlineCallbacks
    def process_request(request: Request):
        for method in self.methods['process_request']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, spider=spider))
            if response is not None and not isinstance(response, (Response, Request)):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return None, Response or "
                    f"Request, got {response.__class__.__name__}"
                )
            if response:
                return response
        return (yield download_func(request=request, spider=spider))

    @defer.inlineCallbacks
    def process_response(response: Union[Response, Request]):
        if response is None:
            raise TypeError("Received None in process_response")
        elif isinstance(response, Request):
            return response
        for method in self.methods['process_response']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
            if not isinstance(response, (Response, Request)):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return Response or Request, "
                    f"got {type(response)}"
                )
            if isinstance(response, Request):
                return response
        return response

    @defer.inlineCallbacks
    def process_exception(failure: Failure):
        exception = failure.value
        for method in self.methods['process_exception']:
            method = cast(Callable, method)
            response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider))
            if response is not None and not isinstance(response, (Response, Request)):
                raise _InvalidOutput(
                    f"Middleware {method.__qualname__} must return None, Response or "
                    f"Request, got {type(response)}"
                )
            if response:
                return response
        return failure

    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred
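The contract enforced above is exactly what user middlewares plug into: process_request may return None (continue down the chain), a Response (skip the download), or a Request (reschedule); process_response must return a Response or a Request. A minimal custom middleware honoring that contract (module and class names are placeholders):

# myproject/middlewares.py (hypothetical module)
class TokenHeaderMiddleware:
    """Adds a header on the way out; retries throttled responses on the way back."""

    def process_request(self, request, spider):
        request.headers.setdefault(b'X-Token', b'secret')
        return None  # None means: continue down the middleware chain

    def process_response(self, request, response, spider):
        if response.status == 429:
            # returning a Request re-schedules it instead of passing
            # the response on to the spider
            return request.replace(dont_filter=True)
        return response

    def process_exception(self, request, exception, spider):
        return None  # let other middlewares / errbacks handle it

Enable it with the usual setting:

# settings.py
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.TokenHeaderMiddleware': 543,
}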
scrapy/core/downloader/__init__.py#Downloader _enqueue_request(self, request, spider)
def _enqueue_request(self, request, spider):
    key, slot = self._get_slot(request, spider)
    request.meta[self.DOWNLOAD_SLOT] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    self.signals.send_catch_log(signal=signals.request_reached_downloader,
                                request=request,
                                spider=spider)
    deferred = defer.Deferred().addBoth(_deactivate)
    slot.queue.append((request, deferred))
    # process the queued requests according to the slot's policy
    self._process_queue(spider, slot)
    return deferred
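_get_slot keys slots by host by default, and the chosen key is written back into request.meta under DOWNLOAD_SLOT (the string 'download_slot'). Pre-setting that meta key groups requests into one throttling slot regardless of host; a small sketch (spider name and URLs are made up):

import scrapy

class SlottedSpider(scrapy.Spider):
    name = 'slotted'

    def start_requests(self):
        # both requests share the 'api-pool' slot's delay/concurrency limits
        for url in ('https://a.example/1', 'https://b.example/2'):
            yield scrapy.Request(url, meta={'download_slot': 'api-pool'})

    def parse(self, response):
        pass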
scrapy/core/downloader/__init__.py#Downloader _process_queue(self, spider, slot)
def _process_queue(self, spider, slot):
    from twisted.internet import reactor
    if slot.latercall and slot.latercall.active():
        return

    # Delay queue processing if a download_delay is configured
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # Process enqueued requests if there are free slots to transfer for this slot
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        request, deferred = slot.queue.popleft()
        # the actual download happens here
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # prevent burst if inter-request delays were configured
        if delay:
            self._process_queue(spider, slot)
            break
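slot.download_delay() reflects the DOWNLOAD_DELAY setting (randomized within 0.5x to 1.5x of it when RANDOMIZE_DOWNLOAD_DELAY is enabled), so the penalty above is just the remaining wait, penalty = delay - (now - slot.lastseen). Illustrative settings:

# settings.py (values are examples)
DOWNLOAD_DELAY = 2.0                # seconds between requests per slot
RANDOMIZE_DOWNLOAD_DELAY = True     # actual delay in [0.5, 1.5] * DOWNLOAD_DELAY
CONCURRENT_REQUESTS_PER_DOMAIN = 8  # bounds free_transfer_slots() per slot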
scrapy/core/downloader/__init__.py#Downloader _download(self, slot, request, spider)
def _download(self, slot, request, spider):
    # The order is very important for the following deferreds. Do not change!

    # 1. Create the download deferred
    # self.handlers.download_request performs the actual download
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # 2. Notify response_downloaded listeners about the recent download
    # before querying queue for next request
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)

    # 3. After response arrives, remove the request from transferring
    # state to free up the transferring slot so it can be used by the
    # following requests (perhaps those which came from the downloader
    # middleware itself)
    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        self._process_queue(spider, slot)
        self.signals.send_catch_log(signal=signals.request_left_downloader,
                                    request=request,
                                    spider=spider)
        return _

    return dfd.addBoth(finish_transferring)
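Both signals sent here are public, so an extension can observe raw downloads before they climb back through the middleware chain. A minimal sketch (module and class names are placeholders):

# myproject/extensions.py (hypothetical module)
from scrapy import signals

class DownloadLogger:

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.response_downloaded,
                                signal=signals.response_downloaded)
        return ext

    def response_downloaded(self, response, request, spider):
        spider.logger.debug(f'downloaded {response.status} {request.url}')

Enable it via EXTENSIONS = {'myproject.extensions.DownloadLogger': 500} in settings.py.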
Supported Download Handlers
# Download Handlers supported by default
DOWNLOAD_HANDLERS_BASE = {
    'data': 'scrapy.core.downloader.handlers.datauri.DataURIDownloadHandler',
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}
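Handlers are looked up by URL scheme, and the DOWNLOAD_HANDLERS setting is merged on top of this base dict, so a scheme's handler can be replaced, or disabled with None. A handler only needs a download_request(request, spider) method returning a Response or a Deferred firing with one; mustbe_deferred() in Downloader._download wraps plain return values. A minimal sketch (names are hypothetical):

# myproject/handlers.py (hypothetical module)
from scrapy.http import HtmlResponse

class CannedDownloadHandler:
    """Answers every request with a canned response; handy in tests."""

    @classmethod
    def from_crawler(cls, crawler):
        return cls()

    def download_request(self, request, spider):
        return HtmlResponse(url=request.url, body=b'<html></html>',
                            request=request)

# settings.py
DOWNLOAD_HANDLERS = {
    'http': 'myproject.handlers.CannedDownloadHandler',  # replace http
    's3': None,                                          # disable s3
}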