我正在尝试通过插入在每行上创建的优化日志 foreach
&希望存储到Kafka主题中,如下所示-
def refine(df):
log = df.value
event_logs = json.dumps(get_event_logs(log)) #A function to refine the row/log
pdf = pd.DataFrame({"value": event_logs}, index=[0])
spark = SparkSession.builder.appName("myAPP").getOrCreate()
df = spark.createDataFrame(pdf)
query = df.selectExpr("CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "intest") \
.save()
我用下面的代码调用它。
query = streaming_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.outputMode("append") \
.format("console") \
.foreach(refine)\
.start()
query.awaitTermination()
但是 refine
函数无法获取我在提交代码时发送的kafka包。我相信刽子手没有访问Kafka包发送通过以下命令-
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
因为当我提交我的代码时,我得到了下面的错误信息,
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
所以,我的问题是如何将数据放入Kafka的内部 foreach
? 作为一个附带问题,我想知道在里面创建另一个会话是否是个好主意 foreach
; 我得在里面重新申报 foreach
因为主驱动程序的退出会话无法在foreach中用于某些有关序列化的问题。
p、 s:如果我想把它沉到控制台上( ...format("console")
)内部 foreach
,则可以正常工作。
暂无答案!
目前还没有任何答案,快来回答吧!