使用Scrapy缓冲项并批量插入到MySQL

4xy9mtcn  于 2022-11-09  发布在  Mysql
关注(0)|答案(1)|浏览(188)

我正在抓取大量的项(item),管道把它们一个一个地插入到数据库中。这需要很长的时间。
也就是说,每当管道接收到一个项时,它就立即把该项插入数据库。这不是一个聪明的方法。我正在寻找一种缓冲管道项的方法,例如,当接收到1000个项时,再执行一次批量插入。我如何才能实现这一点?
我目前的管道:

def __init__(self,**kwargs):
    """Create the pipeline and open its MySQL connection.

    The keyword arguments are accepted but not read by this method. The
    value returned by ``mysql_connect()`` is stored on ``self.cnx``; note
    that ``mysql_connect()`` returns ``None`` when connecting fails.
    """
    self.cnx = self.mysql_connect()

def open_spider(self, spider):
    """Print a notice that the spider has been opened.

    ``spider`` is not used. Presumably this is the Scrapy item-pipeline
    ``open_spider`` hook -- confirm against the enclosing class.
    """
    print("spider open")

def process_item(self, item, spider):
    """Write one item to the database right away and hand it back.

    The item is converted to a plain ``dict`` and passed to ``save()``; the
    original ``item`` object is returned unchanged. ``spider`` is not used.
    """
    print("Saving item into db ...")
    record = dict(item)
    self.save(record)
    return item

def close_spider(self, spider):
    """Close the database connection by delegating to ``mysql_close()``.

    ``spider`` is not used.
    """
    self.mysql_close()

def mysql_connect(self):
    """Open a MySQL connection using the settings in ``self.conf``.

    Returns:
        The connection object, or ``None`` when connecting fails. On failure
        a message describing the problem is printed instead of raising.
    """
    try:
        connection = mysql.connector.connect(**self.conf)
    except mysql.connector.Error as err:
        code = err.errno
        if code == errorcode.ER_ACCESS_DENIED_ERROR:
            message = "Something is wrong with your user name or password"
        elif code == errorcode.ER_BAD_DB_ERROR:
            message = "Database does not exist"
        else:
            # Unrecognised error: print the exception itself.
            message = err
        print(message)
        return None
    return connection

