Scrapy-Redis源码解读

喜欢ヅ旅行 2022-02-04 09:57 268阅读 0赞

在上一章《Scrapy-Redis入门实战》中,我们在一个普通的Scrapy项目的settings.py文件中仅额外增加了如下几个配置就使项目实现了基于Redis的Requests请求过滤和Items持久化两大功能。

  1. ######################################################
  2. ##############下面是Scrapy-Redis相关配置################
  3. ######################################################
  4. # 指定Redis的主机名和端口
  5. REDIS_HOST = 'localhost'
  6. REDIS_PORT = 6379
  7. # 调度器启用Redis存储Requests队列
  8. SCHEDULER = "scrapy_redis.scheduler.Scheduler"
  9. # 确保所有的爬虫实例使用Redis进行重复过滤
  10. DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
  11. # 将Requests队列持久化到Redis,可支持暂停或重启爬虫
  12. SCHEDULER_PERSIST = True
  13. # Requests的调度策略,默认优先级队列
  14. SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
  15. # 将爬取到的items保存到Redis 以便进行后续处理
  16. ITEM_PIPELINES = {
  17. 'scrapy_redis.pipelines.RedisPipeline': 300
  18. }

本文将通过解读Scrapy-Redis源码来进一步认识一下其中涉及的几个组件。

RedisPipeline

scrapy_redis.pipelines.RedisPipeline 用来将序列化后的Item持久化到Redis中。

主要源码如下,Scrapy-Redis正是通过其中的_process_item()方法来将序列化的Item保存到Redis列表的。

  1. from scrapy.utils.misc import load_object
  2. from scrapy.utils.serialize import ScrapyJSONEncoder
  3. from twisted.internet.threads import deferToThread
  4. from . import connection, defaults
  5. default_serialize = scrapy.utils.serialize.ScrapyJSONEncoder().encode
  6. class RedisPipeline(object):
  7. """将序列化后的Item保存到Redis 列表或者队列中
  8. 相关配置
  9. --------
  10. REDIS_ITEMS_KEY : str
  11. 保存Items 到 Redis 哪个key,默认 %(spider)s:items
  12. REDIS_ITEMS_SERIALIZER : str
  13. 所使用的序列化方法,默认 scrapy.utils.serialize.ScrapyJSONEncoder().encode
  14. """
  15. def __init__(self, server,
  16. key=defaults.PIPELINE_KEY,
  17. serialize_func=default_serialize):
  18. """初始化 pipeline
  19. 参数
  20. ----------
  21. server : StrictRedis
  22. Redis 客户端实例
  23. key : str
  24. 保存Items 到 Redis 哪个key,默认 %(spider)s:items
  25. serialize_func : callable
  26. 所使用的序列化方法,默认 scrapy.utils.serialize.ScrapyJSONEncoder().encode
  27. """
  28. self.server = server
  29. self.key = key
  30. self.serialize = serialize_func
  31. @classmethod
  32. def from_settings(cls, settings): # 从 settings.py 读取配置
  33. params = {
  34. 'server': connection.from_settings(settings),
  35. }
  36. if settings.get('REDIS_ITEMS_KEY'):
  37. params['key'] = settings['REDIS_ITEMS_KEY']
  38. if settings.get('REDIS_ITEMS_SERIALIZER'):
  39. params['serialize_func'] = load_object(
  40. settings['REDIS_ITEMS_SERIALIZER']
  41. )
  42. return cls(**params)
  43. @classmethod
  44. def from_crawler(cls, crawler): # 从 settings.py 读取配置
  45. return cls.from_settings(crawler.settings)
  46. def process_item(self, item, spider): # 另起一个线程来保存Item
  47. return deferToThread(self._process_item, item, spider)
  48. def _process_item(self, item, spider): # 保存Item
  49. key = self.item_key(item, spider) # 获取保存Item的Redis key
  50. data = self.serialize(item) # 将Item序列化
  51. self.server.rpush(key, data) # 将序列化后的 Item保存到Redis key
  52. return item
  53. def item_key(self, item, spider):
  54. """根据spider名称生成用于存放Items的 Redis key
  55. 通过重写这个方法来使用其他的key,可以使用item 和/或者 spider 来生成你想要的 key
  56. 默认 spider.name:items
  57. """
  58. return self.key % {'spider': spider.name}

