Writing a Spark DataFrame to an Apache Hudi table

up9lanfz · posted 2021-07-14 · in Spark

I am new to Apache Hudi and am trying to write a DataFrame to a Hudi table from spark-shell. This is the first write: I have not created any table beforehand and I am writing in overwrite mode, so I expect it to create the Hudi table.

  spark-shell \
    --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

  // Initialize a Spark session for Hudi
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions._
  import org.apache.hudi.DataSourceWriteOptions
  import org.apache.hudi.config.HoodieWriteConfig
  import org.apache.hudi.hive.MultiPartKeysValueExtractor
  import org.apache.spark.sql.SparkSession

  val spark1 = SparkSession.builder()
    .appName("hudi-datalake")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  // Write to a Hudi dataset
  val inputDF = Seq(
    ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
    ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
    ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
    ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
    ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
    ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
  ).toDF("id", "creation_date", "last_update_time")

  val hudiOptions = Map[String, String](
    HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
  )

  // Upsert data: create a new DataFrame from the first row of inputDF with a different creation_date value
  val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))

  updateDF.write.format("org.apache.hudi")
    .options(hudiOptions)
    .mode(SaveMode.Overwrite)
    .saveAsTable("work.hudi_test")
While executing this write statement, I get the error message below:

  java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

Can anyone please guide me on how to write this statement?


sqxo8psd 1#

Here is a working example of your problem in PySpark:

  from pyspark.sql import SparkSession
  from pyspark.sql.functions import lit

  spark = (
      SparkSession.builder.appName("Hudi_Data_Processing_Framework")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config(
          "spark.jars.packages",
          "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2"
      )
      .getOrCreate()
  )

  input_df = spark.createDataFrame(
      [
          ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
          ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
          ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
          ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
          ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
          ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
      ],
      ("id", "creation_date", "last_update_time"),
  )

  hudi_options = {
      # ---------------DATA SOURCE WRITE CONFIGS---------------#
      "hoodie.table.name": "hudi_test",
      "hoodie.datasource.write.recordkey.field": "id",
      "hoodie.datasource.write.precombine.field": "last_update_time",
      "hoodie.datasource.write.partitionpath.field": "creation_date",
      "hoodie.datasource.write.hive_style_partitioning": "true",
      "hoodie.upsert.shuffle.parallelism": 1,
      "hoodie.insert.shuffle.parallelism": 1,
      "hoodie.consistency.check.enabled": True,
      "hoodie.index.type": "BLOOM",
      "hoodie.index.bloom.num_entries": 60000,
      "hoodie.index.bloom.fpp": 0.000000001,
      "hoodie.cleaner.commits.retained": 2,
  }

  # INSERT
  (
      input_df.write.format("org.apache.hudi")
      .options(**hudi_options)
      .mode("append")
      .save("/tmp/hudi_test")
  )

  # UPDATE
  update_df = input_df.limit(1).withColumn("creation_date", lit("2014-01-01"))
  (
      update_df.write.format("org.apache.hudi")
      .options(**hudi_options)
      .mode("append")
      .save("/tmp/hudi_test")
  )

  # REAL UPDATE
  update_df = input_df.limit(1).withColumn("last_update_time", lit("2016-01-01T13:51:39.340396Z"))
  (
      update_df.write.format("org.apache.hudi")
      .options(**hudi_options)
      .mode("append")
      .save("/tmp/hudi_test")
  )

  output_df = spark.read.format("org.apache.hudi").load("/tmp/hudi_test/*/*")
  output_df.show()

Output:

The Hudi table on the filesystem looks like this:

Note: because you are modifying the partition column (2015-01-01 -> 2014-01-01), the update operation actually creates a new partition and performs an insert. You can see this in the output.
I have also provided an update example that sets last_update_time to 2016-01-01T13:51:39.340396Z; it actually updates id 100 in partition 2015-01-01 from 2015-01-01T13:51:39.340396Z to 2016-01-01T13:51:39.340396Z.
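Since the question is written for spark-shell, here is a minimal Scala sketch of the same path-based approach. It is an untested adaptation of the PySpark example above: it reuses inputDF and spark1 from the question, the /tmp/hudi_test path and table name from the answer, and plain string option keys rather than the DataSourceWriteOptions constants.

  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.functions.lit

  // Minimal write configs, using string keys equivalent to the constants in the question
  val hudiOpts = Map(
    "hoodie.table.name" -> "hudi_test",
    "hoodie.datasource.write.recordkey.field" -> "id",
    "hoodie.datasource.write.precombine.field" -> "last_update_time",
    "hoodie.datasource.write.partitionpath.field" -> "creation_date"
  )

  // Initial insert: write to a base path with save() instead of saveAsTable()
  inputDF.write.format("org.apache.hudi")
    .options(hudiOpts)
    .mode(SaveMode.Append)
    .save("/tmp/hudi_test")

  // Upsert: same path and append mode; Hudi matches existing records by the record key
  val updateDF = inputDF.limit(1)
    .withColumn("last_update_time", lit("2016-01-01T13:51:39.340396Z"))
  updateDF.write.format("org.apache.hudi")
    .options(hudiOpts)
    .mode(SaveMode.Append)
    .save("/tmp/hudi_test")

  // Read back across the single partition level
  spark1.read.format("org.apache.hudi").load("/tmp/hudi_test/*/*").show()

As in the PySpark example, the second write goes to the same base path in append mode, which is what makes it an upsert rather than a table overwrite.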
More examples can be found in the Hudi quick start guide.

