如果重试失败,如何处理Kafka producer中的消息

xfb7svmp  于 2023-10-15  发布在  Apache
关注(0)|答案(4)|浏览(110)

我们必须实现用户注册模块,我们有两个服务
1.身份服务

  1. UserRegistration服务
    现在,当用户从UserRegistration服务注册时,我们希望将用户详细信息发送到Identity服务,以便用户可以登录到系统。我们正在使用Kafka来实现这一点。
    在我们的案例中,UserRegistration服务充当Kafka生产者。流程如下:
    1.请求是为了注册用户。
    1.将用户数据存储在数据库中。
    1.给Kafka发个信息。

案例:

  • 如果生产者的请求成功,即消息存储在主题中。
  • 向最终用户返回成功消息。
  • 如果制片人的请求失败,
  • 通过发送消息直到重试次数限制来进行重试。
    *如果重试已用尽。
    在这种情况下该怎么办?在这种情况下可能会丢失消息。

我认为我们应该把消息存储在数据库中(即。在表“failed-messages”中),如果重试已用尽,并且具有后台服务,该后台服务将循环失败的消息以尝试在每个时间间隔之后再次将它们发送到Kafka。
请向我们建议处理这种情况的最佳做法。
谢了索拉布

6kkfgxo0

6kkfgxo01#

如果重试失败...
在KafkaProduer中,重试通常设置为Integer.MAX_VALUE,限制设置为 time 而不是no。的重试。
请向我们建议处理这种情况的最佳做法。

方法#1

当您将数据存储在数据库中时,您可以使用Kafka源连接器(例如Debezium)将数据库更改流式传输到Kafka,以便您可以避免在UserRegistration服务中编写生产者。这将是一个更清洁的方法海事组织,因为几个原因:
1.您不需要让用户等待代理发布并确认Kafka消息
1.您不必担心UserRegistration服务中的重试逻辑。
1.即使Kafka关闭一段时间,您的用户也不会受到影响。

方法#2

另一种方法是将生产任务委托给另一个周期性运行的线程。
这个线程的工作是 * 检查 * 数据库的任何更新,并将这些更新推送到Kafka。如果任何事情失败,该线程负责重试并确保消息到达Kafka。
关于这种方法,这里需要提到的一点是,如果您有多个UserRegistration服务示例正在运行,则需要确保将待发送的记录分布在不同示例之间,以便最终不会向Kafka发送重复的记录。这成为一个有状态的服务,因为您需要在多个示例之间进行协调。这是“相对困难”的。

方法#3

如果你的代码已经写好了,你的UserRegistration服务正在使用KafkaProducer,它在请求处理线程本身中生成记录,你可以尝试根据你的Kafka集群将超时增加到一个更大的值(delivery.timeout.ms),并将retries保留为Integer.MAX_VALUE。请记住,你需要确保你的信息以某种方式传递。有两种方法:
1.Async:信任您的超时值,并通过设置为更大的值重试。在某些时候,你应该希望你的信息会被发送给Kafka。对其他错误要格外小心,如序列化问题,缓冲区内存限制等。
1.Sync:调用get()(或)get(time, TimeUnit)并阻止您的请求线程,直到它被发送到Kafka。这会延迟对客户端的响应。
如果我必须选择,我会选择方法1,因为它干净而直接。

r9f1avp5

r9f1avp52#

我认为这取决于这里的业务用例有多重要。虽然您可以将消息存储在一个单独的数据库中并继续工作流,但Kafka代理可能有问题,或者需要仔细查看。IMO,将消息临时移动到数据库是一种处理 * 背压 * 的方法,而不是一种解决方案-它会产生更多的松散结果,比如你如何处理消息的重新处理?
在这种情况下,我更喜欢 * 吻 *(保持简单和愚蠢)。如果情况允许,只需发出警报并使该过程失败

polhcujo

polhcujo3#

我使用了上面文章中提到的方法1。
我已经创建了Cosmos DB源连接器来从cosmos DB获取User数据并将其发布到Kafka主题“new_user_registered”。Cosmos DB源连接器配置:

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "new_user_registered#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "output.data.format": "JSON",
    "transforms": "replacefield",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "id,_rid,_self,_etag,_attachements,_ts",
    "transforms.replacefield.include": "Login,Email,Password,Name"
  }
}

然后创建了一个Azure SQL接收器连接器,它从Kafka主题“new_user_registered”获取数据。Azure SQL接收器连接器配置:

{
    "name": "sqlserver-sink-azure-connector",
    "config": {
        "name": "sqlserver-sink-azure-connector",
        "connector.class": "io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "transforms": "RenameFields",
        "transforms.RenameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameFields.renames": "Email:vchEmail,Login:vchLogin,Name:vchName,Password:vchPassword",
        "topics": "NEW_USER_REGISTERED_AVRO",
        "azure.sql.dw.url": "jdbc:sqlserver://192.168.154.131:1433;",
        "azure.sql.dw.user": "sa",
        "azure.sql.dw.password": "password123",
        "azure.sql.dw.database.name": "DatabaseName",
        "table.name.format": User"
        "insert.mode": "insert",
        "auto.create": "true",
        "auto.evolve": "true",
        "tasks.max": "1",
        "confluent.topic.bootstrap.servers": "broker:29092"
    }
}

但是接收器连接器抛出异常**“No fields found using key and value schemas for table:用户”**
为此,我在下面的文章中找到了解决方案:Kafka Connect JDBC sink connector not working

  • 解决方案1:* 我们需要在消息中发送schema和payload。(这不适合我们)
  • 解决方案2:* 使用Confluent Avro序列化器。为了使用Avro,我们找到了Confluent提供的一个视频(https://www.youtube.com/watch?v=b-3qN_tlYR4&t=1300s),我们可以使用KSQL和流将JSON转换为Avro,然后使用连接器从流新创建的主题中获取数据。

但是我在想,对于我的情况,我是否应该使用一个带有KSQL/Streams冗长内容的接收器连接器,我只是想在两个服务之间同步用户,而不需要任何转换和模式。
有谁能建议一下,我应该选择传统的消费者还是Kafka连接?
谢谢,索拉布

dphi5xsq

dphi5xsq4#

transmitting发件箱模式与Polling publisher模式互补,可以作为另一种可能的选择进行探索!
transmitting发件箱模式-https://microservices.io/patterns/data/transactional-outbox.html轮询发布者模式-https://microservices.io/patterns/data/polling-publisher.html

相关问题