RFPDupeFilter

scrapy_redis.dupefilter.RFPDupeFilter 是一个基于Redis的请求去重过滤器,它为Scheduler调度器提供了为Request生成指纹和判断Request是否重复等方法。

主要源码如下,重要部分已经添加上注释,其中 request_fingerprint()用来为Request生成指纹,request_seen()用来判断Request是否重复。

  1. import logging
  2. import time
  3. from scrapy.dupefilters import BaseDupeFilter
  4. from scrapy.utils.request import request_fingerprint
  5. from . import defaults
  6. from .connection import get_redis_from_settings
  7. logger = logging.getLogger(__name__)
  8. # TODO: Rename class to RedisDupeFilter.
  9. class RFPDupeFilter(BaseDupeFilter):
  10. """基于Redis的请求去重过滤器.
  11. Scrapy 的 scheduler调度器也可以使用该类
  12. """
  13. logger = logger
  14. def __init__(self, server, key, debug=False):
  15. """初始化过滤器
  16. 参数
  17. ----------
  18. server : redis.StrictRedis
  19. Redis server 实例
  20. key : str
  21. 保存Requests的指纹 到 Redis 哪个key
  22. debug : bool, optional
  23. 是否在日志中记录被过滤的请求
  24. """
  25. self.server = server
  26. self.key = key
  27. self.debug = debug
  28. self.logdupes = True
  29. @classmethod
  30. def from_settings(cls, settings):
  31. """根据给定的配置返回一个RFPDupeFilter实例
  32. 单独使用时,默认会使用 dupefilter:<timestamp> 作为key
  33. 当使用 scrapy_redis.scheduler.Scheduler 时,默认使用 %(spider)s:dupefilter 作为key
  34. 参数
  35. ----------
  36. settings : scrapy.settings.Settings
  37. 返回
  38. -------
  39. 一个RFPDupeFilter实例
  40. """
  41. server = get_redis_from_settings(settings)
  42. # 这个过滤器创建的key是一次性的,只能作为一个单独的过滤器和scrapy的默认scheduler 一起使用
  43. # 如果scrapy 在open() 方法中传入了spider 作为参数,此处可不必设置key
  44. # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
  45. key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
  46. debug = settings.getbool('DUPEFILTER_DEBUG')
  47. return cls(server, key=key, debug=debug)
  48. @classmethod
  49. def from_crawler(cls, crawler):
  50. """根据给定的crawler返回一个RFPDupeFilter实例
  51. 参数
  52. ----------
  53. crawler : scrapy.crawler.Crawler
  54. 返回
  55. -------
  56. 一个RFPDupeFilter实例
  57. """
  58. return cls.from_settings(crawler.settings)
  59. def request_seen(self, request):
  60. """判断一个Request请求之前是否见到过
  61. 参数
  62. ----------
  63. request : scrapy.http.Request
  64. 返回
  65. -------
  66. bool
  67. """
  68. fp = self.request_fingerprint(request) # 生成该请求的唯一指纹
  69. added = self.server.sadd(self.key, fp) # 返回添加成功的值的数量,如果该值在集合中已经存在则返回 0
  70. return added == 0
  71. def request_fingerprint(self, request):
  72. """生成给定请求的唯一指纹
  73. 参数
  74. ----------
  75. request : scrapy.http.Request
  76. 返回
  77. -------
  78. str
  79. """
  80. return request_fingerprint(request)
  81. def close(self, reason=''):
  82. """爬虫关闭时清除数据,供 Scrapy 的 scheduler 使用
  83. Parameters
  84. ----------
  85. reason : str, optional
  86. """
  87. self.clear()
  88. def clear(self):
  89. """清空指纹数据"""
  90. self.server.delete(self.key)
  91. def log(self, request, spider):
  92. """在日志中记录给定的请求
  93. 参数
  94. ----------
  95. request : scrapy.http.Request
  96. spider : scrapy.spiders.Spider
  97. """
  98. if self.debug:
  99. msg = "Filtered duplicate request: %(request)s"
  100. self.logger.debug(msg, {'request': request}, extra={'spider': spider})
  101. elif self.logdupes:
  102. msg = ("Filtered duplicate request %(request)s"
  103. " - no more duplicates will be shown"
  104. " (see DUPEFILTER_DEBUG to show all duplicates)")
  105. self.logger.debug(msg, {'request': request}, extra={'spider': spider})
  106. self.logdupes = False

