http请求源到kafka接收器?

dzhpxtsq  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(587)

我如何连接一个播放后路线到KafkaFlume?
我发现https://github.com/jamesward/hello-play-kafka. 但它使用随机数字的滴答声源连接到Kafka接收器上。1
如何使post-request路由成为连接到kafka接收器的源?
编辑:post请求主体的格式是json,内容类型为application/json。发送给Kafka的消息应该是完全相同的json。路由需要一条json消息。

ni65a41a

ni65a41a1#

也许我误解了您的问题,但您可以使用rest代理向Kafka发送消息:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{ "records": [ { "value": { "name": "testUser0" } }, { "value": { "name": "testUser1" } } ] }' \
      "http://localhost:8082/topics/jsontest"

参考:融合rest代理文档

mw3dktmi

mw3dktmi2#

由于您的post请求是直接进入kafka的,一对一,因此不清楚akkastreams提供的流模型是否适合。使用 KafkaProducer api直接发送消息,一次发送一条消息到一个Kafka主题。这将使代码,特别是错误处理变得更加简单。
可能仍然有很好的理由选择使用akka流,例如管理并发或排序。具体的原因可能会影响最佳的解决方法,所以如果你有更多的细节,请添加到问题,我会更新答案。
例如,如果要确保没有并发写操作,可以在guice中将类注册为单例(假设使用guice进行依赖注入)。当它被创建时,它运行一个akka流,使用 Source.queue , Source.actorRef ,或 Source.actorRefWithBackpressure 还有Kafka Sink . 它还需要使用 RestartSource 以确保流在出现错误时重新启动。公开一个公共方法,该方法允许调用者向具体化的源队列或参与者发送消息,然后将singleton注入到控制器类中,以允许它使用该方法发布其post数据。
这种方法的一个大缺点是,控制器操作只能知道消息已成功发送到流,而不能知道消息已成功写入kafka。如果kafka代理关闭,应用程序崩溃,或者由于其他原因写操作失败,则消息可能会丢失,即使在将成功的结果返回给发起客户端之后也是如此。这不是所有用例的问题,但是需要考虑。

相关问题