kafka-flink:binlog到多dto的转换及flink中的转换方法

zsbz8rwp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(614)

Kafka、Flink和蒂德的新成员。假设我有三个源mysql表 s_a , s_b ,和 s_c ,并希望将记录收集到目标tidb表 t_a 以及 t_b 实时的。Map规则是

`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).

我采用的解决方案是kafka+flink和tidb-sink,binlog更改订阅到kafka主题;flink使用主题并将转换后的结果写入tidb。我在flink代码部分遇到的问题是:
如何轻松地将kafka轮询的json字符串(包含operatin、tables的信息)恢复到不同类型的dto操作(如insert/creat)中 t_a 或者 t_b ). 我找到了一个叫做 Debezium 作为kafka&flink连接器,但它看起来需要源表和目标表之间的相等性。
如何编写转换 VKDataMapper 如果我有多个目标表。我很难定义 T 尽可能地 t_a dto(数据传输对象),或 t_b dto公司。
我现有的示例代码如下:
//主要程序。

StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
       //consume is FlinkkafkaConsumer. TopicFilter returns true. 
        environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
                .addSink(new TidbSink());

        try {
            environment.execute();
        } catch (Exception e) {
            log.error("exception {}", e);
        }

 public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        //How T can represents both `T_a data DTO` `T_b`...., 
        return null;
    }

}
sz81bmfz

sz81bmfz1#

为什么不试试flink sql呢?这样,您只需在flink中创建一些表,然后通过sql定义任务,如:

insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

请参阅中的一些示例https://github.com/littlefall/flink-tidb-rdw,随便问任何让你困惑的问题。

相关问题