Flink: Consuming from Kafka and Writing to MySQL

Published by x33g5p2x in Flink on 2020-09-08

This article shows how to consume messages from Kafka and write them to MySQL in real time.

1. Add the MySQL JDBC driver to the Maven dependencies (the Kafka connector dependency, sketched right after this snippet, is also needed):
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
```
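
The job also needs Flink's Kafka connector on the classpath. A minimal sketch of that dependency, assuming a Flink 1.9.x build with Scala 2.11; match the version and the Scala suffix to your own Flink distribution:

```xml
<!-- Assumption: adjust the version and the Scala suffix (_2.11 / _2.12) to your Flink build -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
```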

2. Extend RichSinkFunction to implement a MySQL sink (a possible layout of the target table is sketched after the code):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends
        RichSinkFunction<Tuple3<Integer, String, Integer>> {

    private Connection connection;
    private PreparedStatement preparedStatement;

    // Replace these with your own configuration
    String username = "";
    String password = "";
    String drivername = "";
    String dburl = "";

    @Override
    public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        // my_table is a placeholder; the table is assumed to have three columns: id, num, price
        String sql = "replace into my_table(id, num, price) values (?, ?, ?)";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setInt(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.setInt(3, value.f2);
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```
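
Because the sink writes with REPLACE INTO, the target table needs a primary key (or a unique index) for the replace semantics to take effect. A possible definition, assuming the placeholder table name my_table used above:

```sql
-- Hypothetical target table; REPLACE INTO relies on the PRIMARY KEY on id
CREATE TABLE my_table (
    id    INT NOT NULL,
    num   VARCHAR(64),
    price INT,
    PRIMARY KEY (id)
);
```
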
3. The Flink main class:
```java
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Records look like "1,abc,100"; they could just as well be complex JSON that you parse yourself
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        env.getConfig().disableSysoutLogging();  // suppress sysout log output
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, 5000));  // up to 5 restarts, 5 s apart
        env.enableCheckpointing(2000);
        DataStream<String> stream = env.addSource(consumer);
        DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream
                .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
                    String[] args1 = value.split(",");
                    return new Tuple3<>(Integer.valueOf(args1[0]), args1[1], Integer.valueOf(args1[2]));
                })
                // Tuple field types cannot be inferred from the lambda, so declare them explicitly
                .returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));
        sourceStream.addSink(new MysqlSink());
        env.execute("data to mysql start");
    }
}
```
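
A note on the design: MysqlSink opens and closes a JDBC connection for every record, which is fine for a demo but expensive under real load. The usual pattern is to manage the connection in the open()/close() lifecycle methods of RichSinkFunction and reuse one prepared statement per parallel sink instance. A minimal sketch of that variant (the class name MysqlLifecycleSink and the placeholder settings are assumptions, not part of the original code):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlLifecycleSink extends RichSinkFunction<Tuple3<Integer, String, Integer>> {

    private Connection connection;
    private PreparedStatement preparedStatement;

    // Replace these with your own configuration, as in MysqlSink above
    String username = "";
    String password = "";
    String drivername = "";
    String dburl = "";

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open the connection and prepare the statement once per parallel sink instance
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        preparedStatement = connection.prepareStatement(
                "replace into my_table(id, num, price) values (?, ?, ?)");
    }

    @Override
    public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
        // Reuse the prepared statement for every record
        preparedStatement.setInt(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.setInt(3, value.f2);
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```

To use it, swap it in for MysqlSink in sourceStream.addSink(...).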
