我正在尝试在SpringBoot应用程序中接收来自Azure服务总线的消息,但遇到了一些问题。这是我的代码:
Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
try {
System.out.println(messageContext.getMessage().getMessageId());
// other message processing code
messageContext.complete();
} catch (Exception ex) {
messageContext.abandon();
}
};
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString(<<CONNECTION_STRING>>)
.processor()
.queueName("test")
.disableAutoComplete()
.receiveMode(PEEK_LOCK)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
字符串
基本上,我使用了与这个例子相同的代码,但是当我运行我的应用程序时,我收到了这个错误:
java.lang.NoSuchFieldError:CLIENT_RECEIVER_IDENTIFIER位于com.azure.messaging.servicebus.implementation.ServiceBusReactorSession. PLAN Consumer(ServiceBusReactorSession.java:185)~[azure-messaging-servicebus-7.11.0.jar:7.11.0] at com.azure.messaging.servicebus.implementation. ServiceBusReactorSession.jar Consumer(ServiceBusReactorSession.java:99)~[azure-messaging-servicebus-7.11.0.jar:7.11.0],位于com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection.tagda$tagesReceiveLink$3(ServiceBusReactorAmqpConnection.java:183)~[azure-messaging-servicebus-7.11.0.jar:7.11.0] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher. FluxData $TimeoutMainSubscriber.onComplete(FluxTimeout.java:234)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.tryOnNext(FluxDistinctUntilChanged.java:149)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.onNext(FluxDistinctUntilChanged.java:102)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.ReplayProcessor.tryEmitNext(ReplayProcessor.java:508)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)~[reactor-core-3.4.22.jar:3.4.22] at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)~[reactor-core-3.4.22.jar:3.4.22] at com.azure.core.amqp.implementation.Handler.onNext(Handler.java:89)~[azure-core-amqp-2.5.2.jar:2.5.2] at com.azure.core.amqp.implementation.handler. SessionRemote. onSessionRemote Open(SessionHandler.java:87)~[azure-core-amqp-2.5.2.jar:2.5.2] at org.apache.qpid.proton.engine. BaseSync.handle(BaseHandler.java:146)~[proton-j-0.33.6.jar:na] at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)~[proton-j-0.33.6.jar:na] at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)~[proton-j-0.33.6.jar:na] at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)~[proton-j-0.33.6.jar:na] na] at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)~[azure-core-amqp-2.5.2.jar:2.5.2] at reactor.core.scheduler.SchedulerTask.call(ReactorTask.java:68)~[reactor-core-3.4.22.jar:3.4.22]在reactor.core.scheduler.SchedulerTask.call(ReactorTask.java:28)~[reactor-core-3.4.22.jar:3.4.22]在java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)~[na:na]位于java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)~[na:在java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)~[na:na] at java.base/java.lang.Thread.run(Thread.java:834)~[na:an]
这很奇怪,因为当我尝试使用peekMessage函数使用ServiceBusReceiverAsyncClient时,它可以工作,但是当我尝试使用receiveMessages或使用ProcessorClient的这种方法时,它抛出错误。我没有找到任何关于这个错误的信息,我不知道它是否是我必须设置的属性。
1条答案
按热度按时间mepcadol1#
显然,这是有关的te包版本,以前我是使用以下:
字符串
当我回滚到以前的版本(7.10.1)时,它开始正常工作,我不知道是否有兼容性问题,但现在它工作正常。
谢谢