当前设置-我正在从16个单分区主题流式传输数据,执行ktable ktable连接,并发送一个包含所有流的聚合数据的输出。我还将每个ktable具体化为本地州商店。
情景-
当我试着运行两个应用程序示例时,我期望kafka streams在一个示例上运行,但由于某些原因,它也在另一个示例上运行。看起来它可以在kafka streams期间在其他应用程序示例上创建流任务在示例#1上发生故障时导致某些流异常。主题上的大量积压工作被清除了,但当我通过interactivequeryservice查询聚合数据状态存储时,我发现很少有数据流丢失。看起来它为示例2上的几个流创建了状态存储。但不确定。
当我试着在一个示例上运行这个繁重的backlog时,我看到了各种各样的异常-超时异常,recordtoolarge异常。对于recordtoolarge异常,我实现了productionexceptionhandler来捕获异常并继续。但看起来它是超时,而不是Kafka团队不断崩溃-
org.apache.kafka.common.errors.timeoutexception:60000毫秒后更新元数据失败。可以增加producer参数 retries
以及 retry.backoff.ms
以避免此错误。
分区0的5条记录过期:自上次追加后已过381960毫秒;将不再发送记录,也不再记录此任务的偏移量。
由于以前的记录(键1值)出现错误而中止发送[b@2 由于org.apache.kafka.common.errors.timeoutexception,主题 xyz store changelog的时间戳1548700011925):60000毫秒后更新元数据失败。
当前配置-
retry.backoff.ms:5000 request.timeout.ms:300000重试次数:5 commit.interval.ms:1000处理_guarantee:at_least_once
所有其他值都是默认值,如-
最大轮询间隔\u ms:2147483647最大轮询_records:1000 batch_size:16384数字流_threads:1 linger_ms:100最大阻塞:60000最大飞行需求_conn:5
我想在单个示例上运行应用程序而不崩溃,即使它的进程缓慢,以确保它清除积压。
暂无答案!
目前还没有任何答案,快来回答吧!