生成Request指纹的详细过程如下:

  1. _fingerprint_cache = weakref.WeakKeyDictionary()
  2. def request_fingerprint(request, include_headers=None):
  3. """
  4. 生成 Request 的指纹
  5. Request 的指纹是一个用来唯一识别这个Request所指向资源的哈希值,例如下面这两个URL:
  6. http://www.example.com/query?id=111&cat=222
  7. http://www.example.com/query?cat=222&id=111
  8. 尽管这两个URL 写法上有所区别,但它们指向的资源是相同的 (二者的响应内容是一样的),所以它们的指纹应当相同
  9. 对于那些只能由经过身份验证的用户访问的页面,大多数的网站都会使用cookie 来存储SESSION ID
  10. 这会在Request中增加一部分随机内容并且与请求的资源无关,因此cookie在计算指纹时应该被忽略
  11. 默认情况下,scrapy在计算指纹的时候也会忽略其他的请求头
  12. 如果你想要在计算指纹时包含上指定的请求头可以使用 include_headers 参数传入你想要包含的请求头列表
  13. """
  14. if include_headers:
  15. include_headers = tuple(to_bytes(h.lower())
  16. for h in sorted(include_headers))
  17. cache = _fingerprint_cache.setdefault(request, {})
  18. if include_headers not in cache:
  19. fp = hashlib.sha1()
  20. fp.update(to_bytes(request.method))
  21. fp.update(to_bytes(canonicalize_url(request.url)))
  22. fp.update(request.body or b'')
  23. if include_headers:
  24. for hdr in include_headers:
  25. if hdr in request.headers:
  26. fp.update(hdr)
  27. for v in request.headers.getlist(hdr):
  28. fp.update(v)
  29. cache[include_headers] = fp.hexdigest()
  30. return cache[include_headers]

从Request指纹的生成过程不难看出,默认情况下,Request的指纹是对request.method、request.url和request.body经过SHA1加密后转换成的十六进制字符串。

Scheduler

