Kafka、Flink和蒂德的新成员。假设我有三个源mysql表 s_a
, s_b
,和 s_c
,并希望将记录收集到目标tidb表 t_a
以及 t_b
实时的。Map规则是
`s_a` --> `t_a`
`s_b` union `s_c` ---> `t_b` with some transformation (e.g., field remapping).
我采用的解决方案是kafka+flink和tidb-sink,binlog更改订阅到kafka主题;flink使用主题并将转换后的结果写入tidb。我在flink代码部分遇到的问题是:
如何轻松地将kafka轮询的json字符串(包含operatin、tables的信息)恢复到不同类型的dto操作(如insert/creat)中 t_a
或者 t_b
). 我找到了一个叫做 Debezium
作为kafka&flink连接器,但它看起来需要源表和目标表之间的相等性。
如何编写转换 VKDataMapper
如果我有多个目标表。我很难定义 T
尽可能地 t_a
dto(数据传输对象),或 t_b
dto公司。
我现有的示例代码如下:
//主要程序。
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
//consume is FlinkkafkaConsumer. TopicFilter returns true.
environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
.addSink(new TidbSink());
try {
environment.execute();
} catch (Exception e) {
log.error("exception {}", e);
}
public class VKDataMapper implements MapFunction<String, T> {
@Override
public T map(String value) throws Exception {
//How T can represents both `T_a data DTO` `T_b`....,
return null;
}
}
1条答案
按热度按时间sz81bmfz1#
为什么不试试flink sql呢?这样,您只需在flink中创建一些表,然后通过sql定义任务,如:
请参阅中的一些示例https://github.com/littlefall/flink-tidb-rdw,随便问任何让你困惑的问题。