Ⅵ.Re:Scrapy 之 Scheduler
Scheduler
Scheduler 组件会接收来自 Engine 的请求,它负责将请求存储到 持久化或非持久化的数据结构中。
它还可以获取这些请求,并在 Engine 要求下载下一个请求时,将其返回给 Engine。
默认 Scheduler
from_crawler
@classmethod
def from_crawler(cls: Type[SchedulerTV], crawler) -> SchedulerTV:
dupefilter_cls = load_object(crawler.settings['DUPEFILTER_CLASS'])
return cls(
# 创建去重过滤器实例:scrapy.dupefilters.RFPDupeFilter
dupefilter=create_instance(dupefilter_cls, crawler.settings, crawler),
# jobdir 用于持久化爬行状态的目录
jobdir=job_dir(crawler.settings),
# 持久化请求队列使用的类(文件):scrapy.squeues.PickleLifoDiskQueue
dqclass=load_object(crawler.settings['SCHEDULER_DISK_QUEUE']),
# 非持久化请求队列使用的类(内存):scrapy.squeues.LifoMemoryQueue
mqclass=load_object(crawler.settings['SCHEDULER_MEMORY_QUEUE']),
# 是否开启 Scheduler 的 Debug 模式
logunser=crawler.settings.getbool('SCHEDULER_DEBUG'),
# 爬虫统计收集器
stats=crawler.stats,
# 请求优先级队列使用的类:scrapy.pqueues.ScrapyPriorityQueue
pqclass=load_object(crawler.settings['SCHEDULER_PRIORITY_QUEUE']),
crawler=crawler,
)
Scrapy 默认已实现的 Queue
持久/非持久化队列:
PickleFifoDiskQueue
:pickle 实现的存储磁盘文件的 先进先出 队列PickleLifoDiskQueue
:pickle 实现的存储磁盘文件的 后进先出 队列MarshalFifoDiskQueue
:marshal 实现的存储磁盘文件的 先进先出 队列MarshalLifoDiskQueue
:marshal 实现的存储磁盘文件的 后进先出 队列FifoMemoryQueue
:基于内存的 先进先出 队列LifoMemoryQueue
:基于内存的 后进先出 队列
请求优先级队列:
ScrapyPriorityQueue
:默认请求优先级队列DownloaderAwarePriorityQueue
:并行抓不同的域时,这个效果好。但不能与CONCURRENT_REQUESTS_PER_IP
同时使用
enqueue_request
def enqueue_request(self, request: Request) -> bool:
# 没有dont_filter标识,再让dupefilter过滤掉的话,返回False
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
# 如果是False,引擎将发出require_dropped信号,并且不会在以后的时间里继续尝试调度该请求。
return False
# 尝试推入磁盘队列
dqok = self._dqpush(request)
if dqok:
self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
else:
# 没推入磁盘队列成功,则推入内存队列
self._mqpush(request)
self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
self.stats.inc_value('scheduler/enqueued', spider=self.spider)
return True
has_pending_requests
def has_pending_requests(self) -> bool:
# 如果 Scheduler 有排队的请求,则为真,否则为假。
return len(self) > 0
next_request
def next_request(self) -> Optional[Request]:
# 先从内存队列取 request,若为空,则从磁盘队列取 request。直至 has_pending_requests 为 False
request = self.mqs.pop()
if request is not None:
self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
else:
request = self._dqpop()
if request is not None:
self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
if request is not None:
self.stats.inc_value('scheduler/dequeued', spider=self.spider)
return request
重写 Scheduler
继承
BaseScheduler
至少要实现
enqueue_request
、has_pending_requests
和next_request
三个方法Scheduler 负责存储来自 Engine 的 Request,并调度 Request 反馈 Request 给 Engine。
Request 的来源,可以是:
- Spider:
start_requests
方法,start_urls
列表中生成的 request ,request callbacks - Spider 中间件:
process_spider_output
和process_spider_exception
方法 - Downloader 中间件:
process_request
,process_response
和process_exception
方法
- Spider:
Ⅵ.Re:Scrapy 之 Scheduler
https://元气码农少女酱.我爱你/ba134d557b8d/