SQL Server SQLAlchemy MSSQL bulk upsert

k5ifujac asked on 2023-03-17

I've been trying various methods to bulk upsert an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The target table is fairly large (2M records) and I need to bulk upsert 100,000 records, most of which won't already be there.

NOTE: This will run as an Azure Function, so if there is a better way I'm open to it.

from datetime import datetime
from sqlalchemy import String, DateTime
from sqlalchemy.orm import DeclarativeBase, mapped_column

class Base(DeclarativeBase):
    pass

class issues(Base):
    __tablename__ = "issues"
    id = mapped_column('id', String(36), primary_key=True)
    created = mapped_column('created', DateTime())
    updated = mapped_column('updated', DateTime())
    status = mapped_column('status', String(50))
    severity = mapped_column('severity', String(10))
    control_id = mapped_column('control_id', String(36))
    entity_id = mapped_column('entity_id', String(36))

Example data

issueList = [
    issues(id="1234", created=datetime.now(), updated=datetime.now(),
           status="Test", severity="Low8", control_id="con123", entity_id="ent123"),
    issues(id="5678", created=datetime.now(), updated=datetime.now(),
           status="Test", severity="Low9", control_id="con123", entity_id="ent123"),
]

Currently I'm doing session.merge(issue), but it's slow and doesn't support bulk inserts. I've looked at https://stackoverflow.com/a/69968892/1697288 but have been getting errors because I was passing:

issueList = {
    "1234": {"id": "1234", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low16", "control_id": "con123", "entity_id": "ent123"},
    "5678": {"id": "5678", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
}
upsert_data(session, issueList, "issues", "id")

It seems to be expecting a model rather than a string for the 3rd parameter, so I wasn't sure what to send.
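My guess is that the third argument should be the mapped class itself rather than the string "issues", i.e. something like the call below (still untested on my side):

# Guess: pass the mapped class, not the table name string
upsert_data(session, issueList, issues, "id")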

Any suggestions for a fast approach would be great. Only this application will be inserting data, so locking the db isn't an issue as long as the lock is cleared on error.

Thanks.

nr7wwzry (answer 1)

I ended up writing my own function in the end:

import json
import logging

from sqlalchemy import insert, update

log = logging.getLogger(__name__)

Make sure the model is defined, as it will need to be passed in. entries is a list of dictionaries (make sure the dictionary keys match your database column names).
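For example, using the issues model and the sample values from the question, entries would look something like this:

entries = [
    {"id": "1234", "created": datetime.now(), "updated": datetime.now(),
     "status": "Test", "severity": "Low8", "control_id": "con123", "entity_id": "ent123"},
    {"id": "5678", "created": datetime.now(), "updated": datetime.now(),
     "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
]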

The function, with logging and an optional JSON dump of each batch (remove as needed):

def upsert_data(session, entries, model, key, jsonDump=False):
    # Guard against an empty list (range() below would fail with a zero step)
    if not entries:
        log.info("No entries supplied, nothing to upsert")
        return

    batch_size = 1000
    if batch_size > len(entries):
        batch_size = len(entries)

    # Optional debug dump of everything that was passed in
    if jsonDump:
        with open("json/" + model.__tablename__ + "_entries_preprocess.json", "w") as f:
            json.dump(entries, default=str, fp=f)

    # The mapped column used to decide insert vs update (e.g. issues.id)
    modelKey = getattr(model, key)

    for i in range(0, len(entries), batch_size):

        log.info("Working Batch " + str(i) + "-" + str(i + batch_size))
        # Get the next batch
        batch = entries[i:i + batch_size]

        entries_to_update = []
        entries_to_insert = batch

        # Extract keys from batch
        keysinbatch = [entry.get(key) for entry in batch]

        # One query per batch: which of these keys already exist in the table?
        existing_records = session.query(modelKey).filter(modelKey.in_(keysinbatch)).all()

        # Iterate results
        for entry in existing_records:
            # Any entry whose key is already in the database moves from the
            # insert list to the update list
            dbIndex = getattr(entry, key)
            index = 0
            for x in entries_to_insert:
                if dbIndex == x[key]:
                    # Matches item in DB, move this item to the update list
                    # and remove it from the insert list
                    entries_to_update.append(entries_to_insert.pop(index))
                    break
                index = index + 1

        # Completed lists: get sqlalchemy to handle the operations
        if jsonDump:
            with open("json/" + model.__tablename__ + "_entries_insert.json", "w") as f:
                json.dump(entries_to_insert, default=str, fp=f)

            with open("json/" + model.__tablename__ + "_entries_update.json", "w") as f:
                json.dump(entries_to_update, default=str, fp=f)

        # If any items are left in the insert list, bulk INSERT them
        if len(entries_to_insert) > 0:
            log.info("Handing over to sqlalchemy to INSERT " + str(len(entries_to_insert)) + " records")
            session.execute(insert(model), entries_to_insert)

        # Update items that already exist (bulk UPDATE by primary key)
        if len(entries_to_update) > 0:
            log.info("Handing over to sqlalchemy to UPDATE " + str(len(entries_to_update)) + " records")
            session.execute(update(model), entries_to_update)

        # Commit this batch
        log.info("Issuing Database Commit")
        session.commit()

    log.info("UpSert Complete")
