apache-kafka 从kafka读取,然后将Stream写入json文件,但在HDFS json文件中只找到一条消息

9nvpjoqh  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(163)

只需设置一个hadoop/kafka/spark,1个节点的演示环境。(.readStream)Kafka的信息和写作(.writeStream)将其写入Hadoop中的json文件。奇怪的是,在Hadoop的“output/test”目录下,我可以发现有一个创建的json文件,但只有在一个消息。所有新的消息从Kafka将不会更新json文件。但是我想把所有来自Kafka的信息都存储到一个json文件中。
我试过console(writeStream.format(“console”))或kafak(writeStream.format(“kafka”))的接收器类型,它工作正常。有什么建议或意见吗?下面是示例代码。

schema = StructType([StructField("stock_name",StringType(),True),
                     StructField("stock_value", DoubleType(), True),
                     StructField("timestamp", LongType(), True)])

line = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.1.1:9092") \
  .option("subscribe", "fakestock") \
  .option("startingOffsets","earliest")\
  .load()\
  .selectExpr("CAST(value AS STRING)")

df=line.select(functions.from_json(functions.col("value")\
  .cast("string"),schema).alias("parse_value"))\
  .select("parse_value.stock_name","parse_value.stock_value","parse_value.timestamp")
query=df.writeStream\
  .format("json")\
  .option("checkpointLocation", "output/checkpoint")\
  .option("path","output/test")\
  .start()
s71maibg

s71maibg1#

不可能将所有记录存储在一个文件中,Spark作为Kafka消费者定期轮询数据批,然后将这些数据批写入唯一的文件。
如果不知道主题中有多少条记录,就很难说输出路径中应该有多少条记录,但代码看起来不错。不过,与JSON相比,Parquet是更推荐的输出格式。
另外值得一提的是,Kafka Connect有一个HDFS插件,只需要写一个配置文件,没有Spark解析代码。

相关问题