1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
| class Slot(object): """Scraper slot (one per running spider)"""
MIN_RESPONSE_SIZE = 1024
def __init__(self, max_active_size=5000000): self.max_active_size = max_active_size self.queue = deque() self.active = set() self.active_size = 0 self.itemproc_size = 0 self.closing = None
def add_response_request(self, response, request): deferred = defer.Deferred() self.queue.append((response, request, deferred)) if isinstance(response, Response): self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE) else: self.active_size += self.MIN_RESPONSE_SIZE return deferred
def enqueue_scrape(self, response, request, spider): slot = self.slot dfd = slot.add_response_request(response, request) def finish_scraping(_): slot.finish_response(response, request) self._check_if_closing(spider, slot) self._scrape_next(spider, slot) return _ dfd.addBoth(finish_scraping) dfd.addErrback( lambda f: logger.error('Scraper bug processing %(request)s', {'request': request}, exc_info=failure_to_exc_info(f), extra={'spider': spider})) self._scrape_next(spider, slot) return dfd def _scrape_next(self, spider, slot): while slot.queue: response, request, deferred = slot.next_response_request_deferred() self._scrape(response, request, spider).chainDeferred(deferred)
def _scrape(self, response, request, spider): """Handle the downloaded response or failure through the spider callback/errback""" assert isinstance(response, (Response, Failure)) dfd = self._scrape2(response, request, spider) dfd.addErrback(self.handle_spider_error, request, response, spider) dfd.addCallback(self.handle_spider_output, request, response, spider) return dfd
def _scrape2(self, request_result, request, spider): """Handle the different cases of request's result been a Response or a Failure""" if not isinstance(request_result, Failure): return self.spidermw.scrape_response( self.call_spider, request_result, request, spider) else: dfd = self.call_spider(request_result, request, spider) return dfd.addErrback( self._log_download_errors, request_result, request, spider) def scrape_response(self, scrape_func, response, request, spider): fname = lambda f:'%s.%s' % ( six.get_method_self(f).__class__.__name__, six.get_method_function(f).__name__)
def process_spider_input(response): for method in self.methods['process_spider_input']: try: result = method(response=response, spider=spider) assert result is None, \ 'Middleware %s must returns None or ' \ 'raise an exception, got %s ' \ % (fname(method), type(result)) except: return scrape_func(Failure(), request, spider) return scrape_func(response, request, spider)
def process_spider_exception(_failure): exception = _failure.value for method in self.methods['process_spider_exception']: result = method(response=response, exception=exception, spider=spider) assert result is None or _isiterable(result), \ 'Middleware %s must returns None, or an iterable object, got %s ' % \ (fname(method), type(result)) if result is not None: return result return _failure
def process_spider_output(result): for method in self.methods['process_spider_output']: result = method(response=response, result=result, spider=spider) assert _isiterable(result), \ 'Middleware %s must returns an iterable object, got %s ' % \ (fname(method), type(result)) return result dfd = mustbe_deferred(process_spider_input, response) dfd.addErrback(process_spider_exception) dfd.addCallback(process_spider_output) return dfd
|