scrapy_redis.scheduler.Scheduler是一个基于Redis的 scheduler调度器,借助于RedisPipeline和RFPDupeFilter可实现Request队列持久化和Request重复过滤等功能。

  1. import importlib
  2. import six
  3. from scrapy.utils.misc import load_object
  4. from . import connection, defaults
  5. # TODO: add SCRAPY_JOB support.
  6. class Scheduler(object):
  7. """基于Redis的 scheduler调度器
  8. 相关配置
  9. --------
  10. SCHEDULER_PERSIST : bool (default: False)
  11. 是否要将Request队列持久化到 Redis
  12. SCHEDULER_FLUSH_ON_START : bool (default: False)
  13. 在启动时是否要清空 Redis 队列
  14. SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
  15. 如果没有收到消息,在关闭前等待多少秒
  16. SCHEDULER_QUEUE_KEY : str
  17. 调度器要处理的Requests队列所对应的Redis key,默认 spider.name:requests
  18. SCHEDULER_QUEUE_CLASS : str
  19. 调度器所使用的队列类,默认 scrapy_redis.queue.PriorityQueue
  20. SCHEDULER_DUPEFILTER_KEY : str
  21. 调度器要已请求过的Requests集合所对应的Redis key,默认 spider.name:dupefilter
  22. SCHEDULER_DUPEFILTER_CLASS : str
  23. 调度器所使用的过滤器类,默认 scrapy_redis.dupefilter.RFPDupeFilter
  24. SCHEDULER_SERIALIZER : str
  25. 调度器的序列化器
  26. """
  27. def __init__(self, server,
  28. persist=False,
  29. flush_on_start=False,
  30. queue_key=defaults.SCHEDULER_QUEUE_KEY,
  31. queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
  32. dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
  33. dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
  34. idle_before_close=0,
  35. serializer=None):
  36. """初始化调度器
  37. 参数
  38. ----------
  39. server : Redis
  40. Redis server 实例
  41. persist : bool
  42. 是否要将Request队列持久化到 Redis ,默认 False,即爬虫关闭时会清空Request队列
  43. flush_on_start : bool
  44. 在启动时是否要清空 Redis 队列 ,默认 False
  45. queue_key : str
  46. 调度器要处理的Requests队列所对应的Redis key,默认 spider.name:requests
  47. queue_cls : str
  48. 调度器所使用的队列类,默认 scrapy_redis.queue.PriorityQueue
  49. dupefilter_key : str
  50. 调度器要已请求过的Requests集合所对应的Redis key,默认 spider.name:dupefilter
  51. dupefilter_cls : str
  52. 调度器所使用的过滤器类,默认 scrapy_redis.dupefilter.RFPDupeFilter
  53. idle_before_close : int
  54. 如果没有收到消息,在关闭前等待多少秒
  55. """
  56. if idle_before_close < 0:
  57. raise TypeError("idle_before_close cannot be negative")
  58. self.server = server
  59. self.persist = persist
  60. self.flush_on_start = flush_on_start
  61. self.queue_key = queue_key
  62. self.queue_cls = queue_cls
  63. self.dupefilter_cls = dupefilter_cls
  64. self.dupefilter_key = dupefilter_key
  65. self.idle_before_close = idle_before_close
  66. self.serializer = serializer
  67. self.stats = None
  68. def __len__(self): # 返回Requests队列的长度
  69. return len(self.queue)
  70. @classmethod
  71. def from_settings(cls, settings): # 根据给定的配置返回一个Scheduler实例
  72. kwargs = {
  73. 'persist': settings.getbool('SCHEDULER_PERSIST'),
  74. 'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
  75. 'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
  76. }
  77. # If these values are missing, it means we want to use the defaults.
  78. optional = {
  79. # TODO: Use custom prefixes for this settings to note that are
  80. # specific to scrapy-redis.
  81. 'queue_key': 'SCHEDULER_QUEUE_KEY',
  82. 'queue_cls': 'SCHEDULER_QUEUE_CLASS',
  83. 'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
  84. # We use the default setting name to keep compatibility.
  85. 'dupefilter_cls': 'DUPEFILTER_CLASS',
  86. 'serializer': 'SCHEDULER_SERIALIZER',
  87. }
  88. for name, setting_name in optional.items():
  89. val = settings.get(setting_name)
  90. if val:
  91. kwargs[name] = val
  92. # Support serializer as a path to a module.
  93. if isinstance(kwargs.get('serializer'), six.string_types):
  94. kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
  95. server = connection.from_settings(settings)
  96. # Ensure the connection is working.
  97. server.ping()
  98. return cls(server=server, **kwargs)
  99. @classmethod
  100. def from_crawler(cls, crawler): # 根据给定的crawler返回一个Scheduler实例
  101. instance = cls.from_settings(crawler.settings)
  102. # FIXME: for now, stats are only supported from this constructor
  103. instance.stats = crawler.stats
  104. return instance
  105. def open(self, spider): # Scheduler 开启
  106. self.spider = spider
  107. try:
  108. self.queue = load_object(self.queue_cls)(
  109. server=self.server,
  110. spider=spider,
  111. key=self.queue_key % {'spider': spider.name},
  112. serializer=self.serializer,
  113. )
  114. except TypeError as e:
  115. raise ValueError("Failed to instantiate queue class '%s': %s",
  116. self.queue_cls, e)
  117. try:
  118. self.df = load_object(self.dupefilter_cls)(
  119. server=self.server,
  120. key=self.dupefilter_key % {'spider': spider.name},
  121. debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
  122. )
  123. except TypeError as e:
  124. raise ValueError("Failed to instantiate dupefilter class '%s': %s",
  125. self.dupefilter_cls, e)
  126. if self.flush_on_start:
  127. self.flush()
  128. # notice if there are requests already in the queue to resume the crawl
  129. if len(self.queue):
  130. spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
  131. def close(self, reason): # Scheduler 关闭
  132. """
  133. 若未开启持久化,则清空数据
  134. """
  135. if not self.persist:
  136. self.flush()
  137. def flush(self): # 清空数据
  138. self.df.clear() # 清空已请求过的Requests集合
  139. self.queue.clear() # 清空Requests队列
  140. def enqueue_request(self, request): # 将Request添加到队列
  141. """
  142. 若开启过滤,并且之前见过这个Request,返回False
  143. 否则,将Request添加到队列
  144. """
  145. if not request.dont_filter and self.df.request_seen(request):
  146. self.df.log(request, self.spider)
  147. return False
  148. if self.stats:
  149. self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
  150. self.queue.push(request)
  151. return True
  152. def next_request(self): # 获取下一个要处理的请求
  153. block_pop_timeout = self.idle_before_close
  154. request = self.queue.pop(block_pop_timeout)
  155. if request and self.stats:
  156. self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
  157. return request
  158. def has_pending_requests(self): # 是否还有待处理的请求
  159. return len(self) > 0

