SQL Server SQLAlchemy MSSQL bulk upSert

k5ifujac  posted on 2023-03-17  in Other

I've been trying various methods to bulk upsert into an Azure SQL (MSSQL) database using SQLAlchemy 2.0. The source table is fairly large (2M records) and I need to bulk upsert 100,000 records (most of which won't be there).

NOTE: This will run as an Azure Function, so if there is a better way I'm open to it.

    from sqlalchemy import DateTime, String
    from sqlalchemy.orm import DeclarativeBase, mapped_column

    # Base was not shown in the question; a plain declarative base is assumed
    class Base(DeclarativeBase):
        pass

    class issues(Base):
        __tablename__ = "issues"
        id = mapped_column('id', String(36), primary_key=True)
        created = mapped_column('created', DateTime())
        updated = mapped_column('updated', DateTime())
        status = mapped_column('status', String(50))
        severity = mapped_column('severity', String(10))
        control_id = mapped_column('control_id', String(36))
        entity_id = mapped_column('entity_id', String(36))

Example data

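    from datetime import datetime

    # Declarative models take keyword arguments, not positional ones
    issueList = [
        issues(id="1234", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low8", control_id="con123", entity_id="ent123"),
        issues(id="5678", created=datetime.now(), updated=datetime.now(), status="Test", severity="Low9", control_id="con123", entity_id="ent123"),
    ]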

Currently I'm doing session.merge(issue), but it's slow and doesn't support bulk inserts. I've looked at https://stackoverflow.com/a/69968892/1697288 but have been getting errors, as I was passing:

    issueList = {
        "1234": {"id": "1234", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low16", "control_id": "con123", "entity_id": "ent123"},
        "5678": {"id": "5678", "created": datetime.now(), "updated": datetime.now(), "status": "Test", "severity": "Low9", "control_id": "con123", "entity_id": "ent123"},
    }
    # The third argument here is the table name as a string
    upsert_data(session, issueList, "issues", "id")

It seems to expect a model rather than text for the third parameter, so I wasn't sure what to send.

Any suggestions for a fast method would be great. Only this application will be inserting data, so locking the DB isn't an issue as long as the lock is cleared on error.
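For reference, the data above is a dict keyed by id, whereas bulk helpers of this kind generally take a flat list of row dictionaries together with the mapped class itself. A minimal sketch of that conversion, assuming each inner dict holds one row; the helper name to_entries is hypothetical and not part of SQLAlchemy:

```python
from datetime import datetime


def to_entries(issue_map):
    """Flatten {id: row_dict} into a list of row dicts.

    Hypothetical helper for illustration. Each row gets its "id"
    filled in from the outer key if the inner dict lacks one.
    """
    entries = []
    for issue_id, row in issue_map.items():
        entry = dict(row)                 # copy so the input is not mutated
        entry.setdefault("id", issue_id)  # keep an existing "id" untouched
        entries.append(entry)
    return entries


now = datetime.now()
issue_map = {
    "1234": {"created": now, "updated": now, "status": "Test"},
    "5678": {"id": "5678", "created": now, "updated": now, "status": "Test"},
}
entries = to_entries(issue_map)

# The call would then pass the mapped class rather than its table name, e.g.:
# upsert_data(session, entries, issues, "id")
```

The commented-out call is only an illustration of the argument shapes; whether a given helper accepts them depends on its implementation.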

Thanks.

nr7wwzry #1

I ended up writing my own function:

    import json
    import logging

    from sqlalchemy import insert, update

    log = logging.getLogger(__name__)

Make sure the model is defined, as it needs to be passed in. The entries are a list of dictionaries (make sure the dictionary keys match your database field names).

The function, with logging and an optional JSON dump (remove as needed):
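The key-matching requirement can be checked before any SQL is issued. The following is a small sketch, assuming the column names are known up front; find_unknown_keys and ISSUE_COLUMNS are hypothetical names used only for illustration:

```python
def find_unknown_keys(entries, column_names):
    """Return the set of dictionary keys that are not known column names."""
    allowed = set(column_names)
    unknown = set()
    for entry in entries:
        unknown.update(set(entry) - allowed)
    return unknown


# Hard-coded so the sketch runs without a database. With a mapped class,
# the names could instead be read from model.__table__.columns.keys().
ISSUE_COLUMNS = ["id", "created", "updated", "status",
                 "severity", "control_id", "entity_id"]

rows = [
    {"id": "1234", "status": "Test", "severity": "Low"},
    {"id": "5678", "status": "Test", "sevrity": "Low"},  # deliberate typo
]
unknown = find_unknown_keys(rows, ISSUE_COLUMNS)
print(unknown)  # {'sevrity'}
```

Running a check like this first turns a misspelled key into an explicit message rather than a database error part-way through a batch.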

    def upsert_data(session, entries, model, key, jsonDump=False):
        # jsonDump: when True, write debug JSON files under json/ (the directory must exist)
        # Nothing to do for an empty list (this also avoids a zero step in range())
        if not entries:
            return
        batch_size = min(1000, len(entries))

        if jsonDump:
            with open("json/" + model.__tablename__ + "_entries_preprocess.json", "w") as f:
                json.dump(entries, f, default=str)

        modelKey = getattr(model, key)

        for i in range(0, len(entries), batch_size):
            log.info("Working Batch " + str(i) + "-" + str(i + batch_size))

            # Get the next batch (the slice is a new list, so popping from it is safe)
            batch = entries[i:i + batch_size]
            entries_to_update = []
            entries_to_insert = batch

            # Extract keys from batch and fetch the ones already in the DB
            keysinbatch = [entry.get(key) for entry in batch]
            existing_records = session.query(modelKey).filter(modelKey.in_(keysinbatch)).all()

            # Iterate results
            for entry in existing_records:
                dbIndex = getattr(entry, key)
                for index, x in enumerate(entries_to_insert):
                    if dbIndex == x[key]:
                        # Matches an item in the DB: move it from the insert list to the update list
                        entries_to_update.append(entries_to_insert.pop(index))
                        break

            # Lists are complete; let SQLAlchemy handle the operations
            if jsonDump:
                with open("json/" + model.__tablename__ + "_entries_insert.json", "w") as f:
                    json.dump(entries_to_insert, f, default=str)
                with open("json/" + model.__tablename__ + "_entries_update.json", "w") as f:
                    json.dump(entries_to_update, f, default=str)

            # Insert any items that were not found in the DB
            if len(entries_to_insert) > 0:
                log.info("Handing over to sqlalchemy to INSERT " + str(len(entries_to_insert)) + " records")
                session.execute(insert(model), entries_to_insert)

            # Update items that already exist (bulk update by primary key)
            if len(entries_to_update) > 0:
                log.info("Handing over to sqlalchemy to UPDATE " + str(len(entries_to_update)) + " records")
                session.execute(update(model), entries_to_update)

            # Commit DB
            log.info("Issuing Database Commit")
            session.commit()

        log.info("UpSert Complete")
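The matching step above walks the pending rows once for every key found in the database. A possible variation, sketched here under the assumption that key values are hashable, does the same split with a set lookup in a single pass. The name split_batch is hypothetical and this is not the function from the answer:

```python
def split_batch(batch, existing_keys, key):
    """Partition rows into (to_insert, to_update) by key membership.

    batch:         list of row dictionaries
    existing_keys: iterable of key values already present in the database
    key:           name of the dictionary key to match on
    """
    existing = set(existing_keys)
    to_insert, to_update = [], []
    for entry in batch:
        if entry[key] in existing:
            to_update.append(entry)
        else:
            to_insert.append(entry)
    return to_insert, to_update


batch = [
    {"id": "1234", "status": "Test"},
    {"id": "5678", "status": "Test"},
    {"id": "9999", "status": "New"},
]
to_insert, to_update = split_batch(batch, ["5678"], "id")
print([row["id"] for row in to_insert])  # ['1234', '9999']
print([row["id"] for row in to_update])  # ['5678']
```

One behavioural difference to be aware of: if a batch contains the same key twice, this version sends both rows to the update list, while the nested loop moves only the first match.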