Scrapy最佳实践

oknrviil  于 2022-11-09  发布在  其他
关注(0)|答案(1)|浏览(204)

我正在使用Scrapy下载大量的数据。我使用默认的16个并发请求。如指南所示,我使用管道方法process_item在共享变量处收集数据。并在close_spider处将数据保存到SQL。如果我加载了太大的网站,我会丢失所有的系统内存。我应该如何避免这个问题?
现在我使用一个DB连接,它在open_spider方法中准备,我不能同时在每个process_item中使用它。

gwbalxhn

gwbalxhn1#

创建一个管道中的废弃项列表,一旦列表的大小大于N,就调用DB函数保存数据。下面是我的项目中100%工作的代码。请参见close_spider(),在spider关闭时,self.items中有可能包含少于N个项,因此当spider关闭时,self.items列表中的任何剩余数据也将保存在DB中。

from scrapy import signals

class YourPipeline(object):
    def __init__(self):
        self.items = []

    def process_item(self, item, spider):
        self.items.extend([ item ])
        if len(self.items) >= 50:
            self.insert_current_items(spider)
        return item

    def insert_current_items(self, spider):
        for item in self.items:
            update_query = ', '.join(["`" + key + "` = %s " for key, value in item.iteritems()])
            query = "SELECT asin FROM " + spider.tbl_name + " WHERE asin = %s LIMIT 1"
            spider.cursor.execute(query, (item['asin']))
            existing = spider.cursor.fetchone()
            if spider.cursor.rowcount > 0:
                query = "UPDATE " + spider.tbl_name + " SET " + update_query + ", date_update = CURRENT_TIMESTAMP WHERE asin = %s"
                update_query_vals = list(item.values())
                update_query_vals.extend([existing['YOUR_UNIQUE_COLUMN']])
                try:
                    spider.cursor.execute(query, update_query_vals)
                except Exception as e:
                    if 'MySQL server has gone away' in str(e):
                        spider.connectDB()
                        spider.cursor.execute(query, update_query_vals)
                    else:
                        raise e
            else:
                # This ELSE is likely never to get executed because we are not scraping ASINS from Amazon website, we just import ASINs into DB from another script
                try:
                    placeholders = ', '.join(['%s'] * len(item))
                    columns = ', '.join(item.keys())
                    query = "INSERT INTO %s ( %s ) VALUES ( %s )" % (spider.tbl_name, columns, placeholders)
                    spider.cursor.execute(query, item)
                except Exception as e:
                    if 'MySQL server has gone away' in str(e):
                        spider.connectDB()
                        spider.cursor.execute(query, item)
                    else:
                        raise e
        self.items = []

    def close_spider(self, spider):
        self.insert_current_items(spider)

相关问题