Flink: streaming XML to JDBC

Posted by fykwrbwg on 2021-06-21 in Flink

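I am new to Flink and am currently trying to build a data pipeline that reads XML documents from a Kafka topic and writes them to a relational database.
POJO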

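import java.util.List;

public class MyClass {
    public String firstAttribute;
    public String secondAttribute;
    public List<Item> items;

    public MyClass() {}

    // Builds the SQL statement for this record (body elided in the original post)
    public String generateSqlString(String tableName) {
        ...
        return sqlString;
    }
}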

public class Item {
    public String name;
    public String value;
    public String type;

    public Item() {}
}
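
The post does not show the XML layout or the body of the map function, so the following is only a minimal sketch of how an XML string might be turned into such a POJO using the JDK's built-in DOM parser. The class `XmlToPojoSketch`, the method `parse`, and the assumed document shape (a root element with `firstAttribute` and `secondAttribute` attributes and child `<item>` elements with `name`, `value`, and `type` attributes) are all hypothetical and not taken from the original question.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper; the XML layout it expects is an assumption.
final class XmlToPojoSketch {

    // Simplified copies of the POJOs from the post, nested to keep the sketch self-contained.
    static class Item {
        String name;
        String value;
        String type;
    }

    static class MyClass {
        String firstAttribute;
        String secondAttribute;
        List<Item> items = new ArrayList<>();
    }

    // Parses one XML document into a MyClass instance.
    static MyClass parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Reject DOCTYPE declarations, since the input comes from an external topic.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(xml)));

            Element root = doc.getDocumentElement();
            MyClass result = new MyClass();
            result.firstAttribute = root.getAttribute("firstAttribute");
            result.secondAttribute = root.getAttribute("secondAttribute");

            // Collect every <item> element below the root, in document order.
            NodeList nodes = root.getElementsByTagName("item");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element element = (Element) nodes.item(i);
                Item item = new Item();
                item.name = element.getAttribute("name");
                item.value = element.getAttribute("value");
                item.type = element.getAttribute("type");
                result.items.add(item);
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML document", e);
        }
    }
}
```

In a Flink job, a method like this could be called from inside the `map` step that converts each Kafka message into a `MyClass` record.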

Streaming application

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(consumerTopic,new SimpleStringSchema(),consumerProperties);
DataStream<String> stream = env.addSource(consumer);
DataStreamSink<MyClass> myClassStream = stream
.map(new ParseJsonMapFunction())
.addSink(JdbcSink.sink(
                      ??.generateSqlString(tableName), // At this point I want to access the instance of MyClass
                      (ps,t)->{
                      ps.setString(1,t.firstAttribute);
                      ps.setString(2,t.secondAttribute);
                      int counter = 3;
                      for(Item item : t.items){
                         ps.setObject(counter,item.name);
                         counter++;
                         ps.setObject(counter,item.value);
                         counter++;
                         ps.setObject(counter,item.type);
                         counter++;
                      }
                      },
                      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driverName)
                        .withUsername(user)
                        .withPassword(password)
                        .build()));
env.execute("Flink Streaming Sample");

My problem is that I don't know how to access the instance of MyClass in order to call the generateSqlString function.
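
One way to look at this: `JdbcSink.sink` takes the SQL statement as a plain `String` when the job is assembled, before any `MyClass` record exists, so there is no instance available at that point. A minimal sketch of one possible workaround, assuming every record carries the same fixed number of items, is to build the statement in a static helper from the table name and the item count alone. The class `InsertSqlBuilder`, the method `insertSql`, and the column names (`first_attribute`, `item_name_1`, and so on) are hypothetical and not from the original post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds a parameterized INSERT without needing a record instance.
final class InsertSqlBuilder {

    private InsertSqlBuilder() {}

    // Column names are assumptions; adjust them to the real table schema.
    static String insertSql(String tableName, int itemCount) {
        if (itemCount < 0) {
            throw new IllegalArgumentException("itemCount must not be negative");
        }
        List<String> columns = new ArrayList<>();
        columns.add("first_attribute");
        columns.add("second_attribute");
        // Three columns per item, in the same order the statement builder sets them.
        for (int i = 1; i <= itemCount; i++) {
            columns.add("item_name_" + i);
            columns.add("item_value_" + i);
            columns.add("item_type_" + i);
        }
        // One "?" placeholder per column.
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + tableName
                + " (" + String.join(", ", columns) + ")"
                + " VALUES (" + placeholders + ")";
    }
}
```

With a helper like this, the first argument to `JdbcSink.sink` could be `InsertSqlBuilder.insertSql(tableName, itemCount)`, and the statement builder lambda would fill the same 2 + 3 × itemCount parameters in the same order. If the number of items varies per record, a single prepared statement of this shape would not fit, and a different table layout (for example one row per item) may be worth considering.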
