如何将最新事件与Azure Event Hubs Go SDK(azeventhubs)关联?

2cmtqfgy  于 2023-11-14  发布在  Go
关注(0)|答案(1)|浏览(112)

我正在从azure-event-hubs-go/v3迁移到更新的azeventhubsGo SDK。在旧版SDK中,有一个ReceiveOption参数允许我指定从何处开始消费事件。
在新的SDK中,我使用以下代码来初始化处理器:

  1. processor, err := azeventhubs.NewProcessor(
  2. e.ConsumerClient,
  3. checkpointStore,
  4. &azeventhubs.ProcessorOptions{
  5. UpdateInterval: time.Second,
  6. Prefetch: 0,
  7. StartPositions: azeventhubs.StartPositions{
  8. Default: azeventhubs.StartPosition{
  9. Latest: to.Ptr(true),
  10. EnqueuedTime: to.Ptr(time.Now()),
  11. Inclusive: true
  12. }
  13. }
  14. }
  15. )

字符集
但是,我注意到这些事件是从最后一个检查点而不是从最近发送的事件中消耗的。

尝试内容:我尝试了ConsumingEventsUsingConsumerClientConsumingEventsWithCheckpoints两个示例,但它们的行为方式相同,都是消费上一个检查点的事件,而不是最近的事件。
**预期:**我希望处理器开始消费设备发送的最新事件,设备每秒发送一条消息。如何使用azeventhubs Go SDK实现此行为?

i7uq4tfw

i7uq4tfw1#

我最初很难理解AMQP的基本机制,但是,我很高兴地报告,这个问题已经成功地解决了。

  1. var wg sync.WaitGroup
  2. wg.Add(1)
  3. for _, partition := range p.PartitionIDs {
  4. go func(partition string) {
  5. defer wg.Done()
  6. partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
  7. if err != nil {
  8. panic(err)
  9. }
  10. receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
  11. defer cancel()
  12. for {
  13. events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
  14. if err != nil && !errors.Is(err, context.DeadlineExceeded) {
  15. panic(err)
  16. }
  17. for _, evt := range events {
  18. fmt.Printf("partition: %s\n", partition)
  19. fmt.Printf("Body: %s\n", string(evt.Body))
  20. }
  21. }
  22. }(partition)
  23. }
  24. wg.Wait()

字符集
我感谢Azure客户支持服务团队提供的宝贵帮助。

展开查看全部

相关问题