Ⅷ.Re:Scrapy: Item Pipelines

Item Pipeline

The Item Pipeline is responsible for validating scraped data, detecting duplicate data, and persisting data to a database.
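
As a concrete sketch (the class name and the 'url' field are assumptions for illustration), a custom pipeline implements process_item, returns the item to pass it down the chain, and raises DropItem to discard it:

from scrapy.exceptions import DropItem

class ValidateAndDedupPipeline:
    """Hypothetical pipeline: validate a required field, drop duplicates."""

    def __init__(self):
        self.seen_urls = set()

    def process_item(self, item, spider):
        # validation: reject items missing the (assumed) 'url' field
        if not item.get('url'):
            raise DropItem(f"Missing url in {item!r}")
        # duplicate detection: reject items whose url was already seen
        if item['url'] in self.seen_urls:
            raise DropItem(f"Duplicate item: {item['url']}")
        self.seen_urls.add(item['url'])
        return item  # pass the item on to the next pipeline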

Source Code Analysis

scrapy/core/engine.py#ExecutionEngine open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True)

@inlineCallbacks
def open_spider(self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True):
    if self.slot is not None:
        raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
    logger.info("Spider opened", extra={'spider': spider})
    nextcall = CallLaterOnce(self._next_request)
    scheduler = create_instance(self.scheduler_cls, settings=None, crawler=self.crawler)
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.spider = spider
    if hasattr(scheduler, "open"):
        yield scheduler.open(spider)
    # open the scraper (which in turn opens the item pipelines)
    yield self.scraper.open_spider(spider)
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    self.slot.nextcall.schedule()
    self.slot.heartbeat.start(5)
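
nextcall wraps _next_request in a CallLaterOnce, so repeated schedule() calls within one reactor tick collapse into a single invocation, and the slot's heartbeat (a Twisted LoopingCall around nextcall.schedule) re-arms it every 5 seconds. A simplified sketch of that coalescing behavior (not the real scrapy.utils.reactor.CallLaterOnce, which also supports cancellation):

from twisted.internet import reactor

class CallLaterOnceSketch:
    """Coalesce many schedule() calls into at most one pending reactor call."""

    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw = func, a, kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:  # ignore schedule() while a call is pending
            self._call = reactor.callLater(delay, self)

    def __call__(self):
        self._call = None  # allow the next schedule() to arm again
        return self._func(*self._a, **self._kw)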

scrapy/core/scraper.py#Scraper open_spider(self, spider: Spider)

@inlineCallbacks
def open_spider(self, spider: Spider):
    """Open the given spider for scraping and allocate resources for it"""
    self.slot = Slot(self.crawler.settings.getint('SCRAPER_SLOT_MAX_ACTIVE_SIZE'))
    # open the ItemPipelineManager created in Scraper.__init__ (default ITEM_PROCESSOR = 'scrapy.pipelines.ItemPipelineManager')
    yield self.itemproc.open_spider(spider)
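
The Slot created here tracks in-flight responses against SCRAPER_SLOT_MAX_ACTIVE_SIZE, a soft limit (in bytes) on response data being processed at once; while the sum of active response sizes exceeds it, the engine stops pulling new requests. It can be tuned in settings.py (5_000_000 is the documented default):

# settings.py
# soft limit (bytes) on response data being processed in parallel;
# while exceeded, the engine does not fetch new requests
SCRAPER_SLOT_MAX_ACTIVE_SIZE = 5_000_000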

scrapy/pipelines/__init__.py#ItemPipelineManager(MiddlewareManager)

class ItemPipelineManager(MiddlewareManager):

    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        # ITEM_PIPELINES defaults to {}; by convention you add your own pipeline classes in pipelines.py and register them in this setting
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item))

    def process_item(self, item, spider):
        return self._process_chain('process_item', item, spider)
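
build_component_list turns the ITEM_PIPELINES dict into a list ordered by priority, and _process_chain (inherited from MiddlewareManager) chains every registered process_item into one Deferred, feeding each pipeline the value returned by the previous one. Enabling the sketch pipeline from above would look like this (module paths are hypothetical):

# settings.py
# keys are import paths, values are priorities;
# lower numbers run earlier in the chain (0-1000 by convention)
ITEM_PIPELINES = {
    'myproject.pipelines.ValidateAndDedupPipeline': 300,  # hypothetical
    'myproject.pipelines.MongoPersistencePipeline': 800,  # hypothetical
}

Each process_item must return the item (or a Deferred that fires with it) or raise DropItem; a DropItem stops the chain for that item and the drop is logged.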
