Ⅵ.Re:Scrapy 之 Scheduler

Scheduler

Scheduler 组件会接收来自 Engine 的请求,它负责将请求存储到 持久化或非持久化的数据结构中。

它还可以获取这些请求,并在 Engine 要求下载下一个请求时,将其返回给 Engine。

默认 Scheduler

from_crawler

@classmethod
def from_crawler(cls: Type[SchedulerTV], crawler) -> SchedulerTV:
    dupefilter_cls = load_object(crawler.settings['DUPEFILTER_CLASS'])
    return cls(
        # 创建去重过滤器实例:scrapy.dupefilters.RFPDupeFilter
        dupefilter=create_instance(dupefilter_cls, crawler.settings, crawler),
        # jobdir 用于持久化爬行状态的目录
        jobdir=job_dir(crawler.settings),
        # 持久化请求队列使用的类(文件):scrapy.squeues.PickleLifoDiskQueue
        dqclass=load_object(crawler.settings['SCHEDULER_DISK_QUEUE']),
        # 非持久化请求队列使用的类(内存):scrapy.squeues.LifoMemoryQueue
        mqclass=load_object(crawler.settings['SCHEDULER_MEMORY_QUEUE']),
        # 是否开启 Scheduler 的 Debug 模式
        logunser=crawler.settings.getbool('SCHEDULER_DEBUG'),
        # 爬虫统计收集器
        stats=crawler.stats,
        # 请求优先级队列使用的类:scrapy.pqueues.ScrapyPriorityQueue
        pqclass=load_object(crawler.settings['SCHEDULER_PRIORITY_QUEUE']),
        crawler=crawler,
    )

Scrapy 默认已实现的 Queue

持久/非持久化队列:

  • PickleFifoDiskQueue :pickle 实现的存储磁盘文件的 先进先出 队列
  • PickleLifoDiskQueue :pickle 实现的存储磁盘文件的 后进先出 队列
  • MarshalFifoDiskQueue :marshal 实现的存储磁盘文件的 先进先出 队列
  • MarshalLifoDiskQueue :marshal 实现的存储磁盘文件的 后进先出 队列
  • FifoMemoryQueue :基于内存的 先进先出 队列
  • LifoMemoryQueue :基于内存的 后进先出 队列

请求优先级队列:

  • ScrapyPriorityQueue :默认请求优先级队列
  • DownloaderAwarePriorityQueue :并行抓不同的域时,这个效果好。但不能与CONCURRENT_REQUESTS_PER_IP同时使用

enqueue_request

def enqueue_request(self, request: Request) -> bool:
    # 没有dont_filter标识,再让dupefilter过滤掉的话,返回False
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        # 如果是False,引擎将发出require_dropped信号,并且不会在以后的时间里继续尝试调度该请求。
        return False
    # 尝试推入磁盘队列
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        # 没推入磁盘队列成功,则推入内存队列
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True

has_pending_requests

def has_pending_requests(self) -> bool:
    # 如果 Scheduler 有排队的请求,则为真,否则为假。
    return len(self) > 0

next_request

def next_request(self) -> Optional[Request]:
    # 先从内存队列取 request,若为空,则从磁盘队列取 request。直至 has_pending_requests 为 False
    request = self.mqs.pop()
    if request is not None:
        self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
    else:
        request = self._dqpop()
        if request is not None:
            self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
    if request is not None:
        self.stats.inc_value('scheduler/dequeued', spider=self.spider)
    return request

重写 Scheduler

  1. 继承BaseScheduler

  2. 至少要实现 enqueue_requesthas_pending_requestsnext_request 三个方法

  3. Scheduler 负责存储来自 Engine 的 Request,并调度 Request 反馈 Request 给 Engine。

    Request 的来源,可以是:

    • Spider:start_requests方法,start_urls列表中生成的 request ,request callbacks
    • Spider 中间件:process_spider_outputprocess_spider_exception 方法
    • Downloader 中间件: process_request, process_responseprocess_exception 方法

Ⅵ.Re:Scrapy 之 Scheduler
https://元气码农少女酱.我爱你/ba134d557b8d/
作者
元气码农少女酱
发布于
2023年5月2日
许可协议