I am using the code below to write a Spark Structured Streaming DataFrame to a MySQL DB. The Kafka topic's JSON data format and the MySQL table schema are shown below; the column names and types match. However, I see no records written to the MySQL table — it stays empty. Please advise.
Kafka topic data:

ssingh@renltp2n073:/mnt/d/confluent-6.0.0/bin$ ./kafka-console-consumer --topic sarvtopic --from-beginning --bootstrap-server localhost:9092
{"id":1,"firstname":"james","middlename":"","lastname":"smith","dob_year":2018,"dob_month":1,"gender":"m","salary":3000}
{"id":2,"firstname":"michael","middlename":"rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"m","salary":4000}
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("SSKafka") \
    .getOrCreate()
dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sarvtopic") \
    .option("startingOffsets", "earliest") \
    .load()
ds = dsraw.selectExpr("CAST(value AS STRING)")
dsraw.printSchema()
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import from_json, col

custom_schema = StructType([
    StructField("id", LongType(), True),
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("dob_year", StringType(), True),
    StructField("dob_month", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", LongType(), True),
])

Person_details_df2 = ds \
    .select(from_json(col("value"), custom_schema).alias("Person_details"))
Person_details_df3 = Person_details_df2.select("Person_details.*")
from pyspark.sql import DataFrameWriter

def foreach_batch_function(df, epoch_id):
    Person_details_df3.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', driver='com.mysql.jdbc.Driver', dbtable="sparkkafka", user='root', password='root$1234')
    pass
query = Person_details_df3.writeStream.trigger(processingTime='20 seconds').outputMode("append").foreachBatch(foreach_batch_function).start()
query
Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x1fb25503b08>
MySQL table schema:
create table sparkkafka(
id int,
firstname VARCHAR(40) NOT NULL,
middlename VARCHAR(40) NOT NULL,
lastname VARCHAR(40) NOT NULL,
dob_year int(40) NOT NULL,
dob_month int(40) NOT NULL,
gender VARCHAR(40) NOT NULL,
salary int(40) NOT NULL,
PRIMARY KEY (id)
);
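(Editor's note: before fixing the write path, it is worth checking whether the stream has failed silently — `start()` returns immediately, and an exception raised inside the batch callback only surfaces through the query handle. A minimal debugging sketch; `report_stream_health` is a hypothetical helper name, and it works on any object exposing the StreamingQuery interface, i.e. the `query` handle created above.)

```python
def report_stream_health(query):
    """Print a StreamingQuery's state and return its exception (None if healthy)."""
    print("status:", query.status)              # e.g. {'message': ..., 'isDataAvailable': ...}
    print("lastProgress:", query.lastProgress)  # metrics for the most recent micro-batch
    exc = query.exception()                     # a StreamingQueryException, or None
    if exc is not None:
        print("stream failed:", exc)
    return exc
```

If this reports an exception such as a missing JDBC driver class, the empty table is explained: each micro-batch is failing rather than writing. Note that the MySQL connector jar must be on Spark's classpath (e.g. via `--jars` or `--packages`) for any JDBC write to succeed.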
1 Answer
I assume Person_details_df3 is your streaming DataFrame and your Spark version is above 2.4.0.

To use the foreachBatch API, write the following code:
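(The answer's code block is missing from this copy. A minimal sketch of the intended fix, reusing the connection details from the question: the callback must write the micro-batch `df` it receives — calling `.write` on the streaming frame `Person_details_df3` inside the callback raises an AnalysisException. Also note that PySpark's `DataFrameWriter.jdbc(url, table, mode, properties)` does not accept `driver=` or `dbtable=` keyword arguments as used in the question, so the `format("jdbc")` option chain is used instead.)

```python
def foreach_batch_function(df, epoch_id):
    # 'df' is the static micro-batch DataFrame handed in by foreachBatch;
    # write it (not the streaming frame) to MySQL on every trigger.
    (df.write
       .format("jdbc")
       .option("url", "jdbc:mysql://172.16.23.27:30038/securedb")
       .option("driver", "com.mysql.jdbc.Driver")
       .option("dbtable", "sparkkafka")
       .option("user", "root")
       .option("password", "root$1234")
       .mode("append")   # append each micro-batch; default mode errors on an existing table
       .save())
```

With this callback in place, the question's `Person_details_df3.writeStream...foreachBatch(foreach_batch_function).start()` call works unchanged, and each 20-second trigger appends that batch's rows to `sparkkafka`.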