I have the two tables below already being ingested into my SQL Server from Salesforce. I have no control over the storage in Salesforce.
1. LiveTable contains the latest data. All changes are updated in the row itself.
------------------------------------------------
ID   Date         User   Task            Status
------------------------------------------------
1    15/09/2023   Mark   Visualization   Stage 4
2    06/08/2023   Jane   Analytics       Stage 3
3    10/09/2023   John   Data Entry      Stage 3
2. ChangeTable which contains all the changes done on LiveTable over time (for each field).
---------------------------------------------------
ID   Date         Field    OldValue     NewValue
---------------------------------------------------
1    02/09/2023   User     Antony       Mary
1    02/09/2023   Status   Stage 1      Stage 2
1    05/09/2023   Status   Stage 2      Stage 3
1    10/09/2023   User     Mary         Mark
1    15/09/2023   Task     Data Entry   Visualization
1    15/09/2023   Status   Stage 3      Stage 4
2    02/08/2023   Status   Stage 1      Stage 2
2    06/08/2023   Status   Stage 2      Stage 3
2    06/08/2023   User     Jacob        Jane
Now, I want to create a history table, with the same shape as LiveTable, that tracks all changes made to LiveTable grouped by date.
Expected History table:
ID   Date         User    Task            Status
------------------------------------------------
1    02/09/2023   Mary    Data Entry      Stage 2
1    05/09/2023   Mary    Data Entry      Stage 3
1    10/09/2023   Mark    Data Entry      Stage 3
1    15/09/2023   Mark    Visualization   Stage 4
2    02/08/2023   Jacob   Analytics       Stage 2
2    06/08/2023   Jane    Analytics       Stage 3
As you can see, I need a record for each change, while the other columns keep the latest data available at that point in time. Multiple field changes can happen on the same day; those are consolidated into a single row per date.
I could write a SQL query with CASE expressions to produce the table below, which shows only the changed values on a daily basis. But I don't want to go that route, as the table has 500+ columns.
ID   Date         User   Task            Status
-----------------------------------------------
1    02/09/2023   Mary                   Stage 2
1    05/09/2023                          Stage 3
1    10/09/2023   Mark
1    15/09/2023          Visualization   Stage 4
2    02/08/2023                          Stage 2
2    06/08/2023   Jane                   Stage 3
What would be the best way to achieve this? I am using Azure Data Factory and Databricks in the ETL process, and I can move this logic into the ETL phase if that is recommended.
1 Answer
You can use PySpark to build the history table.
First, load the LiveTable and ChangeTable tables from SQL Server into Databricks.
I assume you have loaded the data into two dataframes, liveTableDF and changeTableDF.
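A minimal load sketch via Spark's JDBC reader. The server, database, credentials, and schema-qualified table names are placeholders you would replace with your own:

```python
# Placeholders: replace <server>, <database>, <user>, <password> with your values.
jdbc_url = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
props = {
    "user": "<user>",
    "password": "<password>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# Read both source tables into dataframes (table names assumed to live in dbo)
liveTableDF   = spark.read.jdbc(url=jdbc_url, table="dbo.LiveTable",   properties=props)
changeTableDF = spark.read.jdbc(url=jdbc_url, table="dbo.ChangeTable", properties=props)
```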
Next, get the changes grouped by ID and Date.
The logic builds a list of dictionaries, one per ID and date, which is then converted to a dataframe and written to the SQL database. For any column that did not change on a given date, it looks up the most recent earlier change of that column and takes its NewValue. Finally, create a dataframe from this list.
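A sketch of that dictionary-building pass. A plain list stands in for `changeTableDF.orderBy("ID", "Date").collect()` (using the ID 1 rows from the sample data), and the `fields` list stands in for your 500+ columns; in practice you would derive it from the distinct `Field` values:

```python
from collections import defaultdict

# Stand-in for the collected change rows, sorted by ID and date.
rows = [
    {"ID": 1, "Date": "02/09/2023", "Field": "User",   "NewValue": "Mary"},
    {"ID": 1, "Date": "02/09/2023", "Field": "Status", "NewValue": "Stage 2"},
    {"ID": 1, "Date": "05/09/2023", "Field": "Status", "NewValue": "Stage 3"},
    {"ID": 1, "Date": "10/09/2023", "Field": "User",   "NewValue": "Mark"},
    {"ID": 1, "Date": "15/09/2023", "Field": "Task",   "NewValue": "Visualization"},
    {"ID": 1, "Date": "15/09/2023", "Field": "Status", "NewValue": "Stage 4"},
]

fields = ["User", "Task", "Status"]   # in practice: distinct Field values
latest = defaultdict(dict)            # latest known value of each field, per ID
history = []                          # one dictionary per (ID, Date) snapshot

def snapshot(key):
    """Record the current state of this ID's fields for the given date."""
    snap = {"ID": key[0], "Date": key[1]}
    snap.update({f: latest[key[0]].get(f) for f in fields})
    history.append(snap)

prev_key = None
for r in rows:
    key = (r["ID"], r["Date"])
    if prev_key is not None and key != prev_key:
        snapshot(prev_key)            # close out the previous (ID, Date) group
    latest[r["ID"]][r["Field"]] = r["NewValue"]
    prev_key = key
if prev_key is not None:
    snapshot(prev_key)                # flush the final group

# In Databricks you would then do:
# historyDF = spark.createDataFrame(history)
```

Columns with no earlier change come out as None (as with Task above before 15/09), which is exactly the partial-value behaviour described next; to fill those you could additionally fall back to the earliest OldValue of each field, or to the LiveTable value for fields that never changed.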
If you look at the output, some old values are only partially filled in. This is because there is no earlier change record available for that particular column; I am only working with the sample data, and I assume your real ChangeTable has many more records.
To check this, I added a few extra change records to the sample data and rebuilt the table.
After adding them, I got complete output for the original sample records but not for the newly added ones.
In short, a column is filled in for a given date only when ChangeTable contains an earlier change record for that column.
Finally, save the result as a table in your SQL database. Since the main logic filters the dataframe many times, I recommend partitioning on the ID and Date columns.
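A sketch of that last step, assuming the `jdbc_url` and `props` connection settings from the load step and a target table name of `dbo.HistoryTable` (both placeholders for your own values):

```python
# Repartition on the keys the main logic repeatedly filters on,
# so each partition holds complete (ID, Date) groups.
changeTableDF = changeTableDF.repartition("ID", "Date")

# Write the assembled history dataframe back to SQL Server.
(historyDF.write
    .mode("overwrite")
    .jdbc(url=jdbc_url, table="dbo.HistoryTable", properties=props))
```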