Scrapy: how to send items to a site via an API

lrl1mhuk · asked 2022-11-09

Right now my spider sends data to my site like this:

def parse_product(response, **cb_kwargs):
    item = {}
    item['url'] = response.url
    data = {
        "source_id": 505,
        "token": f"{API_TOKEN}",
        "products": [item]
    }
    headers = {'Content-Type': 'application/json'}
    url = 'http://some.site.com/api/'
    requests.post(url=url, headers=headers, data=json.dumps(data))

Is it possible to implement this through a pipeline or middleware instead? Repeating it in every spider is inconvenient.
P.S. The data needs to be sent as JSON (json.dumps(data)); if I create the item as item = MyItemClass(), I get an error...

1bqhqjot · #1

You can also use Scrapy's Item and Field classes; just convert the item to a dict before calling json.dumps.
For example:

import json

import requests

class Pipeline:

    def process_item(self, item, spider):
        # dict() works for plain dicts and scrapy.Item instances alike,
        # so the item can be serialized with json.dumps afterwards
        data = dict(item)
        headers = {'Content-Type': 'application/json'}
        url = 'http://some.site.com/api/'
        requests.post(url=url, headers=headers, data=json.dumps(data))
        return item

If you use this example, the pipeline is called for every item your spiders yield. Remember to activate it in your settings.py file.
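For reference, a minimal sketch of what the item definition and the settings.py activation could look like; the module path myproject.pipelines, the class name MyItemClass and the single url field are assumptions, adjust them to your project:

# items.py -- a scrapy.Item works here because the pipeline calls dict(item)
import scrapy

class MyItemClass(scrapy.Item):
    url = scrapy.Field()

# settings.py -- activate the pipeline for all spiders in the project
ITEM_PIPELINES = {
    'myproject.pipelines.Pipeline': 300,
}

With the pipeline activated project-wide, each spider only has to yield items; the POST logic no longer needs to be repeated in every parse callback.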

9ceoxa92 · #2

I found another solution (on GitHub) that might interest someone...
pipeline.py

import json
import logging

import requests
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.defer import DeferredLock
from twisted.internet.threads import deferToThread

default_serialize = ScrapyJSONEncoder().encode

class HttpPostPipeline(object):
    settings = None
    items_buffer = []

    DEFAULT_HTTP_POST_PIPELINE_BUFFERED = False
    DEFAULT_HTTP_POST_PIPELINE_BUFFER_SIZE = 100

    def __init__(self, url, headers=None, serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        url : str
            Endpoint the items are POSTed to.
        headers : dict, optional
            Extra HTTP headers sent with every request.
        serialize_func : callable
            Items serializer function.
        """
        self.url = url
        self.headers = headers if headers else {}
        self.serialize_func = serialize_func
        self._lock = DeferredLock()

    @classmethod
    def from_crawler(cls, crawler):
        params = {
            'url': crawler.settings.get('HTTP_POST_PIPELINE_URL'),
        }
        if crawler.settings.get('HTTP_POST_PIPELINE_HEADERS'):
            params['headers'] = crawler.settings['HTTP_POST_PIPELINE_HEADERS']

        ext = cls(**params)
        ext.settings = crawler.settings

        return ext

    def process_item(self, item, spider):
        if self.settings.get('HTTP_POST_PIPELINE_BUFFERED', self.DEFAULT_HTTP_POST_PIPELINE_BUFFERED):
            self._lock.run(self._process_items, item)
            return item
        else:
            return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        data = self.serialize_func(item)
        requests.post(self.url, json=json.loads(data), headers=self.headers)
        return item

    def _process_items(self, item):
        self.items_buffer.append(item)
        if len(self.items_buffer) >= int(self.settings.get('HTTP_POST_PIPELINE_BUFFER_SIZE',
                                                           self.DEFAULT_HTTP_POST_PIPELINE_BUFFER_SIZE)):
            deferToThread(self.send_items, self.items_buffer)
            self.items_buffer = []

    def send_items(self, items):
        logging.debug("Sending batch of {} items".format(len(items)))

        serialized_items = [self.serialize_func(item) for item in items]
        requests.post(self.url, json=[json.loads(data) for data in serialized_items], headers=self.headers)

    def close_spider(self, spider):
        if len(self.items_buffer) > 0:
            deferToThread(self.send_items, self.items_buffer)

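This pipeline reads its configuration from the crawler settings via from_crawler, so it is wired up entirely in settings.py. A sketch along these lines should work; the module path myproject.pipelines and the priority value 300 are assumptions:

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.HttpPostPipeline': 300,
}

# Endpoint the items are POSTed to (read in from_crawler)
HTTP_POST_PIPELINE_URL = 'http://some.site.com/api/'

# Optional; requests already sets this header when json= is used
HTTP_POST_PIPELINE_HEADERS = {'Content-Type': 'application/json'}

# Optional: buffer items and POST them in batches instead of one request per item
HTTP_POST_PIPELINE_BUFFERED = True
HTTP_POST_PIPELINE_BUFFER_SIZE = 100

Unlike the first answer, the buffered mode collects items into batches of HTTP_POST_PIPELINE_BUFFER_SIZE and sends each batch in a single POST, which reduces the number of requests to the API. Note that if your API still expects the {"source_id", "token", "products": [...]} envelope from the question, the body built in _process_item / send_items would need to be wrapped accordingly.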