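I am new to Flink and am currently trying to build a data pipeline that reads XML documents from a Kafka topic and writes them to a relational database.
POJO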
public class MyClass {
    public String firstAttribute;
    public String secondAttribute;
    public List<Item> items;

    public MyClass() {}

    // Instance method: builds the SQL statement for this object.
    public String generateSqlString(String tableName) {
        ...
        return sqlString;
    }
}
public class Item {
    public String name;
    public String value;
    public String type;

    public Item() {}
}
Streaming application
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(consumerTopic,new SimpleStringSchema(),consumerProperties);
DataStream<String> stream = env.addSource(consumer);
DataStreamSink<MyClass> myClassStream = stream
    .map(new ParseJsonMapFunction())
    .addSink(JdbcSink.sink(
        ??.generateSqlString(tableName), // At this point I want to access the instance of MyClass to
        (ps, t) -> {
            ps.setString(1, t.firstAttribute);
            ps.setString(2, t.secondAttribute);
            // Bind name, value, and type for each item, starting at parameter 3.
            int counter = 3;
            for (Item item : t.items) {
                ps.setObject(counter, item.name);
                counter++;
                ps.setObject(counter, item.value);
                counter++;
                ps.setObject(counter, item.type);
                counter++;
            }
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl(url)
            .withDriverName(driverName)
            .withUserName(user)
            .withPassword(password)
            .build()));
env.execute("Flink Streaming Sample");
My problem is that I don't know how to access the instance of MyClass in order to call the function generateSqlString.
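One thing that may matter here: JdbcSink.sink takes its SQL string once, while the job is being defined, before any MyClass record has been parsed, so there is no instance to call at that point. Below is a minimal sketch of one possible workaround, assuming every record carries the same, known number of items, so the statement can be built from the table name and item count alone. SqlTemplate, insertSql, and itemCount are hypothetical names, not part of Flink, and the column-less INSERT assumes the table's columns are in the same order in which the statement builder binds its parameters.

```java
import java.util.Collections;

// Hypothetical helper, not part of Flink: builds one fixed, parameterized INSERT.
final class SqlTemplate {

    // Assumption: every record has exactly itemCount items.
    static String insertSql(String tableName, int itemCount) {
        // Two attributes, then name/value/type for each item.
        int placeholders = 2 + 3 * itemCount;
        return "INSERT INTO " + tableName + " VALUES ("
                + String.join(", ", Collections.nCopies(placeholders, "?"))
                + ")";
    }

    public static void main(String[] args) {
        // prints: INSERT INTO my_table VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        System.out.println(insertSql("my_table", 2));
    }
}
```

Under these assumptions, the returned string could be passed as the first argument to JdbcSink.sink in place of ??.generateSqlString(tableName). If the number of items really varies per record, a single fixed statement would not fit, and a different table layout or a custom sink would probably be needed.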