def save(self, row):
    """Insert one scraped row and, if its product is new, register the product.

    The row is always inserted into ``self.table``. If its ``product_id`` was
    not among the ids found in the ``products`` table before that insert, the
    row is also inserted into ``self.table2``. Every insert is committed
    immediately.

    Args:
        row: Mapping providing the keys ``rowid``, ``date``, ``listing_id``,
            ``product_id``, ``product_name``, ``price`` and ``url``.
    """
    cursor = self.cnx.cursor()

    # Snapshot the known product ids before inserting; a set keeps the
    # membership test below cheap.
    cursor.execute("SELECT DISTINCT product_id FROM products;")
    existing_ids = {record[0] for record in cursor.fetchall()}

    create_query = ("INSERT INTO " + self.table +
        "(rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")

    # Insert new row
    cursor.execute(create_query, row)
    last_record_id = cursor.lastrowid

    # Make sure data is committed to the database
    self.cnx.commit()
    cursor.close()
    print("Item saved with ID: {}" . format(last_record_id))

    if row['product_id'] in existing_ids:
        return

    create_query = ("INSERT INTO " + self.table2 +
        "(product_rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
    new_cursor = self.cnx.cursor()
    new_cursor.execute(create_query, row)
    # Read the id from the cursor that ran this insert; the first cursor is
    # already closed and only knows about the previous statement.
    last_record_id = new_cursor.lastrowid

    self.cnx.commit()
    new_cursor.close()
    print("New Item saved with ID: {}" . format(last_record_id))

def mysql_close(self):
    """Close the MySQL connection if one was opened.

    ``mysql_connect()`` returns ``None`` when connecting fails, so
    ``self.cnx`` may be ``None``; in that case there is nothing to close.
    """
    if self.cnx is not None:
        self.cnx.close()
tvokkenx

tvokkenx1#

您可以在管道构造函数中添加一个缓存,并增加一个方法,在处理每个项时先把它放入缓存,当缓存的行数达到某个阈值时再执行批量保存。然后,当爬虫(spider)退出时,再把缓存中尚未保存的内容一次性保存。
我在下面创建了一个例子来演示这个策略,但是我没有对SQL代码做任何修改,所以代码仍然是将项一个接一个地保存到数据库中,只不过这些保存操作被集中到同一时刻执行。我确信,通过对SQL代码做一些修改,性能还有进一步提高的空间。

class Pipeline:
    """Item pipeline that buffers items and writes them to MySQL in batches.

    Items are collected in memory and flushed once ``_cache_limit`` rows have
    accumulated, and once more when the spider closes. A flush still saves
    the rows one at a time through ``save()``.

    NOTE(review): ``self.conf``, ``self.table`` and ``self.table2`` are read
    below but never assigned in this class; they are assumed to be provided
    elsewhere (for example as class attributes) -- confirm.
    """

    def __init__(self, **kwargs):
        """Set up an empty buffer and open the database connection.

        The keyword arguments are accepted but not used.
        """
        self._rows = []   #  store rows temporarily
        self._cached_rows = 0    # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        """Print a notice that the spider has been opened."""
        print("spider open")

    def save_all(self):
        """Save every buffered row, then drop the saved rows from the buffer.

        If ``save()`` raises part-way through, the exception propagates and
        only the rows that were not written yet stay in the buffer.
        """
        saved = 0
        try:
            for row in self._rows:
                self.save(row)
                saved += 1
        finally:
            # Drop only what was written, so a failure part-way through does
            # not cause the already-saved rows to be inserted again later.
            del self._rows[:saved]
            self._cached_rows = len(self._rows)

    def cache_result(self, item):
        """Add ``item`` to the buffer and flush once the limit is reached."""
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:
            self.save_all()

    def process_item(self, item, spider):
        """Buffer ``item`` for a later batch save and return it unchanged."""
        print("Saving item into db ...")
        self.cache_result(item)    # cache this item
        return item

    def close_spider(self, spider):
        """Flush the remaining rows and close the database connection."""
        try:
            self.save_all()      # Saves remaining rows once spider closes
        finally:
            # Release the connection even when the final flush fails;
            # mysql_connect() returns None if connecting failed.
            if self.cnx is not None:
                self.cnx.close()

    def mysql_connect(self):
        """Open a MySQL connection using the settings in ``self.conf``.

        Returns:
            The connection object, or ``None`` when connecting fails. On
            failure a message is printed instead of raising.
        """
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        """Insert one row and, if its product is new, register the product.

        The row is always inserted into ``self.table``. If its
        ``product_id`` was not among the ids found in the ``products`` table
        before that insert, the row is also inserted into ``self.table2``.
        Every insert is committed immediately.

        Args:
            row: Mapping providing the keys ``rowid``, ``date``,
                ``listing_id``, ``product_id``, ``product_name``, ``price``
                and ``url``.
        """
        cursor = self.cnx.cursor()
        # Snapshot the known product ids before inserting; a set keeps the
        # membership test below cheap.
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = {record[0] for record in cursor.fetchall()}
        create_query = ("INSERT INTO " + self.table +
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")

        # Insert new row
        cursor.execute(create_query, row)
        last_record_id = cursor.lastrowid

        # Make sure data is committed to the database
        self.cnx.commit()
        cursor.close()
        print("Item saved with ID: {}" . format(last_record_id))

        if row['product_id'] in existing_ids:
            return

        create_query = ("INSERT INTO " + self.table2 +
            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.execute(create_query, row)
        # Read the id from the cursor that ran this insert; the first cursor
        # is already closed and only knows about the previous statement.
        last_record_id = new_cursor.lastrowid
        self.cnx.commit()
        new_cursor.close()
        print("New Item saved with ID: {}" . format(last_record_id))

相关问题