使用spark sql流时缺少avro自定义头

kfgdxczn  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(477)

在向Kafka发送avro genericord之前,会像这样插入一个头文件。

ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);

消耗记录。
使用spark streaming时,consumerrecord的标头是完整的。

KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> {
          rdd.foreach(record -> {

            System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
          });
        });
    ;

但是当使用sparksql流时,似乎缺少头。

StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() {

      ...

      @Override
      public void process(Row row) {
        String topic = (String) row.get(2);
        int partition = (int) row.get(3);
        long offset = (long) row.get(4);
        String key = new String((byte[]) row.get(0));
        byte[] value = (byte[]) row.get(1);

        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
            value);

        //I need the schema to decode the Avro!

      }
    }).start();

使用sparksql流式处理方法时,在哪里可以找到自定义头值?
版本:

<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>

更新
我尝试了spark-sql 2.12和spark-sql-kafka-0-10 2.12的3.0.0-preview2。我补充道

.option("includeHeaders", true)

但我还是只能从行中得到这些列。

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
7ajki6be

7ajki6be1#

仅3.0支持结构化流媒体中的kafka头:https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html 请寻找 includeHeaders 更多细节。

相关问题