我用 org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用一个基于时间序列的流,它似乎不会触发 KStream.Process()
触发(“标点符号”)(参见此处以供参考)
在一个 KafkaStreams
我正在传递的配置参数(包括其他参数):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
在这里, EventTimeExtractor
是一个自定义时间戳提取器(实现 org.apache.kafka.streams.processor.TimestampExtractor
)从json数据中提取时间戳信息。
我希望这个调用我的对象(派生自 TimestampExtractor
)当每一个新的记录被拉进来。所讨论的流是2*10^6条记录/分钟。我有 punctuate()
设置为60秒,它就不会开火。我知道数据经常通过这个跨度,因为它会拉旧值来追赶。
事实上,它从来没有被调用。
这是不是在kstream记录上设置时间戳的错误方法?
这是不是声明此配置的错误方法?
3条答案
按热度按时间qyyhg6bp1#
我认为这是另一个经纪人层面的问题。我去用cpu和ram更多的示例重建了集群。现在我得到了我期望的结果。
请注意:如果您的kstream应用程序行为异常,请查看您的代理程序,确保它们没有卡在gc中,并为文件句柄、ram等留出足够的空间。
另请参见
gab6jxml2#
你的方法似乎是正确的。比较中的pargraph“timestamp extractor(timestamp.extractor):”http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-配置参数
不确定,为什么不使用自定义时间戳提取器。看一看
org.apache.kafka.streams.processor.internals.StreamTask
. 在构造函数中应该有检查你的定制提取器是否在那里取走。。。
wqsoz72f3#
2017年11月更新:kafka 1.0中的kafka流现在支持
punctuate()
具有流时间和处理时间(挂钟时间)行为。所以你可以选择你喜欢的行为。你的设置对我来说似乎是正确的。
你需要注意的是:从Kafka0.10.0开始
punctuate()
方法在流时间上操作(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。并且流时间仅在新数据记录进入时提前,流时间提前多少由这些新记录的相关时间戳决定。例如:
假设你已经设定了
punctuate()
每1分钟调用一次=60 * 1000
(注:流时间为1分钟)。现在,如果在接下来的5分钟内没有收到任何数据,punctuate()
根本不会被调用——即使您可能期望它被调用5次。为什么?再说一次,因为punctuate()
取决于流时间,流时间仅根据新接收的数据记录提前。这可能是你所看到的行为的原因吗?
展望未来:在Kafka项目中,已经有一个正在进行的关于如何使
punctuate()
更灵活,例如,不仅基于stream-time
(默认为event-time
)但也基于processing-time
.