NullPointerException when emitting buffered writes to Kafka with Flink SQL

iovurdzv · posted 2023-08-01 · Apache

I have SQL defined like this:

CREATE TABLE raw_table
(
    headers     VARCHAR,
    id          VARCHAR,
    eventTimestamp  TIMESTAMP_LTZ(3) NOT NULL,
    type        VARCHAR,
    contentJson VARCHAR,
    WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECONDS
) WITH (
      'connector' = 'kafka',
      'topic-pattern' = 'role__.+?',
      'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
      'properties.group.id' = 'role_local_1',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'properties.allow.auto.create.topics' = 'true',
      'json.timestamp-format.standard' = 'ISO-8601'
      );

create view ROLES_NORMALIZED as
(
select JSON_VALUE(contentJson, '$.id')       as id,
       JSON_VALUE(contentJson, '$.org')      as org,
       JSON_VALUE(contentJson, '$.pod')      as pod,
       JSON_VALUE(contentJson, '$.tenantId') as tenantId,
       JSON_VALUE(contentJson, '$.name')     as name,
       JSON_VALUE(contentJson, '$.modified') as modified,
       rr.eventTimestamp                     as event_timestamp,
       rr.type                               as type
from raw_table rr
    );


CREATE VIEW ROLES_UPSERTS_V1 AS
(
SELECT *
FROM ROLES_NORMALIZED
WHERE type in ('ROLE_CREATED', 'ROLE_UPDATED')
    );

CREATE VIEW ROLES_DELETED_V1 AS
(
SELECT org,
       pod,
       tenantId,
       id,
       modified,
       modified as deleted,
       event_timestamp
FROM ROLES_NORMALIZED
WHERE type in ('ROLES_DELETED')
    );

-------

CREATE TABLE final_topic
(
    event_timestamp         TIMESTAMP_LTZ,
    pod                     VARCHAR,
    org                     VARCHAR,
    id                      VARCHAR,
    name                    VARCHAR,
    deleted                 TIMESTAMP_LTZ,
    PRIMARY KEY (pod, org, id) NOT ENFORCED
) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'final_topic',
      'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
      'properties.group.id' = 'some-group-id',
      'value.format' = 'json',
      'key.format' = 'json',
      'properties.allow.auto.create.topics' = 'true',
      'properties.replication.factor' = '3',
      'value.json.timestamp-format.standard' = 'ISO-8601',
      'sink.parallelism' = '3',
      'sink.buffer-flush.interval' = '10s',
      'sink.buffer-flush.max-rows' = '1000'
      );

INSERT INTO final_topic
select GREATEST(r.event_timestamp, d.event_timestamp) as event_timestamp,
       r.pod,
       r.org,
       r.id,
       r.name,
       d.deleted
from ROLES_UPSERTS_V1 r
         LEFT JOIN ROLES_DELETED_V1 d
                   ON r.id = d.id;

When I enable 'sink.buffer-flush.interval' = '10s' and 'sink.buffer-flush.max-rows' = '1000' on the sink, I get a NullPointerException in ReducingUpsertWriter at line 145, wrappedContext.setTimestamp(value.f1);. It happens because the timestamp() method of the context object returns null, see https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L85. I have defined an event-time attribute with eventTimestamp TIMESTAMP_LTZ(3) NOT NULL and a watermark with WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECONDS.
What else do I need to define in Flink SQL so that context.timestamp() returns a non-null value? I am using Flink 1.16.0.
In the sample messages I send to the source topic, eventTimestamp is never null.
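A source-topic message of the shape described here would look roughly like this (field values are illustrative, matching the raw_table schema above; eventTimestamp uses the ISO-8601 format the table is configured with):

```json
{
  "headers": "{}",
  "id": "role-123",
  "eventTimestamp": "2023-07-09T12:57:00.000Z",
  "type": "ROLE_CREATED",
  "contentJson": "{\"id\": \"role-123\", \"name\": \"admin\"}"
}
```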
The exception I see:

2023-07-09 12:57:38
java.io.IOException: Could not perform checkpoint 2 for operator SinkMaterializer[44] -> Sink: Writer (3/3)#1.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
    at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:90)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:167)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
    at org.apache.flink


Further update
1. The problem seems to appear when the source table data is transformed through a view; somehow the records then carry a null timestamp.
Is there a way to work around this?
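One way to sidestep the crash, assuming the NPE is limited to the buffered write path (which the stack trace suggests, since ReducingUpsertWriter only appears in it), is to define the upsert-kafka sink without the sink.buffer-flush.* options, at the cost of losing write batching:

```sql
-- Sketch of a workaround: the same sink, minus the 'sink.buffer-flush.*'
-- options. Without them the upsert-kafka sink writes records directly
-- instead of going through the buffering ReducingUpsertWriter that throws.
CREATE TABLE final_topic
(
    event_timestamp TIMESTAMP_LTZ,
    pod             VARCHAR,
    org             VARCHAR,
    id              VARCHAR,
    name            VARCHAR,
    deleted         TIMESTAMP_LTZ,
    PRIMARY KEY (pod, org, id) NOT ENFORCED
) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'final_topic',
      'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
      'key.format' = 'json',
      'value.format' = 'json',
      'sink.parallelism' = '3'
      );
```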

lzfw57am1#

This is a bug tracked under https://issues.apache.org/jira/browse/FLINK-25916 - upgrading the Flink Kafka connector to 3.0.0 resolves the issue.
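For a Maven-based job, the upgrade means switching to the externalized Kafka connector artifact. The version shown here is illustrative; 3.0.0 connector releases are suffixed with the Flink version they target, so check Maven Central for the one matching your deployment:

```xml
<!-- Externalized Flink Kafka connector; 3.0.0-1.17 targets Flink 1.17.
     Pick the 3.0.0-* release that matches your Flink version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.0-1.17</version>
</dependency>
```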
