How do I integrate a Beam SQL windowed query with KafkaIO?

wsewodh2 asked on 2021-06-04 in Kafka

First, we have a Kafka input source whose messages are JSON, for example:

  1. {"event_time": "2020-08-23 18:36:10", "word": "apple", "cnt": 1}
  2. {"event_time": "2020-08-23 18:36:20", "word": "banana", "cnt": 1}
  3. {"event_time": "2020-08-23 18:37:30", "word": "apple", "cnt": 2}
  4. {"event_time": "2020-08-23 18:37:40", "word": "apple", "cnt": 1}
  5. ... ...
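(For reproduction: the sample messages above can be fed into the topic with a plain Kafka producer. This is only a minimal sketch; the broker address 127.0.0.1:9092 and topic beamKafkaTest are taken from the pipeline code further down, adjust them to your setup.)

// Sketch: push the sample JSON messages into the input topic with the plain Kafka client.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String[] messages = {
                "{\"event_time\": \"2020-08-23 18:36:10\", \"word\": \"apple\", \"cnt\": 1}",
                "{\"event_time\": \"2020-08-23 18:36:20\", \"word\": \"banana\", \"cnt\": 1}",
                "{\"event_time\": \"2020-08-23 18:37:30\", \"word\": \"apple\", \"cnt\": 2}",
                "{\"event_time\": \"2020-08-23 18:37:40\", \"word\": \"apple\", \"cnt\": 1}"
            };
            for (String msg : messages) {
                // messages are unkeyed; the pipeline only reads the value
                producer.send(new ProducerRecord<>("beamKafkaTest", msg));
            }
            producer.flush();
        }
    }
}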

What I want to do is sum up the count of each word per minute:

+---------+----------+---------------------+
| word    | SUM(cnt) | window_start        |
+---------+----------+---------------------+
| apple   |        1 | 2020-08-23 18:36:00 |
| banana  |        1 | 2020-08-23 18:36:00 |
| apple   |        3 | 2020-08-23 18:37:00 |
+---------+----------+---------------------+

So this use case maps naturally onto the following Beam SQL statement:

SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start
FROM t_count_stats
GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)
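(For comparison only, not part of my pipeline: the same per-minute word count expressed with Beam's native windowing API would look roughly like the sketch below, assuming the PCollection<Row> named rows that the pipeline code further down builds. Note that native windowing groups by the element timestamps, whereas the SQL TUMBLE above refers to the event_time column.)

// Sketch: 1-minute tumbling windows with the core Beam API instead of Beam SQL.
// Assumes the PCollection<Row> "rows" built in the pipeline code below.
// Extra imports this would need: org.apache.beam.sdk.transforms.Sum,
//   org.apache.beam.sdk.transforms.windowing.Window,
//   org.apache.beam.sdk.transforms.windowing.FixedWindows,
//   org.apache.beam.sdk.values.TypeDescriptors, org.joda.time.Duration
PCollection<KV<String, Integer>> perMinuteCounts = rows
        .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                .via((Row row) -> KV.of(row.getString("word"), row.getInt32("cnt"))))
        .apply(Sum.integersPerKey());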

Below is the code I currently have for running this streaming SQL query with the Beam Java SDK:

import avro.shaded.com.google.common.collect.ImmutableMap;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.List;

public class KafkaBeamSqlTest {

    private static DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // create pipeline
        PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args)
                .withoutStrictParsing()
                .as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(kafkaOption);

        // create Kafka IO
        KafkaIO.Read<String, String> kafkaRead =
                KafkaIO.<String, String>read()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamKafkaTest")
                        .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
                        .withReadCommitted()
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .commitOffsetsInFinalize();

        // read from Kafka
        PCollection<KV<String, String>> messages = pipeline.apply(kafkaRead.withoutMetadata());

        // build input schema
        Schema inputSchema = Schema.builder()
                .addStringField("word")
                .addDateTimeField("event_time")
                .addInt32Field("cnt")
                .build();

        // convert Kafka messages to Rows
        PCollection<Row> rows = messages.apply(ParDo.of(new DoFn<KV<String, String>, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getValue();

                // parse json
                JSONObject jsonObject = JSON.parseObject(jsonData);

                // build row
                List<Object> list = new ArrayList<>();
                list.add(jsonObject.get("word"));
                list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
                list.add(jsonObject.get("cnt"));
                Row row = Row.withSchema(inputSchema)
                        .addValues(list)
                        .build();
                System.out.println(row);

                // emit row
                c.output(row);
            }
        }))
        .setRowSchema(inputSchema);

        // sql query
        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("t_count_stats"), rows)
                .apply(
                        SqlTransform.query(
                                "SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start\n" +
                                "FROM t_count_stats\n" +
                                "GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)"
                        )
                );

        // sink results back to another Kafka topic
        result.apply(MapElements.via(new SimpleFunction<Row, KV<String, String>>() {
                    @Override
                    public KV<String, String> apply(Row input) {
                        System.out.println("result: " + input.getValues());
                        return KV.of(input.getValue("word"), "result=" + input.getValues());
                    }
                }))
                .apply(KafkaIO.<String, String>write()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamPrint")
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));

        // run
        pipeline.run();
    }
}
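(A plain Kafka consumer can be used to check whether anything actually lands on the beamPrint output topic, independently of Beam; a minimal sketch using the same broker address as above:)

// Sketch: inspect the "beamPrint" output topic with the plain Kafka client.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BeamPrintConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "beam-print-checker");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("beamPrint"));
            while (true) {
                // print whatever the pipeline has written so far
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}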

My problem is that when I run this code and feed some messages into Kafka, no exception is thrown and the pipeline does receive messages from Kafka, but I never see it trigger the windowed aggregation, and the results do not come out as expected (the table I showed above).
So does Beam SQL currently support the windowing syntax over an unbounded Kafka input source? If it does, what is wrong with my current code? How can I debug and fix it? Is there any example code that integrates Beam SQL with KafkaIO?
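One thing I am unsure about is whether the default KafkaIO record timestamps (which are not taken from the event_time field in the payload) keep the watermark from advancing the way the TUMBLE window expects. Below is a sketch of one thing I considered trying, attaching an event-time timestamp policy to the KafkaIO read that parses event_time out of the JSON value; I have not verified that this is the actual problem.

// Sketch (unverified): derive each record's event timestamp from the "event_time"
// field of the JSON payload instead of relying on KafkaIO's default timestamps.
// Assumes the same "dtf" formatter and fastjson imports as in the code above.
// Extra imports this would need:
//   import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
//   import org.joda.time.Duration;
KafkaIO.Read<String, String> kafkaReadWithEventTime =
        KafkaIO.<String, String>read()
                .withBootstrapServers("127.0.0.1:9092")
                .withTopic("beamKafkaTest")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withTimestampPolicyFactory(
                        (topicPartition, previousWatermark) ->
                                new CustomTimestampPolicyWithLimitedDelay<String, String>(
                                        record -> {
                                            JSONObject json = JSON.parseObject(record.getKV().getValue());
                                            return dtf.parseDateTime(json.getString("event_time")).toInstant();
                                        },
                                        // allow records to arrive up to 1 minute out of order
                                        Duration.standardMinutes(1),
                                        previousWatermark));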
Please help me! Thanks!!


xxe27gdn #1

It looks like this was answered on the Beam user mailing list: https://lists.apache.org/thread.html/rea75c0eb665f90b8483e64bee96740ebb01942c606f065066c2ecc56%40%3cuser.beam.apache.org%3e
