如何使用pyspark中的foreach来编写Kafka主题?

mnemlml8  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(283)

我正在尝试通过插入在每行上创建的优化日志 foreach &希望存储到Kafka主题中,如下所示-

def refine(df):
    log = df.value
    event_logs = json.dumps(get_event_logs(log)) #A function to refine the row/log
    pdf = pd.DataFrame({"value": event_logs}, index=[0])

    spark = SparkSession.builder.appName("myAPP").getOrCreate() 
    df = spark.createDataFrame(pdf)

    query = df.selectExpr("CAST(value AS STRING)") \
       .write \
       .format("kafka") \
       .option("kafka.bootstrap.servers", "localhost:9092") \
       .option("topic", "intest") \
       .save()

我用下面的代码调用它。

query = streaming_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .foreach(refine)\
    .start()
query.awaitTermination()

但是 refine 函数无法获取我在提交代码时发送的kafka包。我相信刽子手没有访问Kafka包发送通过以下命令-

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

因为当我提交我的代码时,我得到了下面的错误信息,

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

所以,我的问题是如何将数据放入Kafka的内部 foreach ? 作为一个附带问题,我想知道在里面创建另一个会话是否是个好主意 foreach ; 我得在里面重新申报 foreach 因为主驱动程序的退出会话无法在foreach中用于某些有关序列化的问题。
p、 s:如果我想把它沉到控制台上( ...format("console") )内部 foreach ,则可以正常工作。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题