只需设置一个hadoop/kafka/spark,1个节点的演示环境。(.readStream)Kafka的信息和写作(.writeStream)将其写入Hadoop中的json文件。奇怪的是,在Hadoop的“output/test”目录下,我可以发现有一个创建的json文件,但只有在一个消息。所有新的消息从Kafka将不会更新json文件。但是我想把所有来自Kafka的信息都存储到一个json文件中。
我试过console(writeStream.format(“console”))或kafak(writeStream.format(“kafka”))的接收器类型,它工作正常。有什么建议或意见吗?下面是示例代码。
schema = StructType([StructField("stock_name",StringType(),True),
StructField("stock_value", DoubleType(), True),
StructField("timestamp", LongType(), True)])
line = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "127.0.1.1:9092") \
.option("subscribe", "fakestock") \
.option("startingOffsets","earliest")\
.load()\
.selectExpr("CAST(value AS STRING)")
df=line.select(functions.from_json(functions.col("value")\
.cast("string"),schema).alias("parse_value"))\
.select("parse_value.stock_name","parse_value.stock_value","parse_value.timestamp")
query=df.writeStream\
.format("json")\
.option("checkpointLocation", "output/checkpoint")\
.option("path","output/test")\
.start()
1条答案
按热度按时间s71maibg1#
不可能将所有记录存储在一个文件中,Spark作为Kafka消费者定期轮询数据批,然后将这些数据批写入唯一的文件。
如果不知道主题中有多少条记录,就很难说输出路径中应该有多少条记录,但代码看起来不错。不过,与JSON相比,Parquet是更推荐的输出格式。
另外值得一提的是,Kafka Connect有一个HDFS插件,只需要写一个配置文件,没有Spark解析代码。