我正在把大量的项目和管道一个一个的插入到数据库中。这需要很长的时间。
因此,每当管道接收到一个项时,它就插入到数据库中。这不是一个聪明的方法。我正在寻找一种缓冲管道项的方法,例如,当我们接收到1000个项时,执行批量插入。我如何才能实现这一点?
我目前的管道:
def __init__(self,**kwargs):
self.cnx = self.mysql_connect()
def open_spider(self, spider):
print("spider open")
def process_item(self, item, spider):
print("Saving item into db ...")
self.save(dict(item))
return item
def close_spider(self, spider):
self.mysql_close()
def mysql_connect(self):
try:
return mysql.connector.connect(**self.conf)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
def save(self, row):
cursor = self.cnx.cursor()
cursor.execute("SELECT DISTINCT product_id FROM products;")
existing_ids = [row[0] for row in cursor.fetchall()]
create_query = ("INSERT INTO " + self.table +
"(rowid, date, listing_id, product_id, product_name, price, url) "
"VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
# Insert new row
cursor.execute(create_query, row)
lastRecordId = cursor.lastrowid
# Make sure data is committed to the database
self.cnx.commit()
cursor.close()
print("Item saved with ID: {}" . format(lastRecordId))
product_id = row['product_id']
if not product_id in existing_ids:
create_query = ("INSERT INTO " + self.table2 +
"(product_rowid, date, listing_id, product_id, product_name, price, url) "
"VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
new_cursor = self.cnx.cursor()
new_cursor.execute(create_query, row)
lastRecordId = cursor.lastrowid
self.cnx.commit()
new_cursor.close()
print("New Item saved with ID: {}" . format(lastRecordId))
def mysql_close(self):
self.cnx.close()
1条答案
按热度按时间tvokkenx1#
您可以在管道构造函数中添加一个缓存,并执行一个方法,在处理项目时对其进行缓存,然后在缓存的行数达到某个阈值时运行批量保存。然后,当蜘蛛退出时,它可以批量保存该高速缓存中尚未保存的任何内容。
我在下面创建了一个例子来演示这个策略,但是我没有对
sql
代码做任何修改,所以代码仍然将项目一个接一个地保存到数据库中,只是同时保存所有项目。我确信,通过对sql
代码做一些修改,性能还有进一步提高的空间。