NullPointerException when emitting to Kafka with sink buffering in Flink SQL

iovurdzv  posted 2023-08-01 in Apache

I have SQL defined like this:

    CREATE TABLE raw_table
    (
        headers VARCHAR,
        id VARCHAR,
        eventTimestamp TIMESTAMP_LTZ(3) NOT NULL,
        type VARCHAR,
        contentJson VARCHAR,
        WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECONDS
    ) WITH (
        'connector' = 'kafka',
        'topic-pattern' = 'role__.+?',
        'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
        'properties.group.id' = 'role_local_1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'properties.allow.auto.create.topics' = 'true',
        'json.timestamp-format.standard' = 'ISO-8601',
        'sink.parallelism' = '3',
        'sink.buffer-flush.interval' = '10s',
        'sink.buffer-flush.max-rows' = '1000'
    );

    create view ROLES_NORMALIZED as
    (
    select
        JSON_VALUE(contentJson, '$.id') as id,
        rr.type as type
    from raw_table rr
    );

    CREATE VIEW ROLES_UPSERTS_V1 AS
    (
    SELECT *
    FROM ROLES_NORMALIZED
    WHERE type in ('ROLE_CREATED', 'ROLE_UPDATED')
    );

    CREATE VIEW ROLES_DELETED_V1 AS
    (
    SELECT org,
        pod,
        tenantId,
        id,
        modified,
        modified as deleted,
        event_timestamp
    FROM ROLES_NORMALIZED
    WHERE type in ('ROLES_DELETED')
    );

    -------

    CREATE TABLE final_topic
    (
        event_timestamp TIMESTAMP_LTZ,
        id VARCHAR,
        name VARCHAR,
        deleted TIMESTAMP_LTZ,
        PRIMARY KEY (pod, org, id) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'final_topic',
        'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
        'properties.group.id' = 'some-group-id',
        'value.format' = 'json',
        'key.format' = 'json',
        'properties.allow.auto.create.topics' = 'true',
        'properties.replication.factor' = '3',
        'value.json.timestamp-format.standard' = 'ISO-8601',
        'sink.parallelism' = '3'
    );

    INSERT INTO final_topic
    select
        GREATEST(r.event_timestamp, d.event_timestamp) as event_timestamp,
        r.id,
        r.name,
        d.deleted
    from ROLES_UPSERTS_V1 r
    LEFT JOIN ROLES_DELETED_V1 d
    ON r.id = d.id;

When I set the options 'sink.buffer-flush.interval' = '10s' and 'sink.buffer-flush.max-rows' = '1000' on the sink, I get a NullPointerException on the line wrappedContext.setTimestamp(value.f1); (line 145) in the class ReducingUpsertWriter. It happens because the context object's timestamp() method returns null, see https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L85. I defined the event-time attribute with eventTimestamp TIMESTAMP_LTZ(3) NOT NULL and the watermark with WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECONDS.
What else do I need to define when using Flink SQL so that context.timestamp() returns a non-null value? I am on Flink 1.16.0.
In the sample messages I send to the source topic, eventTimestamp is never null.
This is the exception I get:

    2023-07-09 12:57:38
    java.io.IOException: Could not perform checkpoint 2 for operator SinkMaterializer[44] -> Sink: Writer (3/3)#1.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
        at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:90)
        at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:167)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
        at org.apache.flink
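The top of the "Caused by" chain points at ReducingUpsertWriter.flush. Based on the code linked above, the writer buffers each record together with the boxed Long returned by context.timestamp(), and flush() passes that Long to a setTimestamp(long) method; when the record carried no timestamp, auto-unboxing the null Long throws the NPE. The following is a minimal, self-contained sketch of that failure mode (stand-in classes, not the actual Flink source):

```java
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeSketch {

    // Stand-in for a pair like Flink's Tuple2<RowData, Long>.
    static final class Tuple2<A, B> {
        final A f0; final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Stand-in for the wrapped SinkWriter.Context used during flush.
    static final class WrappedContext {
        Long timestamp;
        // Primitive parameter: passing a boxed Long forces unboxing.
        void setTimestamp(long ts) { this.timestamp = ts; }
    }

    public static void main(String[] args) {
        // Per-key reduce buffer: key -> (record, context.timestamp()).
        Map<String, Tuple2<String, Long>> buffer = new HashMap<>();
        // A buffered record whose context.timestamp() returned null:
        buffer.put("role-1", new Tuple2<>("row-data", null));

        WrappedContext ctx = new WrappedContext();
        try {
            for (Tuple2<String, Long> value : buffer.values()) {
                ctx.setTimestamp(value.f1); // auto-unboxes null -> NPE
            }
            System.out.println("flushed");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException during flush");
        }
    }
}
```

This also explains why the job only fails once the two buffer-flush options are set: without them, the buffered flush path is never taken.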


Further update:
1. The problem seems to appear when a view is used to transform the source table's data; somehow the records' timestamp ends up null.
Is there a way to work around this?
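One thing worth checking (an assumption based on the statements above, not a confirmed fix): none of the views selects the watermarked eventTimestamp column, so the event-time attribute defined on raw_table is dropped by the projection and never reaches the sink. Carrying the rowtime column through the first view, aliased to the event_timestamp name the later views already reference, may let records arrive at the writer with a timestamp attached:

```sql
-- Sketch (assumption): keep the rowtime column in the projection so the
-- time attribute defined on raw_table is not lost in the view.
create view ROLES_NORMALIZED as
(
select
    JSON_VALUE(contentJson, '$.id') as id,
    rr.type as type,
    rr.eventTimestamp as event_timestamp
from raw_table rr
);
```

Alternatively, removing the two 'sink.buffer-flush.interval' / 'sink.buffer-flush.max-rows' options disables the reducing buffer entirely and sidesteps the failing code path, at the cost of losing sink-side batching.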


lzfw57am 1#

This is a bug tracked under https://issues.apache.org/jira/browse/FLINK-25916. If you upgrade the Flink Kafka connector to 3.0.0, the problem is resolved.
