I have a table defined with the following SQL:
CREATE TABLE raw_table
(
headers VARCHAR,
id VARCHAR,
eventTimestamp TIMESTAMP_LTZ(3) NOT NULL,
type VARCHAR,
contentJson VARCHAR,
WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'role__.+?',
'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
'properties.group.id' = 'role_local_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'properties.allow.auto.create.topics' = 'true',
'json.timestamp-format.standard' = 'ISO-8601',
'sink.parallelism' = '3',
'sink.buffer-flush.interval' = '10s',
'sink.buffer-flush.max-rows' = '1000'
);
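Note that 'sink.buffer-flush.interval' and 'sink.buffer-flush.max-rows' are documented as options of the upsert-kafka sink connector, not of the plain kafka source table above, so presumably they were meant for the final_topic sink defined further down. A sketch of where they would normally be declared (table name and column list here are illustrative only):

```sql
-- Sketch: buffer-flush options belong on the upsert-kafka sink.
-- Both options must be set (to values > 0) for buffering to take effect.
CREATE TABLE final_topic_buffered (
    id VARCHAR,
    name VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'final_topic',
    'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
    'key.format' = 'json',
    'value.format' = 'json',
    'sink.buffer-flush.interval' = '10s',
    'sink.buffer-flush.max-rows' = '1000'
);
```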
create view ROLES_NORMALIZED as
(
select
    JSON_VALUE(contentJson, '$.id') as id,
    -- the columns below are referenced by the views and the sink insert
    -- further down; the JSON paths are assumed, adjust them to the payload
    JSON_VALUE(contentJson, '$.org') as org,
    JSON_VALUE(contentJson, '$.pod') as pod,
    JSON_VALUE(contentJson, '$.tenantId') as tenantId,
    JSON_VALUE(contentJson, '$.name') as name,
    CAST(JSON_VALUE(contentJson, '$.modified') AS TIMESTAMP_LTZ(3)) as modified,
    rr.type as type,
    rr.eventTimestamp as event_timestamp
from raw_table rr
);
CREATE VIEW ROLES_UPSERTS_V1 AS
(
SELECT *
FROM ROLES_NORMALIZED
WHERE type IN ('ROLE_CREATED', 'ROLE_UPDATED')
);
CREATE VIEW ROLES_DELETED_V1 AS
(
SELECT org,
       pod,
       tenantId,
       id,
       modified,
       modified as deleted,
       event_timestamp
FROM ROLES_NORMALIZED
WHERE type IN ('ROLES_DELETED')
);
-------
CREATE TABLE final_topic
(
event_timestamp TIMESTAMP_LTZ(3),
org VARCHAR,
pod VARCHAR,
id VARCHAR,
name VARCHAR,
deleted TIMESTAMP_LTZ(3),
PRIMARY KEY (pod, org, id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'final_topic',
'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
'properties.group.id' = 'some-group-id',
'value.format' = 'json',
'key.format' = 'json',
'properties.allow.auto.create.topics' = 'true',
'properties.replication.factor' = '3',
'value.json.timestamp-format.standard' = 'ISO-8601',
'sink.parallelism' = '3'
);
INSERT INTO final_topic
select
    -- COALESCE guards the non-matching side of the LEFT JOIN:
    -- GREATEST returns NULL if any argument is NULL
    GREATEST(r.event_timestamp, COALESCE(d.event_timestamp, r.event_timestamp)) as event_timestamp,
    r.org,
    r.pod,
    r.id,
    r.name,
    d.deleted
from ROLES_UPSERTS_V1 r
LEFT JOIN ROLES_DELETED_V1 d
ON r.id = d.id;
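A regular (non-interval) join like the one above keeps both inputs in state indefinitely, and it also does not propagate the event-time attribute, so records downstream of the join no longer carry a rowtime. If unbounded state growth becomes a problem, one hedged option is an idle-state TTL, at the cost of potentially incorrect results once expired state is dropped:

```sql
-- Sketch: cap join state retention (SQL client / TableConfig syntax).
SET 'table.exec.state.ttl' = '1 d';
```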
When I use the options 'sink.buffer-flush.interval' = '10s' and 'sink.buffer-flush.max-rows' = '1000' on the sink, I get a NullPointerException on line 145 of the class ReducingUpsertWriter, at wrappedContext.setTimestamp(value.f1). This happens because the timestamp() method of the context object returns null, here: https://github.com/apache/flink/blob/release-1.16.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java#L85
I defined the event-time attribute with eventTimestamp TIMESTAMP_LTZ(3) NOT NULL, and the watermark with WATERMARK FOR eventTimestamp AS eventTimestamp - INTERVAL '5' SECOND.
What else do I need to define in Flink SQL so that context.timestamp() returns a non-null value? I am using Flink version 1.16.0.
In the sample messages I send to the source topic, eventTimestamp is not null.
I get this exception:
2023-07-09 12:57:38
java.io.IOException: Could not perform checkpoint 2 for operator SinkMaterializer[44] -> Sink: Writer (3/3)#1.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:90)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:167)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink
More updates:
1. The problem seems to appear when views are used to transform the source table data: somehow the records end up with a null timestamp.
Is there a way to work around this?
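Until a connector fix is in place, one workaround sketch is simply to leave the two buffer-flush options off the sink table, so the buffering ReducingUpsertWriter (the code path that dereferences context.timestamp()) is never used. This loses the sink-side reduction of updates per key between flushes, but avoids the NPE. Table name and columns below are illustrative:

```sql
-- Sketch: same upsert-kafka sink, minus the buffer-flush options that
-- route writes through ReducingUpsertWriter.
CREATE TABLE final_topic_unbuffered (
    id VARCHAR,
    name VARCHAR,
    deleted TIMESTAMP_LTZ(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'final_topic',
    'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
    'key.format' = 'json',
    'value.format' = 'json'
    -- no 'sink.buffer-flush.interval' / 'sink.buffer-flush.max-rows'
);
```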
1 Answer
This is a bug tracked under https://issues.apache.org/jira/browse/FLINK-25916. If you upgrade the Flink Kafka connector to 3.0.0, the problem is resolved.
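For reference, a hedged sketch of the upgraded dependency: since the Kafka connector was externalized from the main Flink repository, version 3.0.0 is published as a separate artifact whose version suffix pins the Flink release it was built against. Verify the exact coordinates for your Flink version before using them:

```xml
<!-- Assumed coordinates: flink-connector-kafka 3.0.0 built for Flink 1.17 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.0-1.17</version>
</dependency>
```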