我已经编写了一个akka应用程序,它从kafka获取输入,然后用sharded actors处理数据并输出到kafka。
但在某些情况下,碎片区域无法处理负载,我得到:
您可能应该实现流控制,以避免淹没远程连接。
如何在此链/流中实现背压?
Kafka消费者->共享演员->Kafka制作人
代码中的一些片段:
ReactiveKafka kafka = new ReactiveKafka();
Subscriber subscriber = kafka.publish(pp, system);
ActorRef kafkaWriterActor = (ActorRef) Source.actorRef(10000, OverflowStrategy.dropHead())
.map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix))
.to(Sink.fromSubscriber(subscriber))
.run(materializer);
ConsumerProperties cp = new PropertiesBuilder.Consumer(brokerList, intopic, consumergroup, new ByteArrayDeserializer(), new NgMsgDecoder())
.build().consumerTimeoutMs(5000).commitInterval(Duration.create(60, TimeUnit.SECONDS)).readFromEndOfStream();
Publisher<ConsumerRecord<byte[], StreamEvent>> publisher = kafka.consume(cp,system);
ActorRef streamActor = ClusterSharding.get(system).start("StreamActor",
Props.create(StreamActor.class, synctime), ClusterShardingSettings.create(system), messageExtractor);
shardRegionTypenames.add("StreamActor");
Source.fromPublisher(publisher)
.runWith(Sink.foreach(msg -> {
streamActor.tell(msg.value(),ActorRef.noSender());
}), materializer);
1条答案
按热度按时间ttygqcqt1#
也许您可以考虑将主题并行化为分区(如果适用的话),并通过调整来创建每个分区背压的使用者
ConsumerWithPerPartitionBackpressure
在本例中,使用mapasync和ask与actor集成。