Spark Structured Streaming: Kafka header attributes are null

ippsafx7 · posted 2021-05-17 in Spark

I currently have a ForeachWriter that processes a typed Dataset coming from Kafka. Every attribute of each message shows up correctly in the ForeachWriter except the headers. Code to reproduce is at the bottom.
The schema is exactly the one produced by the Kafka-format DataStreamReader:

root
 |-- topic: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)

The headers clearly show up in the Dataset columns when they are printed:

+------------+--------+-------+-----------------------+----------------------------------+
|topic       |key     |value  |timestamp              |headers                           |
+------------+--------+-------+-----------------------+----------------------------------+
|controlTopic|        |TEXT   |2020-11-24 16:07:33.09 |[[SAMPLE, TEXT]]                  |
|controlTopic|        |2Header|2020-11-24 16:43:46.755|[[HEADER1, VAL1], [HEADER2, VAL2]]|
+------------+--------+-------+-----------------------+----------------------------------+

The right number of KV objects even appears in the headers attribute of each KafkaMessage, but their values are null. For example, the first message shows a list with one element and the second message a list with two elements, yet key and value inside each KV are null.
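One way to confirm that the header values do arrive intact before the bean encoding is to decode them at the DataFrame level. This is only a debugging sketch (it assumes the dsReader defined in the code below):

// Explode the headers array and cast the raw bytes to strings,
// so each header prints as its own row on the console.
Dataset<Row> decoded = dsReader.load()
        .selectExpr("topic", "explode(headers) AS h")
        .selectExpr("topic", "h.key AS headerKey", "CAST(h.value AS STRING) AS headerValue");

decoded.writeStream()
        .format("console")
        .outputMode("append")
        .start();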

The code is below:
KafkaMessage.java

public class KafkaMessage {

    public String topic;

    public String key;

    public String value;

    public String timestamp;

    public List<KV> headers;

    public KafkaMessage() {
    }

    public KafkaMessage(String topic, String key, String value, String timestamp, List<KV> headers) {

        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}

KV.java

public class KV {
    public String key;
    public byte[] value;

    public KV() {
    }

    public KV(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}
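As an aside, Spark's javaBean encoder discovers properties through JavaBeans introspection, so it may not read bare public fields on a nested bean. A variant of KV with getter/setter pairs (an assumption worth testing, not a confirmed fix) would look like this:

public class KV {
    private String key;
    private byte[] value;

    public KV() {
    }

    // JavaBeans-style accessors so the bean encoder can discover both fields
    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public byte[] getValue() { return value; }
    public void setValue(byte[] value) { this.value = value; }
}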

MySparkWriter.java

public class MySparkWriter extends ForeachWriter<KafkaMessage> {

    @Override
    public boolean open(long partitionId, long epochId) {
        // open connection, not needed in this example
        return true;
    }

    @Override
    public void process(KafkaMessage kafkaMessage) {
        // here headers will not be null, if there are headers
        List<KV> headers = kafkaMessage.headers;
        for (KV kv : headers) {
            System.out.println(kv.key != null); // kv.key will always be null, will only print in local mode
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        // close connection, not needed in this example
    }
}
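(Side note: the ForeachWriter runs on the executors, which is why the System.out.println above only shows up in the console when running in local mode; on a cluster it ends up in the executor logs.)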

MySparkApp.java (note: this is the only class where some code is omitted; you only need to create a SparkSession and call startSpark()):

public class MySparkApp {

    ...

    public StreamingQuery startSpark() throws TimeoutException {
        // Get Input
        DataStreamReader dsReader = this.sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("includeHeaders", true)
                .option("subscribe", "controlTopic");

        Dataset<KafkaMessage> messages = dsReader.load()
                .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)", "headers")
                .as(ExpressionEncoder.javaBean(KafkaMessage.class));

        // debug prints
        messages.writeStream()
            .format("console")
            .outputMode("update")
            .trigger(Trigger.ProcessingTime("2 seconds"))
            .option("truncate", "false")
            .start();
        messages.printSchema();

        // Start Streaming
        return messages
                .writeStream()
                .trigger(Trigger.ProcessingTime("2 seconds"))
                .foreach(this.sparkWriter) // this is a MySparkWriter instance
                .option("checkpointLocation", "/checkpoint/"+ sparkSession.sparkContext().applicationId())
                .start();
    }

    ...

}
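For completeness, the omitted bootstrap amounts to something like the following sketch (the constructor and app name here are assumptions, not the original code):

public class Main {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-headers-repro") // hypothetical app name
                .master("local[*]")
                .getOrCreate();

        // hypothetical constructor that supplies the this.sparkSession used above
        MySparkApp app = new MySparkApp(spark);
        app.startSpark().awaitTermination();
    }
}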
