当kafka服务器和producer之间的连接中断时,消息会发生什么变化?

v1uwarro  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(427)

我是kafka使用spring boot的新手,我在projet工作,我想将kafka使用spring集成到它中,所以问题是即使kafka服务器不运行(脱机模式),我也想从生产者向消费者发送消息
有谁能给我举个例子,说明在脱机模式下使用Kafka时,我找不到该主题的芭蕾舞曲我想停止我的Kafka服务器(例如),同时制作人想向该主题发送数据,然后消费者可以得到这些信息?最好的解决办法是什么?是真的吗?

  • 将数据发送到文件,当服务器返回运行时(例如,我测试连接),我将数据从文件导出到主题
  • 将数据发送到数据库,当服务器返回运行(test connexion)时,我将同样的消息(数据)从数据库发送到我的主题
  • 使用队列或列表存储消息,当服务器返回运行(test connexion)时,我将数据从列表发送到主题,但问题是我有很多消息

-->如果有其他的解决方案,举个简单的例子,有人能帮我吗?
这是brocker redis的一个例子,我们测试redis brocker和producer之间的连接,如果连接失败,我会把数据存储在一个队列中,这个队列可以存储很多消息,当redis和producer之间的连接恢复工作时,producer现在从队列中获取这些消息,并将它们发送给redis brocker。
但是这个brocker的问题是,有一些信息丢失了,所以我们决定将kafka brocker集成到我的项目中,而不是redis brocker!
有人能给我举一个java例子,在生产者将大量消息发送到kafka集群之前,如何存储这些消息?或者因为我们不想使用相同的队列解决方案,所以最好的解决方案是什么?
python中的这个示例是在connexion未能连接到服务器时如何在队列中存储消息:

try:
    urllib.request.urlopen('http://serverAdress', timeout=0.1)
    r.publish(topicProduction,json_background_of_message1)
    print(json_background_of_message1)
    arretControle=Tru
    except Exception as e:
    qArret.put(json_background_of_message1)
    print("arret")
    arretControle=True

//如果连接失败,我们可以在发送这些消息之前在这个队列中存储大量消息

qoefvg9y

qoefvg9y1#

kafka被设计成一个高度可用的消息传递系统。正确配置,并且取决于复制因素,您可以让多个代理一次完全停机几天,并且集群仍然可以工作(尽管负载可能更高)。我使用过的每一个kafka生产集群,在成功部署之后,都没有完全关闭过。我们已经有个别经纪人倒闭,有时一次倒闭好几天,但这从来都不是问题。
你所建议的是一个回退或备份方法,以防Kafka不可用。然而,你仍然有同样的问题。如果将邮件转储到文件中,需要多长时间才能用完磁盘空间?如果将消息存储在数据库中,那么数据库需要多长时间才能用完空间?如果将消息存储在内存队列中,那么要多久才能耗尽内存并使应用程序崩溃?现在还必须构建一种机制来从kafka中断中恢复,这会增加复杂性和开销。
使用kafka的最佳方法是将其配置为一个高可用性系统,并将其作为一个高可用性系统进行处理,正确地配置警报和度量,这样您将立即收到警报,并且在出现问题时可以迅速做出React。此外,您应该始终调整和测试应用程序的大小,以便有足够的空间来处理最坏的情况。如果将其配置为使用复制因子3,则可以丢失任何两个代理,并且集群仍可以正常工作,而不会丢失数据。
现在,在应用程序方面,如果kafka不可用,您的行为应该取决于消息的重要性。如果您可以容忍消息丢失,那么只要在生产者返回异常时删除它们,并记录它/发送警报即可。但是,如果它们是非常重要的记录,那么在完全确认它们保存在kafka中之前,不应该确认/提交上游系统上的消息(记录来自何处)。我建议将producer acks设置为 -1 或者 all 为此,在失败的情况下进行多次重试,并在 producer.send() 方法。更多详细说明请参见此处:https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/callback.html
更多细节,如其他人所说,请阅读官方文件:https://kafka.apache.org/documentation/

相关问题