当检查点还原时,flink kafkaproducer以一次模式发送重复消息

ruoxqz4g  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(341)

我正在写一个测试flink两步提交的案例,下面是概述。
sink kafka 曾经是Kafka的制作人。 sink step mysql是否扩展 two step commit . sink compare mysql是否扩展 two step commit ,此接收器偶尔会抛出一个exeption来模拟检查点失败。
当checkpoint失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka使用者会读取上次成功的偏移量,kafka生产者会生成消息,即使他在这个checkpoint失败之前已经完成了。
在这种情况下如何避免重复消息?
谢谢你的帮助。
环境:
Flink1.9.1
java 1.8版
Kafka2.11
Kafka制作人代码:

  1. dataStreamReduce.addSink(new FlinkKafkaProducer<>(
  2. "flink_output",
  3. new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
  4. @Override
  5. public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
  6. UUID uuid = UUID.randomUUID();
  7. JSONObject jsonObject = new JSONObject();
  8. jsonObject.put("uuid", uuid.toString());
  9. jsonObject.put("key1", element.f0);
  10. jsonObject.put("key2", element.f1);
  11. jsonObject.put("key3", element.f2);
  12. jsonObject.put("indicate", element.f3);
  13. return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
  14. }
  15. },
  16. kafkaProps,
  17. FlinkKafkaProducer.Semantic.EXACTLY_ONCE
  18. )).name("sink kafka");

检查点设置:

  1. StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
  2. executionEnvironment.enableCheckpointing(10000);
  3. executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
  4. executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

mysql接收器:

  1. dataStreamReduce.addSink(
  2. new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
  3. Connection, Void>
  4. (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {
  5. int count = 0;
  6. Connection connection;
  7. @Override
  8. protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
  9. if (count > 10) {
  10. throw new Exception("compare test exception.");
  11. }
  12. PreparedStatement ps = transaction.prepareStatement(
  13. " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
  14. " values(?, ?, ?, ?, ?) " +
  15. " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
  16. );
  17. ps.setString(1, context.timestamp().toString());
  18. ps.setString(2, value.f0);
  19. ps.setString(3, value.f1);
  20. ps.setString(4, value.f1);
  21. ps.setLong(5, value.f3);
  22. ps.execute();
  23. ps.close();
  24. count += 1;
  25. }
  26. @Override
  27. protected Connection beginTransaction() throws Exception {
  28. LOGGER.error("compare in begin transaction");
  29. try {
  30. if (connection.isClosed()) {
  31. throw new Exception("mysql connection closed");
  32. }
  33. }catch (Exception e) {
  34. LOGGER.error("mysql connection is error: " + e.toString());
  35. LOGGER.error("reconnect mysql connection");
  36. String jdbcURI = "jdbc:mysql://";
  37. Class.forName("com.mysql.jdbc.Driver");
  38. Connection connection = DriverManager.getConnection(jdbcURI);
  39. connection.setAutoCommit(false);
  40. this.connection = connection;
  41. }
  42. return this.connection;
  43. }
  44. @Override
  45. protected void preCommit(Connection transaction) throws Exception {
  46. LOGGER.error("compare in pre Commit");
  47. }
  48. @Override
  49. protected void commit(Connection transaction) {
  50. LOGGER.error("compare in commit");
  51. try {
  52. transaction.commit();
  53. } catch (Exception e) {
  54. LOGGER.error("compare Commit error: " + e.toString());
  55. }
  56. }
  57. @Override
  58. protected void abort(Connection transaction) {
  59. LOGGER.error("compare in abort");
  60. try {
  61. transaction.rollback();
  62. } catch (Exception e) {
  63. LOGGER.error("compare abort error." + e.toString());
  64. }
  65. }
  66. @Override
  67. protected void recoverAndCommit(Connection transaction) {
  68. super.recoverAndCommit(transaction);
  69. LOGGER.error("compare in recover And Commit");
  70. }
  71. @Override
  72. protected void recoverAndAbort(Connection transaction) {
  73. super.recoverAndAbort(transaction);
  74. LOGGER.error("compare in recover And Abort");
  75. }
  76. })
  77. .setParallelism(1).name("sink compare");
wa7juj8i

wa7juj8i1#

我不太确定我是否正确理解了这个问题:
当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka生产者将读取上次成功的偏移量并生成消息,即使他在检查点失败之前已经完成了。
Kafka制作人没有读取任何数据。所以,我假设整个管道重新读取旧的偏移量并生成重复的偏移量。如果是这样的话,你需要了解Flink是如何确保一次。
创建定期检查点以在出现故障时保持一致的状态。
这些检查点包含检查点时最后一条成功读取记录的偏移量。
恢复后,flink将从上次成功检查点中存储的偏移量中重新读取所有记录。因此,将重放上次检查点和失败之间生成的相同记录。
重放的记录将恢复故障前的状态。
它将产生源于重放输入记录的重复输出。
接收器有责任确保没有副本有效地写入目标系统。
对于最后一点,有两个选项:
只输出数据,当一个检查点被写入时,这样目标中就不会出现有效的重复项。这种简单的方法非常通用(独立于接收器),但会将检查点间隔添加到延迟中。
让接收器对输出进行重复数据消除。
后者用于KafkaFlume。它使用kafka事务来消除重复数据。为了避免在使用者端重复,您需要确保它没有读取文档中提到的未提交数据。还要确保事务超时足够大,在失败和恢复之间不会丢弃数据。

相关问题