将apache kafka连接到数据库

ffscu2ro  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(520)

我正在尝试用postgresql、apachekafka(发送数据)和apachespark streaming(处理数据)创建一个流/大数据应用程序。
在本文档之后,我首先尝试将kafka连接到sqlite数据库。但是当我用命令运行连接器时

$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

在进程关闭之前,我得到了下面的错误。我将正确的路径传递到我的数据库,我验证了。
如何连接和处理我的数据库和Kafka之间的连接?

2017-09-11 12:00:17,305] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Sep 11, 2017 12:00:17 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-09-11 12:00:17,618] INFO Started o.e.j.s.ServletContextHandler@1a2d50d5{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-09-11 12:00:17,623] WARN FAILED ServerConnector@3e37a981{HTTP/1.1}{0.0.0.0:8083}: java.net.BindException: Address already in use (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:321)
    at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
    at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.server.Server.doStart(Server.java:366)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2017-09-11 12:00:17,624] WARN FAILED org.eclipse.jetty.server.Server@46eb180e: java.net.BindException: Address already in use (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:321)
    at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
    at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.server.Server.doStart(Server.java:366)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
[2017-09-11 12:00:17,624] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
org.apache.kafka.connect.errors.ConnectException: Unable to start REST server
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:147)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:321)
    at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
    at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.server.Server.doStart(Server.java:366)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
    ... 2 more
9bfwbjaz

9bfwbjaz1#

WARN FAILED ServerConnector@3e37a981{HTTP/1.1}{0.0.0.0:8083}: java.net.BindException: Address already in use (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.net.BindException: Address already in use

这意味着其他一些东西已经在使用端口8083。
你用的是汇合平台吗?这是一个开始使用apachekafka的好方法,并为您提供了一个简单的方法来启动堆栈( confluent start )以及配置Kafka连接。本博客系列展示了它的实际应用:
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
顺便说一句,你可能也想看看Kafka流,作为一种在Kafka本身做流处理的方式。

x6h2sr28

x6h2sr282#

我把那本书注解掉了 listeners=http://0.0.0.0:8081 中的属性 schema-registry.properties 文件,问题已解决。
你的端口号可能是 8083 不过。

相关问题