我们有一个场景,外部系统生成发送到EventHub的事件。此外,我们还有一个单独的服务,它使用来自EventHub的数据。但是,如果生产者停止向分区发送事件,消费者最终会断开与它的连接。如何确保消费者与生产者保持联系?此外,我需要能够在从开始或最后一点开始的消费事件之间切换。
我的第二个问题是:从消费者的Angular 来看,EventProcessorClient和EventHubClient之间有什么区别?
这是我的消费者代码。
val retryOptions = AmqpRetryOptions()
.setDelay(Duration.ofSeconds(10))
.setMaxDelay(Duration.ofSeconds(30))
.setMaxRetries(3)
.setMode(AmqpRetryMode.EXPONENTIAL)
val eventHubClientBuilder = EventHubClientBuilder()
.connectionString("connectionString")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.retryOptions(retryOptions)
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
val eventHubClient = eventHubClientBuilder.buildAsyncConsumerClient()
// this boolean indicates whether to read from the beginning of the stream
eventHubClient.receive(true)
.subscribe {
log.info(
"Data Offset: ${it.data.offset}, " +
"Sequence number: ${it.data.sequenceNumber}, " +
"PartitionId: ${it.partitionContext.partitionId}, " +
"Enqueued time: ${it.data.enqueuedTime}, " +
"Received message: ${it.data.bodyAsString}"
)
// process messages
}
经过一段时间后,消费者的日志将显示以下信息:{"az.sdk.message":"Unable to schedule close work. Closing manually." ... }
{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor." ...}
{"az.sdk.message":"Timed out waiting for RequestResponseChannel to complete closing. Manually closing." ...}
1条答案
按热度按时间k3fezbri1#
防止Event Hub使用者与生产者断开连接。
生产者必须继续向事件中心的分区发送事件。如果生产者停止向分区发送事件,则消费者断开连接,因为没有新的数据要消费。
维护生产者和消费者之间连接的步骤。
为使用者创建使用者组时,请确保配置了适当的设置。例如,如果您希望消费者从
event stream
的开头读取,则将EventPosition
设置为EventPosition.fromStartOfStream()
,或者使用EventPosition.latest()
仅读取最新的事件。代码引用取自Azure Event Hubs
Azure中的度量。
关于EventProcessorClient和EventHubClient之间的区别。
EventProcessorClient为事件处理提供更高层次的抽象。它允许通过提供负载平衡,检查点和自动分区管理等内置功能从Event Hub中消费事件。
并且被设计为并行处理来自多个分区的进程事件并确保容错。
EventHubClient
这提供了一个较低级别的API,用于从Event Hub消费事件。并对事件使用有更多的控制,但需要您手动处理分区管理、负载平衡和检查点。
并且适合于消费来自单个分区的事件。
也可参考Exchange events using different protocols.