我创造了 stream1
在 KSQL
(5.0 beta版)带有支持主题 topic1
以及 avro
架构。我能看懂网上所有的留言 topic1
使用 kafka-avro-console-consumer
.
然后我创造了 stream2
在 KSQL
这是基于 stream1
但与 json
名为的消息和备份主题的格式 topic2
. 我能看懂网上所有的留言 topic2
使用 kafka-console-consumer
我创造了 stream3
在 KSQL
基于 stream2
与 json
消息格式和主题名为 topic3
. 但是,我无法阅读上的消息 topic3
使用 kafka-console-consumer
.
使用 kafkacat
我得到了不同分区的偏移量 topic3
但没有一条真正的信息被打印出来。
看起来信息好像在主题中,但两者都不是 kafkacat
不是 kafka-console-consumer
能够打印出来。
尝试使用 --from-beginning
以及 --offset earliest --partition 0
没有运气。
以下是ksql语句
CREATE STREAM stream1(p_id STRING, location STRING, u_id STRING, r_id STRING, b_id STRING, recorded_dtm STRING,
v_type STRING, value STRING) WITH (kafka_topic='topic1', value_format='AVRO');
CREATE STREAM stream2 WITH (KAFKA_topic='topic2', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP')
AS select P_ID+'-'+LOCATION+'-'+U_ID+'-'+R_ID+'-'+B_ID+'-'+V_TYPE as PARTITION_KEY,
LOCATION, U_ID, R_ID, V_TYPE, B_ID, STRINGTOTIMESTAMP(recorded_dtm, 'yyyyMMddHHmmss') as RECORDED_TIMESTAMP,
P_ID, VALUE, RECORDED_DTM,'NM' as DATA_TYPE
FROM stream1 PARTITION BY PARTITION_KEY;
CREATE STREAM stream3 WITH (KAFKA_topic='topic3', VALUE_FORMAT='json', TIMESTAMP='RECORDED_TIMESTAMP')
AS select PARTITION_KEY, LOCATION, U_ID, R_ID, V_TYPE, B_ID, RECORDED_TIMESTAMP,
P_ID, VALUE, RECORDED_DTM FROM stream2 PARTITION BY PARTITION_KEY;
附加信息
在 ksql
如果我跑了 SET 'auto.offset.reset'='earliest';
然后跑 select * from stream1 limit 5;
或者 select * from stream2 limit 5
我看到打印的记录,但是 select * from stream3 limit 5
不返回任何记录。
如果我跑了 describe extended stream3
我明白了
消息总数:212条
正好是我发给topic1的信息数
1条答案
按热度按时间3lxsmp7m1#
根本原因是
Timestamp
在STREAM3
的值recorded_dtm
发送到主题1上的邮件的列早于log.retention.hours
中设置的值kafka server.properties
.我们的
log.retention.hours
值设置为24 hours
而且记录的dtm值早于24小时。这导致了STREAM3
以及topic3
根据保留策略立即删除。