我有一个消费者主题ctopic(数据由外部代理编写)和一个生产者ptopic。这个流是消费者从ctopic读取消息,处理它并写入生产者主题。代码如下
tranxId = "O8XXD";
cache = new HashMap()
RestartSource.onFailuresWithBackoff(minBackoff = 2.seconds, maxBackoff = 600.seconds, randomFactor = 0.4)
{ () =>
Transactional.source(consumerProperties, Subscriptions.topics("CTOPIC"))
.mapAsync(1)(myMessage => {
upsertDLQ(myMessage) // insert/update myMessge into DLQ and with the count of retries
EvaluateMyMessage(myMessage) //calls google API via n/w call for e.g.
.recoverWith{
case exception:NetworkException =>{
//log the error and do a backoff-retry via the above RestartSource block
if (DLQ_retry_count(myMessage)> 4{
Future(ProducerMessage.passThrough[String, String, PartitionOffset](myMessage.partitionOffset))
}
else {
Future failed exception
}
}
}
})
.via(Transactional.flow(producerProperties, tranxId)) //write to "PTOPIC"
}.runWith(Sink.foreach(res => log.info(s" Response:" +s" ${res.passThrough}")))
[info] a.k.i.TransactionalSourceLogic - [0cd3d] Starting. StageActor
Actor[akka://application/system/Materializers/StreamSupervisor-0/$$b#987621478]
''consumerconfig值:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
fetch.max.wait.ms = 500
group.id = mygroup-1
group.instance.id = null
heartbeat.interval.ms = 3000
isolation.level = read_committed
max.poll.interval.ms = 300000
max.poll.records = 500
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
''producerconfig值:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
transaction.timeout.ms = 60000
transactional.id = O8XXD
消息m1、m2、m3、m4依次出现,当第二条消息m2到达时出现网络错误,将发生重新定位。
我的经验是m2应该重试4次,但我看到m3,m4也立即进来,它像m2,m3,m4都试图得到处理。这个过程持续了大约3-4分钟,在这段时间之后它被提交(m2,m3和m4如预期的那样在dlq中)。
但是,当m2还没有提交时,我怎么才能停止阅读m3、m4呢?因为重新平衡,m2被代码读取了大约7-8次。请帮忙
暂无答案!
目前还没有任何答案,快来回答吧!