kafka-timestampextractor的问题

rdrgkggo  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(328)

我用 org.apache.kafka:kafka-streams:0.10.0.1 我正在尝试使用一个基于时间序列的流,它似乎不会触发 KStream.Process() 触发(“标点符号”)(参见此处以供参考)
在一个 KafkaStreams 我正在传递的配置参数(包括其他参数):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

在这里, EventTimeExtractor 是一个自定义时间戳提取器(实现 org.apache.kafka.streams.processor.TimestampExtractor )从json数据中提取时间戳信息。
我希望这个调用我的对象(派生自 TimestampExtractor )当每一个新的记录被拉进来。所讨论的流是2*10^6条记录/分钟。我有 punctuate() 设置为60秒,它就不会开火。我知道数据经常通过这个跨度,因为它会拉旧值来追赶。
事实上,它从来没有被调用。
这是不是在kstream记录上设置时间戳的错误方法?
这是不是声明此配置的错误方法?

qyyhg6bp

qyyhg6bp1#

我认为这是另一个经纪人层面的问题。我去用cpu和ram更多的示例重建了集群。现在我得到了我期望的结果。
请注意:如果您的kstream应用程序行为异常,请查看您的代理程序,确保它们没有卡在gc中,并为文件句柄、ram等留出足够的空间。
另请参见

gab6jxml

gab6jxml2#

你的方法似乎是正确的。比较中的pargraph“timestamp extractor(timestamp.extractor):”http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-配置参数
不确定,为什么不使用自定义时间戳提取器。看一看 org.apache.kafka.streams.processor.internals.StreamTask . 在构造函数中应该有

TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

检查你的定制提取器是否在那里取走。。。

wqsoz72f

wqsoz72f3#

2017年11月更新:kafka 1.0中的kafka流现在支持 punctuate() 具有流时间和处理时间(挂钟时间)行为。所以你可以选择你喜欢的行为。
你的设置对我来说似乎是正确的。
你需要注意的是:从Kafka0.10.0开始 punctuate() 方法在流时间上操作(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。并且流时间仅在新数据记录进入时提前,流时间提前多少由这些新记录的相关时间戳决定。
例如:
假设你已经设定了 punctuate() 每1分钟调用一次= 60 * 1000 (注:流时间为1分钟)。现在,如果在接下来的5分钟内没有收到任何数据, punctuate() 根本不会被调用——即使您可能期望它被调用5次。为什么?再说一次,因为 punctuate() 取决于流时间,流时间仅根据新接收的数据记录提前。
这可能是你所看到的行为的原因吗?
展望未来:在Kafka项目中,已经有一个正在进行的关于如何使 punctuate() 更灵活,例如,不仅基于 stream-time (默认为 event-time )但也基于 processing-time .

相关问题