使用sparkstream将无模式的json写入hdfs

t2a7ltrp  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(259)

我的用例是使用java中的spark流将kafka写入hdfs接收器(由于历史原因使用java,由于缺乏对hdfs 3的支持而没有使用kafka connect。
代码看起来像

public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .appName("testApp")
                .master("local")
                .getOrCreate();

        StreamingQuery query = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:39092")
                .option("subscribe", "sample.prices")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .writeStream()
                .outputMode("append")
                .format("json")
                .option("path", "/tmp/kafka2hdfsSink")
                .option("checkpointLocation", "/tmp/kafka2hdfsCheckPoint")
                .start();

        query.awaitTermination();
    }

问题是json文件的“value”字符串被转义了。喜欢

{"value" : "{\"_headers\": {\"schemaId\":

我猜是“cast(value as string)”导致了这种情况,除非存在一些版本兼容性问题(我使用的是java8、spark-sql 2.11-2.2.0、hadoop-3.2.1、spark 2.4.5-在mac上本地尝试)
如果不强制转换,则值是base64编码的

{"value":"eyJfaGVhZGVycy...=="}

我无法使用from\u json,因为它需要schema。有没有一种不用为json指定模式就可以做到这一点的方法?我能够使用jsonpath查询提取值,但它仍然以与上面相同的方式处理它

get_json_object(col("_value"), "$")

我不想使用format(“text”),因为我需要.json扩展名,可能需要更多的列,比如timestamp。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题