apache-flink事务聚合

qacovj5a  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(427)

我一直在想如何写一个flink程序,从Kafka的3个主题中接收事件,总结今天、昨天和前天的事件。
因此,第一个问题是,如何对3天的事务求和,并将它们提取为json文件

5sxhfpxr

5sxhfpxr1#

如果您想阅读3个不同的Kafka主题或分区,您必须创建3个Kafka源
Flink关于Kafka消费者的文档

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer0 = new FlinkKafkaConsumer08[String](...)
val consumer1 = new FlinkKafkaConsumer08[String](...)
val consumer2 = new FlinkKafkaConsumer08[String](...)
consumer0.setStartFromGroupOffsets()
consumer1.setStartFromGroupOffsets()
consumer2.setStartFromGroupOffsets()

val stream0 = env.addSource(consumer0)
val stream1 = env.addSource(consumer1)
val stream2 = env.addSource(consumer2)

val unitedStream = stream0.union(stream1,stream2)

/* Logic to group transactions from 3 days */
/* I need more info, but it should be a Sliding or Fixed windows Keyed by the id of the transactions*/

val windowSize = 1 // number of days that the window use to group events
val windowStep = 1 // window slides 1 day

val reducedStream = unitedStream
    .keyBy("transactionId") // or any field that groups transactions in the same group
    .timeWindow(Time.days(windowSize),Time.days(windowStep))
    .map(transaction => {
        transaction.numberOfTransactions = 1
        transaction
    }).sum("numberOfTransactions");

val streamFormatedAsJson = reducedStream.map(functionToParseDataAsJson) 
// you can use a library like GSON for this
// or a scala string template

streamFormatedAsJson.sink(yourFavoriteSinkToWriteYourData)

如果主题名称可以与常规表达式匹配,则只能创建一个kafka使用者,如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val consumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("day-[1-3]"),
  ..., //check documentation to know how to fill this field
  ...) //check documentation to know how to fill this field

val stream = env.addSource(consumer)

最常见的方法是将所有事务放在同一个kafka主题中,而不是放在不同的主题中,在这种情况下,代码会更简单,因为您只需要使用一个窗口来处理数据

Day 1 -> 11111 -\
Day 2 -> 22222 --> 1111122222333 -> Window -> 11111 22222 333 -> reduce operation per window partition
Day 3 -> 3333 --/                            |-----|-----|---|

示例代码

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer = new FlinkKafkaConsumer08[String](...)
consumer.setStartFromGroupOffsets()

val stream = env.addSource(consumer)

/* Logic to group transactions from 3 days */
/* I need more info, but it should be a Sliding or Fixed windows Keyed by the id of the transactions*/

val windowSize = 1 // number of days that the window use to group events
val windowStep = 1 // window slides 1 day

val reducedStream = stream
    .keyBy("transactionId") // or any field that groups transactions in the same group
    .timeWindow(Time.days(windowSize),Time.days(windowStep))
    .map(transaction => {
        transaction.numberOfTransactions = 1
        transaction
    }).sum("numberOfTransactions");

val streamFormatedAsJson = reducedStream.map(functionToParseDataAsJson) 
// you can use a library like GSON for this
// or a scala string template

streamFormatedAsJson.sink(yourFavoriteSinkToWriteYourData)

相关问题