PriorityQueue

scrapy_redis.queue.PriorityQueue是一个优先级队列,优先级越高越先处理,其内部是通过 Redis 有序集合进行实现的。

  1. class PriorityQueue(Base):
  2. """Per-spider priority queue abstraction using redis' sorted set"""
  3. def __len__(self):
  4. """Return the length of the queue"""
  5. return self.server.zcard(self.key)
  6. def push(self, request):
  7. """Push a request"""
  8. data = self._encode_request(request)
  9. score = -request.priority
  10. # We don't use zadd method as the order of arguments change depending on
  11. # whether the class is Redis or StrictRedis, and the option of using
  12. # kwargs only accepts strings, not bytes.
  13. self.server.execute_command('ZADD', self.key, score, data)
  14. def pop(self, timeout=0):
  15. """
  16. Pop a request
  17. timeout not support in this queue class
  18. """
  19. # use atomic range/remove using multi/exec
  20. pipe = self.server.pipeline()
  21. pipe.multi()
  22. pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
  23. results, count = pipe.execute()
  24. if results:
  25. return self._decode_request(results[0])

此外,Scrapy-Redis还提供了FifoQueue先进先出和LifoQueue后进先出两个队列。

  1. class FifoQueue(Base):
  2. """先进先出队列"""
  3. def __len__(self):
  4. """返回队列的长度"""
  5. return self.server.llen(self.key)
  6. def push(self, request):
  7. """压入一个请求"""
  8. self.server.lpush(self.key, self._encode_request(request))
  9. def pop(self, timeout=0):
  10. """弹出一个请求"""
  11. if timeout > 0:
  12. data = self.server.brpop(self.key, timeout)
  13. if isinstance(data, tuple):
  14. data = data[1]
  15. else:
  16. data = self.server.rpop(self.key)
  17. if data:
  18. return self._decode_request(data)
  19. class LifoQueue(Base):
  20. """后进先出队列"""
  21. def __len__(self):
  22. """返回栈的长度"""
  23. return self.server.llen(self.key)
  24. def push(self, request):
  25. """压入一个请求"""
  26. self.server.lpush(self.key, self._encode_request(request))
  27. def pop(self, timeout=0):
  28. """弹出一个请求"""
  29. if timeout > 0:
  30. data = self.server.blpop(self.key, timeout)
  31. if isinstance(data, tuple):
  32. data = data[1]
  33. else:
  34. data = self.server.lpop(self.key)
  35. if data:
  36. return self._decode_request(data)

发表评论

表情:
评论列表 (有 0 条评论,268人围观)

还没有评论,来说两句吧...

相关阅读

    相关 spring解读

    Spring技术内幕 深入解析Spring架构与设计原理(一)引子 我打算用这个帖子,把自己在这个过程中的一些心得,特别是对Spring新的理解,记录下来。使用这个帖子的标

    相关 ThreadLocal解读

    1. 背景 ThreadLocal源码解读,网上面早已经泛滥了,大多比较浅,甚至有的连基本原理都说的很有问题,包括百度搜索出来的第一篇高访问量博文,说ThreadLoca

    相关 CyclicBarrier解读

    CyclicBarrier 本意是循环栅栏,顾名思义,它可以循环使用。它是一个同步辅助器,它允许一个或多个线程阻塞等待直到其他线程执行完一系列的操作之后到达一个共同的屏障