带有源连接器的Pulsar Rabbitmq错误

fae0ux8s  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(263)

我正在尝试使用Apache Pulsar的RabbitMQ作为源代码。我在本地运行Pulsar的二进制文件,并使用正式的RabbitMQ连接器。RabbitMQ正在服务器中运行。我可以看到RabbitMQ Jmeter 板显示Pulsar已连接,但在Pulsar中出现错误。

2022-06-15T19:19:14,949+0300 [function-timer-thread-78-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check failed for rabbit-connector-local-0
java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:184) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.0.jar:2.10.0]
at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.0.jar:2.10.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.Status.asRuntimeException(Status.java:535) ~[io.grpc-grpc-api-1.42.1.jar:1.42.1]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534) ~[io.grpc-grpc-stub-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
... 1 more
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:46247
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
at io.grpc.netty.shaded.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:278) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
... 1 more

我的rabbitmq的yaml文件是,

configs:                                                                       
    host: "my.server"
    port: 5672
    virtualHost: "/" 
    username: "user"
    password: "pass"
    queueName: "topic"
    connectionName: "my-connection"
    requestedChannelMax: 0

有什么想法吗?也许需要做一些额外的事情?

0sgqnhkj

0sgqnhkj1#

我以前见过这种情况,有时是磁盘错误,有时是错误的Java版本。请确保您使用的是与您的Pulsar版本相匹配的正确Java版本。您使用的是哪个版本的Pulsar?如果是当前的github,请使用JDK 17,否则JDK 8或11可能是您最好的选择。
Pulsar正在运行吗?
我的配置文件

如果有多个消息传送协议,则需要用逗号分隔

消息传送协议=amqp

目录

协议处理程序目录=./protocols

-- AMQP

添加amqp配置

客户端服务器:
请参见此处:https://github.com/tspannhw/FLiPN-AirQuality-Checks

相关问题