我的用例是使用java中的spark流将kafka写入hdfs接收器(由于历史原因使用java,由于缺乏对hdfs 3的支持而没有使用kafka connect。
代码看起来像
public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder()
.appName("testApp")
.master("local")
.getOrCreate();
StreamingQuery query = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:39092")
.option("subscribe", "sample.prices")
.load()
.selectExpr("CAST(value AS STRING)")
.writeStream()
.outputMode("append")
.format("json")
.option("path", "/tmp/kafka2hdfsSink")
.option("checkpointLocation", "/tmp/kafka2hdfsCheckPoint")
.start();
query.awaitTermination();
}
问题是json文件的“value”字符串被转义了。喜欢
{"value" : "{\"_headers\": {\"schemaId\":
我猜是“cast(value as string)”导致了这种情况,除非存在一些版本兼容性问题(我使用的是java8、spark-sql 2.11-2.2.0、hadoop-3.2.1、spark 2.4.5-在mac上本地尝试)
如果不强制转换,则值是base64编码的
{"value":"eyJfaGVhZGVycy...=="}
我无法使用from\u json,因为它需要schema。有没有一种不用为json指定模式就可以做到这一点的方法?我能够使用jsonpath查询提取值,但它仍然以与上面相同的方式处理它
get_json_object(col("_value"), "$")
我不想使用format(“text”),因为我需要.json扩展名,可能需要更多的列,比如timestamp。
暂无答案!
目前还没有任何答案,快来回答吧!