tranxId = "O8XXD";
cache = new HashMap()
RestartSource.onFailuresWithBackoff(minBackoff = 2.seconds, maxBackoff = 600.seconds, randomFactor = 0.4)
{ () =>
Transactional.source(consumerProperties, Subscriptions.topics("CTOPIC"))
.mapAsync(1)(myMessage => {
upsertDLQ(myMessage) // insert/update myMessge into DLQ and with the count of retries
EvaluateMyMessage(myMessage) //calls google API via n/w call for e.g.
case exception:NetworkException =>{
//log the error and do a backoff-retry via the above RestartSource block
if (DLQ_retry_count(myMessage)> 4{
Future(ProducerMessage.passThrough[String, String, PartitionOffset](myMessage.partitionOffset))
else {
Future failed exception
.via(Transactional.flow(producerProperties, tranxId)) //write to "PTOPIC"
}.runWith(Sink.foreach(res => log.info(s" Response:" +s" ${res.passThrough}")))
[info] a.k.i.TransactionalSourceLogic - [0cd3d] Starting. StageActor
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
fetch.max.wait.ms = 500
group.id = mygroup-1
group.instance.id = null
heartbeat.interval.ms = 3000
isolation.level = read_committed
max.poll.interval.ms = 300000
max.poll.records = 500
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
transaction.timeout.ms = 60000
transactional.id = O8XXD