Problem writing records from a Spark Structured Streaming DataFrame to MySQL

klr1opcd · posted 2021-05-17 in Spark

I am using the code below to write a Spark streaming DataFrame to a MySQL DB. Below are the Kafka topic's JSON data format and the MySQL table schema. The column names and types are the same, but I don't see any records written to the MySQL table; the table is empty, with no records. Please advise.

Kafka topic data:
ssingh@renltp2n073:/mnt/d/confluent-6.0.0/bin$ ./kafka-console-consumer --topic sarvtopic --from-beginning --bootstrap-server localhost:9092
{"id":1,"firstname":"james","middlename":"","lastname":"smith","dob_year":2018,"dob_month":1,"gender":"m","salary":3000}
{"id":2,"firstname":"michael","middlename":"rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"m","salary":4000}

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("SSKafka") \
    .getOrCreate()

dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sarvtopic") \
    .option("startingOffsets", "earliest") \
    .load()

ds = dsraw.selectExpr("CAST(value AS STRING)")
dsraw.printSchema()

from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import *

custom_schema = StructType([
    StructField("id", LongType(), True),
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("dob_year", StringType(), True),
    StructField("dob_month", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", LongType(), True),
])

Person_details_df2 = ds \
    .select(from_json(col("value"), custom_schema).alias("Person_details"))
Person_details_df3 = Person_details_df2.select("Person_details.*")

from pyspark.sql import DataFrameWriter

def foreach_batch_function(df, epoch_id):
    Person_details_df3.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', driver='com.mysql.jdbc.Driver', dbtable="sparkkafka", user='root', password='root$1234')
    pass

query = Person_details_df3.writeStream.trigger(processingTime='20 seconds').outputMode("append").foreachBatch(foreach_batch_function).start()
query

Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x1fb25503b08>

MySQL table schema:

create table sparkkafka(
    id int,
    firstname VARCHAR(40) NOT NULL,
    middlename VARCHAR(40) NOT NULL,
    lastname VARCHAR(40) NOT NULL,
    dob_year int(40) NOT NULL,
    dob_month int(40) NOT NULL,
    gender VARCHAR(40) NOT NULL,
    salary int(40) NOT NULL,
    PRIMARY KEY (id)
);
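
Before debugging the JDBC sink, one quick way to check whether the parsed stream contains any rows at all is the console sink. A minimal diagnostic sketch, assuming the Person_details_df3 defined in the code above:

# Diagnostic sketch: print each micro-batch of the parsed stream to stdout,
# to rule out Kafka/JSON-parsing problems before debugging the MySQL write.
debug_query = Person_details_df3.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

debug_query.awaitTermination()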

z6psavjg1#

I assume Person_details_df3 is your streaming DataFrame and that your Spark version is above 2.4.0.
To use the foreachBatch API, write code like the following:

db_target_properties = {"user": "xxxx", "password": "yyyyy"}

def foreach_batch_function(df, epoch_id):
    df.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', table="sparkkafka", properties=db_target_properties)
    pass

query = Person_details_df3.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start()
query.awaitTermination()
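
The key change from the code in the question is that the batch function writes the df argument (the static micro-batch that foreachBatch hands in) rather than the streaming DataFrame itself, which cannot be saved through the batch .write API. A slightly fuller sketch of the same batch writer, reusing the placeholder connection details from the answer and adding an explicit save mode plus the MySQL driver class:

# Sketch of the batch writer with an explicit save mode and JDBC driver class.
# The URL, table, user and password are the placeholders used in the answer.
def foreach_batch_function(df, epoch_id):
    # df holds only the rows of the current micro-batch; append them to the table.
    df.write \
        .mode("append") \
        .jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb',
              table="sparkkafka",
              properties={"user": "xxxx",
                          "password": "yyyyy",
                          "driver": "com.mysql.jdbc.Driver"})

Without an explicit mode, the JDBC writer defaults to errorifexists, so the write would fail as soon as the target table already exists.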
