我在本地主机上设置了kafka,并尝试在网络出现问题时监视kafka制作者的行为。即使所有经纪人都倒下了,制片人也没有给出任何错误。我用的是同步制作人和Kafka版本0.8。如果所有经纪人都倒下了,生产商有没有办法收到例外情况?
yftpprvb1#
java/scala中较新的8.2 producer(异步producer)为每条消息传递了一个回调方法。您可以尝试处理回调中的失败。。可能重试。回调方法采用两个参数(exception和messagemetatdata)。任何时候都只能设置一个。meatadata是在您的消息成功发送时设置的,以防出错。使用sync producer,您必须设置max.retries和@leshkin指出的其他配置。
yqhsw0fo2#
请在生产者中添加异常处理:
try{ Producer logic } catch (Exception ex) { String errorMsg = "Failed to publish events"; logger.error("Failed to publish events", ex); result = Status.BACKOFF;
如果还是不行,请告诉我。
k75qkfdt3#
这取决于生产者配置(生产者配置)特别注意参数:
(METADATA_FETCH_TIMEOUT_CONFIG, 60000), (TIMEOUT_CONFIG, 10000), (RETRY_BACKOFF_MS_CONFIG, 100), (RECONNECT_BACKOFF_MS_CONFIG, 1000)
您一定要更改有关设置的参数。所有这些参数都会影响生产者的行为。
3条答案
按热度按时间yftpprvb1#
java/scala中较新的8.2 producer(异步producer)为每条消息传递了一个回调方法。您可以尝试处理回调中的失败。。可能重试。回调方法采用两个参数(exception和messagemetatdata)。任何时候都只能设置一个。meatadata是在您的消息成功发送时设置的,以防出错。
使用sync producer,您必须设置max.retries和@leshkin指出的其他配置。
yqhsw0fo2#
请在生产者中添加异常处理:
如果还是不行,请告诉我。
k75qkfdt3#
这取决于生产者配置(生产者配置)
特别注意参数:
您一定要更改有关设置的参数。所有这些参数都会影响生产者的行为。