Building ETL logic from PostgreSQL in Spark Scala

nhaq1z21 · posted 2021-07-13 · in Spark

I am new to the Spark Scala world and am trying to replicate an existing ETL logic there. Essentially, I want to write dynamic code that pulls data from every table in which a specific column exists, filters on that column, and then stores the result in Azure Blob storage.

    // connection settings
    val url = "<Host Address>"
    val user = "<Username>"
    val pw = "<Password>"
    val driver = "org.postgresql.Driver"
    val sslfactory = "org.postgresql.ssl.NonValidatingFactory"

    // look up all tables in the schema, joined against the tables that contain Column_A
    var sql_lookup = " select * from information_schema.tables as inf_schema left join (SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = 'Schema_name' AND columns.column_name = 'Column_A') as country on inf_schema.table_schema = country.country_table_schema and inf_schema.table_name = country.country_table_name WHERE inf_schema.table_schema='<Schemaname>'"

    var dfTbl = (spark.read
      .format("jdbc")
      .option("url", url)
      .option("ssl", "true")
      .option("sslfactory", sslfactory)
      .option("user", user)
      .option("password", pw)
      .option("driver", driver)
      .option("query", sql_lookup)
      .load())

    // keep only the tables that actually have the column, then take their names
    var dfTbl_withCountry = (dfTbl.select(dfTbl.col("*")).filter(dfTbl.col("country_table_column_name").isNotNull)).select("table_name")
    val dfTbl_wc = dfTbl_withCountry.collect().foreach(row => row.toSeq.foreach(col => (col)))

    // for each table: read the filtered rows and write them to blob storage as a single CSV
    for (table <- dfTbl_wc) {
      var sql = " select * from <Schemaname>." + s"${table}" + " where <Column_name> = '<Value>'"
      var df = (spark.read
        .format("jdbc")
        .option("url", url)
        .option("ssl", "true")
        .option("sslfactory", sslfactory)
        .option("user", user)
        .option("password", pw)
        .option("driver", driver)
        .option("query", sql)
        .load())
      var File_withCountry = df
        .coalesce(1)
        .write
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("delimiter", "~")
        .mode(SaveMode.Overwrite)
        .option("encoding", "UTF-8")
        .csv("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}")
      // rename the single part file to <table>.csv and remove the original part file
      val partition_path = dbutils.fs.ls("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}")
        .filter(file => file.name.startsWith("part"))(0).path
      dbutils.fs.cp(partition_path, "wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}" + ".csv")
      dbutils.fs.rm(partition_path, recurse = true)
    }

Below is the inner subquery used in the lookup query above:

    SELECT table_schema as country_table_schema, table_name as country_table_name, column_name as country_table_column_name
    FROM information_schema.columns
    WHERE table_schema = '<Schema_name>' AND columns.column_name = 'Column_A'


For every table name that the sql_lookup query returns in the "country_table_name" column, I want to extract that table's data. I store the lookup output in the dataframe dfTbl. Then, in dfTbl_wc, I iterate over each row of dfTbl, and I use a for loop to select the full data for each table held in dfTbl_wc.
But for some reason this code does not work properly in the for-loop part. Please help!
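
For context, this is roughly the iteration I am aiming for — a minimal sketch, assuming the lookup dataframe only has the single table_name column and that the names can be collected as plain strings (the placeholders are the same ones used above):

    // Hypothetical sketch of the intended loop, not the failing code itself:
    // collect the table names as plain Scala strings, then build one query per table.
    val tableNames: Array[String] =
      dfTbl_withCountry.collect().map(_.getString(0)) // "table_name" is the only selected column

    for (table <- tableNames) {
      val perTableSql = s"select * from <Schemaname>.$table where <Column_name> = '<Value>'"
      // read perTableSql over JDBC and write to blob storage, as in the block above
    }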


vxqlmq5t1#

You can create a new column in the dataframe that holds the query to run for each row. Then you can select that query column, convert it to an array, and loop over it to get the final dataframe, which you can then handle however you like, for example save it as a table, a Parquet file, a CSV file, and so on. If you want to save each table's data separately, you will have to write that code inside the for loop below.

    // source data
    val df = Seq(("Schemaname","Table1","Column_A"),("Schemaname","Table2","Column_A"),("Schemaname","Table3","Column_A"),("Schemaname","Table4","Column_A"),("Schemaname","Table5","Column_A"),("Schemaname","Table6","Column_A"))
      .toDF("country_table_schema","country_table_name","country_table_column_name")

    // add a column holding the query that gets generated for each row
    import org.apache.spark.sql.functions._
    val df1 = df.withColumn("fulltableName", concat_ws(".", $"country_table_schema", $"country_table_name"))
      .withColumn("Query", concat_ws("", concat(lit("("), lit(" Select * from ")), $"fulltableName", lit(" where column_name = "), concat($"country_table_column_name", lit(") a"))))
      .drop("fulltableName")

    import org.apache.spark.sql.DataFrame

    // convert it to an array. I am using collect here, but if you have a large volume
    // don't use collect, otherwise it would crash your driver.
    val queryArray = df1.select("Query").rdd.collect()
    val rowscount = queryArray.length

    // create an array of dataframes to hold the output of each query
    var QueryArrayDF: Array[DataFrame] = new Array[DataFrame](rowscount)

    // loop through the queries, create a dataframe for each and add it to the array of dataframes
    for (i <- 0 to rowscount - 1) {
      val df = (spark.read
        .format("jdbc")
        .option("url", url)
        .option("ssl", "true")
        .option("sslfactory", sslfactory)
        .option("user", user)
        .option("password", pw)
        .option("driver", driver)
        .option("query", queryArray(i).toString().replace("[", "").replace("]", ""))
        .load())
      QueryArrayDF(i) = df
    }

    // now let's combine the dataframes, if we have more than one
    var CombinedDF = QueryArrayDF(0)
    for (i <- 1 to QueryArrayDF.length - 1) {
      CombinedDF = CombinedDF.union(QueryArrayDF(i))
    }

Now you can save the combined dataframe however you need.
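
For example, the combined dataframe could be written out like this — a minimal sketch where the target path and CSV options are placeholders, not part of the original answer:

    import org.apache.spark.sql.SaveMode

    // hypothetical sink: saveAsTable, parquet, csv, etc. all follow the same write pattern
    CombinedDF
      .write
      .mode(SaveMode.Overwrite)
      .option("header", "true")
      .csv("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/combined")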


pgky5nke2#

I made a few tweaks to the code (basically combining the code I was using earlier with a few lines from the code shared by @nikunkakadiya), and it worked for me. Sharing the code for reference:

    // look up the tables in the schema that contain Column_A
    val sql = "select inf_schema.table_name, inf_schema.table_schema, country_table_column_name from information_schema.tables as inf_schema left join (SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = '<Schema>' AND columns.column_name = 'Column_A') as country on inf_schema.table_schema = country.country_table_schema and inf_schema.table_name = country.country_table_name WHERE inf_schema.table_schema='<Schema>'"

    var dfTbl = (spark.read
      .format("jdbc")
      .option("url", url)
      .option("ssl", "true")
      .option("sslfactory", sslfactory)
      .option("user", user)
      .option("password", pw)
      .option("driver", driver)
      .option("query", sql)
      .load())

    // keep only the tables that actually contain the column
    val df = dfTbl.select(dfTbl.col("table_name")).where(dfTbl.col("country_table_column_name").isNotNull)
    println(df)

    import org.apache.spark.sql.DataFrame

    // collect the table names and loop over them
    val df2 = (df.select("table_name").collect())
    val rows = df2.length
    for (i <- 0 to rows - 1) {
      println(df2(i).mkString(","))
      val sql2 = "select * from <Schema>." + df2(i).mkString(",") + " where Column_A = '<Column_Value>'"
      println(sql2)
      var df_f = (spark.read
        .format("jdbc")
        .option("url", url)
        .option("ssl", "true")
        .option("sslfactory", sslfactory)
        .option("user", user)
        .option("password", pw)
        .option("driver", driver)
        .option("query", sql2)
        .load())
      var File_withCountry = df_f
        .coalesce(1)
        .write
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("delimiter", "~")
        .mode(SaveMode.Overwrite)
        .option("encoding", "UTF-8")
        .csv("wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/" + df2(i).mkString(","))
      // rename the single part file to <table>.csv and remove the original part file
      val partition_path = dbutils.fs.ls("wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/" + df2(i).mkString(",")).filter(file => file.name.startsWith("part"))(0).path
      dbutils.fs.cp(partition_path, "wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/" + df2(i).mkString(",") + ".csv")
      dbutils.fs.rm(partition_path, recurse = true)
    }

Let me know if there are any problems.
Thanks for your support, really appreciate it. Cheers!

