I currently have a ForeachWriter that processes a typed Dataset coming from Kafka. All of the properties show up correctly inside the ForeachWriter except the headers of each message. The code to reproduce the problem is at the bottom.
The schema is exactly what the Kafka-format DataStreamReader produces:
root
|-- topic: string (nullable = true)
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
The headers clearly show up in the Dataset column that is being printed to the console:
+------------+--------+-------+-----------------------+----------------------------------+
|topic |key |value |timestamp |headers |
+------------+--------+-------+-----------------------+----------------------------------+
|controlTopic| |TEXT |2020-11-24 16:07:33.09 |[[SAMPLE, TEXT]] |
|controlTopic| |2Header|2020-11-24 16:43:46.755|[[HEADER1, VAL1], [HEADER2, VAL2]]|
+------------+--------+-------+-----------------------+----------------------------------+
The right number of KV objects even shows up in the headers property of the object, but their key and value are null. For example, the first message shows one element in the list (left image) and the second message shows two elements (right image).
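Just to narrow down where the data disappears, a debug-only query along these lines (my own sketch, not part of the app below; header_keys and header_values are made-up names, and dsReader is the same reader built in startSpark() further down) should print the header contents with plain SQL expressions, without going through the bean encoder at all:

// Debug-only stream: pull the header fields out with SQL expressions.
// "headers.key" on an array<struct<key, value>> column yields an array of keys,
// and transform(...) casts each binary header value to a string (assuming UTF-8).
dsReader.load()
        .selectExpr(
                "CAST(value AS STRING) AS value",
                "headers.key AS header_keys",
                "transform(headers, h -> CAST(h.value AS STRING)) AS header_values")
        .writeStream()
        .format("console")
        .outputMode("update")
        .option("truncate", "false")
        .start();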
The code to reproduce is as follows:
KafkaMessage.java
import java.util.List;

public class KafkaMessage {
    public String topic;
    public String key;
    public String value;
    public String timestamp;
    public List<KV> headers;

    public KafkaMessage() {
    }

    public KafkaMessage(String topic, String key, String value, String timestamp, List<KV> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}
KV.java
public class KV {
    public String key;
    public byte[] value;

    public KV() {
    }

    public KV(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}
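(Aside: I am not sure whether ExpressionEncoder.javaBean resolves the fields of a nested element type like KV through public fields the same way it apparently does for the top-level class. Purely as a guess on my part, a bean-style variant of KV with conventional getters and setters would look like this sketch:)

public class KV {
    private String key;
    private byte[] value;

    public KV() {
    }

    public KV(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    // Conventional bean accessors, in case the encoder only resolves nested
    // struct fields through getters/setters rather than public fields (my assumption).
    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public byte[] getValue() { return value; }
    public void setValue(byte[] value) { this.value = value; }
}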
MySparkWriter.java
import java.util.List;

import org.apache.spark.sql.ForeachWriter;

public class MySparkWriter extends ForeachWriter<KafkaMessage> {
    @Override
    public boolean open(long partitionId, long epochId) {
        // open connection, not needed in this example
        return true;
    }

    @Override
    public void process(KafkaMessage kafkaMessage) {
        // here headers will not be null, if there are headers
        List<KV> headers = kafkaMessage.headers;
        for (KV kv : headers) {
            System.out.println(kv.key != null); // kv.key will always be null, will only print in local mode
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        // close connection, not needed in this example
    }
}
MySparkApp.java (note: this is the only class where some code has been omitted; you only need to create a Spark session and call startSpark())
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class MySparkApp {
    ...
    public StreamingQuery startSpark() throws TimeoutException {
        // Get input
        DataStreamReader dsReader = this.sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("includeHeaders", true)
                .option("subscribe", "controlTopic");

        Dataset<KafkaMessage> messages = dsReader.load()
                .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)", "headers")
                .as(ExpressionEncoder.javaBean(KafkaMessage.class));

        // debug prints
        messages.writeStream()
                .format("console")
                .outputMode("update")
                .trigger(Trigger.ProcessingTime("2 seconds"))
                .option("truncate", "false")
                .start();
        messages.printSchema();

        // Start streaming
        return messages
                .writeStream()
                .trigger(Trigger.ProcessingTime("2 seconds"))
                .foreach(this.sparkWriter) // this is a MySparkWriter instance
                .option("checkpointLocation", "/checkpoint/" + sparkSession.sparkContext().applicationId())
                .start();
    }
    ...
}
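For completeness, here is a sketch of what I would try in order to take the bean encoder out of the picture entirely: keep the stream as Dataset<Row> and read the headers through the Row API in a plain ForeachWriter<Row>. RowHeaderWriter is a name I made up, the field positions simply follow the schema printed above, and this is only a debugging variant rather than the intended design.

import java.util.List;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class RowHeaderWriter extends ForeachWriter<Row> {
    @Override
    public boolean open(long partitionId, long epochId) {
        return true;
    }

    @Override
    public void process(Row row) {
        // headers is array<struct<key: string, value: binary>> per the schema above
        List<Row> headers = row.getList(row.fieldIndex("headers"));
        if (headers == null) {
            return;
        }
        for (Row header : headers) {
            String key = header.getString(0);        // struct field 0: key
            byte[] value = (byte[]) header.get(1);   // struct field 1: value (binary)
            System.out.println(key + " -> " + (value == null ? "null" : new String(value)));
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
    }
}

And the wiring, using the same select as above but kept untyped:

// Same projection as in startSpark(), but as Dataset<Row> instead of KafkaMessage
Dataset<Row> rawMessages = dsReader.load()
        .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)",
                "CAST(timestamp AS STRING)", "headers");

rawMessages.writeStream()
        .trigger(Trigger.ProcessingTime("2 seconds"))
        .foreach(new RowHeaderWriter())
        .option("checkpointLocation", "/checkpoint/rows-" + sparkSession.sparkContext().applicationId())
        .start();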