kotlin 如何防止Event Hub消费者与生产者断开连接?

snz8szmq  于 2023-06-24  发布在  Kotlin
关注(0)|答案(1)|浏览(137)

我们有一个场景,外部系统生成发送到EventHub的事件。此外,我们还有一个单独的服务,它使用来自EventHub的数据。但是,如果生产者停止向分区发送事件,消费者最终会断开与它的连接。如何确保消费者与生产者保持联系?此外,我需要能够在从开始或最后一点开始的消费事件之间切换。
我的第二个问题是:从消费者的Angular 来看,EventProcessorClient和EventHubClient之间有什么区别?
这是我的消费者代码。

val retryOptions = AmqpRetryOptions()
            .setDelay(Duration.ofSeconds(10))
            .setMaxDelay(Duration.ofSeconds(30))
            .setMaxRetries(3)
            .setMode(AmqpRetryMode.EXPONENTIAL)

        val eventHubClientBuilder = EventHubClientBuilder()
            .connectionString("connectionString")
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .retryOptions(retryOptions)
            .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
        val eventHubClient = eventHubClientBuilder.buildAsyncConsumerClient()

        // this boolean indicates whether to read from the beginning of the stream
        eventHubClient.receive(true)
            .subscribe {
                log.info(
                    "Data Offset: ${it.data.offset}, " +
                        "Sequence number: ${it.data.sequenceNumber}, " +
                        "PartitionId: ${it.partitionContext.partitionId}, " +
                        "Enqueued time: ${it.data.enqueuedTime}, " +
                        "Received message: ${it.data.bodyAsString}"
                )
                // process messages
            }

经过一段时间后,消费者的日志将显示以下信息:
{"az.sdk.message":"Unable to schedule close work. Closing manually." ... }
{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor." ...}
{"az.sdk.message":"Timed out waiting for RequestResponseChannel to complete closing. Manually closing." ...}

k3fezbri

k3fezbri1#

防止Event Hub使用者与生产者断开连接。
生产者必须继续向事件中心的分区发送事件。如果生产者停止向分区发送事件,则消费者断开连接,因为没有新的数据要消费。
维护生产者和消费者之间连接的步骤。

  • 检查生产者是否存在任何可能导致其停止发送事件的问题。
  • 检查生产者是否正确运行,并且可以持续向事件中心发送事件。
  • 监控生产者的活动
  • 实现故障转移机制,以防主生产者发生故障或停止发送事件。

为使用者创建使用者组时,请确保配置了适当的设置。例如,如果您希望消费者从event stream的开头读取,则将EventPosition设置为EventPosition.fromStartOfStream(),或者使用EventPosition.latest()仅读取最新的事件。
代码引用取自Azure Event Hubs

string conn = @"Connection";
     string hub = "myhub";
     int NoOfEvents = 10;
   await using (var pc = new EventHubProducerClient(conn, hub))
    {
        EventDataBatch event_Batch = await pc.CreateBatchAsync();

        for (int i = 1; i <= NoOfEvents; i++)
        {
            string msg = $"Event {i}";
            event_Batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(msg)));
        }

        await pc.SendAsync(event_Batch);
        Console.WriteLine($"Sent {NoOfEvents} events to hub: {hub}");
    }

Azure中的度量。

关于EventProcessorClient和EventHubClient之间的区别。

EventProcessorClient为事件处理提供更高层次的抽象。它允许通过提供负载平衡,检查点和自动分区管理等内置功能从Event Hub中消费事件。

并且被设计为并行处理来自多个分区的进程事件并确保容错。

EventHubClient

这提供了一个较低级别的API,用于从Event Hub消费事件。并对事件使用有更多的控制,但需要您手动处理分区管理、负载平衡和检查点。
并且适合于消费来自单个分区的事件。
也可参考Exchange events using different protocols.

相关问题