什么是调试自定义kafka连接器的简单有效的方法?

ukqbszuj  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(593)

我正在使用两个kafka连接器,在控制台输出中没有看到它们的创建/部署中有任何错误,但是我没有得到我想要的结果(没有任何结果,不管是想要的还是其他的)。我基于kafka的示例filestream连接器制作了这些连接器,因此我的调试技术基于示例中使用的slf4j记录器。我已经搜索了我认为会在控制台输出中生成的日志消息,但是没有结果。我是不是找错地方了?或者有没有更好的方法来调试这些连接器?
我在实现中引用的slf4j记录器的示例用法:
kafka文件流链接任务
Kafka文件流源任务

0aydgbwb

0aydgbwb1#

连接器模块由kafka连接器框架执行。对于调试,我们可以使用独立模式。我们可以将ide配置为使用connectstandalone主函数作为入口点。
创建调试配置,如下所示。如果是maven项目,需要记住勾选“include dependencies with”provided“scope

连接器属性文件需要为调试指定连接器类名“connector.class”

可以从kafka文件夹/usr/local/etc/kafka/connect-standalone.properties复制worker属性文件

zxlwwiss

zxlwwiss2#

我会尽量宽泛地回答你的问题。连接器开发的简单方法如下:
通过查看许多公开提供的kafka连接器中的一个来构造和构建连接器源代码(您可以在这里找到一个广泛的列表:https://www.confluent.io/product/connectors/ )
从下载最新的confluent开源版本(>=3.3.0)https://www.confluent.io/download/
通过以下方式之一使您的连接器包可供kafka connect使用:
将所有连接器jar文件(连接器jar加上不包括connectapi jar的依赖关系jar)存储到文件系统中的某个位置,并通过将该位置添加到 plugin.path 属性。例如,如果连接器jar存储在 /opt/connectors/my-first-connector ,您将设置 plugin.path=/opt/connectors 在工人的财产中(见下文)。
将所有连接器jar文件存储在 ${CONFLUENT_HOME}/share/java . 例如: ${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector . (需要从 kafka-connect- 由启动脚本拾取的前缀)$confluent\u home是安装confluent平台的地方。
(可选)通过更改“连接”的日志级别来增加日志记录 ${CONFLUENT_HOME}/etc/kafka/connect-log4j.propertiesDEBUG 甚至 TRACE .
使用confluent cli启动所有服务,包括kafka connect。详情如下:http://docs.confluent.io/current/connect/quickstart.html
简要地: confluent start 注意:cli当前加载的connect worker的属性文件是 ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties . 如果选择启用类加载隔离,并且需要更改connect worker的属性,则应该编辑该文件。
运行connect worker后,通过运行以下命令启动连接器: confluent load <connector_name> -d <connector_config.properties>confluent load <connector_name> -d <connector_config.json> 连接器配置可以是java属性或json格式。
confluent log connect 打开connect worker的日志文件,或通过运行 cd "$( confluent current )" 注意:通过设置环境变量,在confluent cli会话期间更改日志和数据的存储位置 CONFLUENT_CURRENT 适当地。e、 g.鉴于此 /opt/confluent 存在并且是您要存储数据的位置,请运行:
export CONFLUENT_CURRENT=/opt/confluent confluent current 最后,要以交互方式调试连接器,一种可能的方法是在使用confluent cli启动connect之前应用以下内容: confluent stop connect export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y; confluent start connect 然后与调试器连接(例如远程连接到connect worker(默认端口:5005))。要停止在调试模式下运行connect,只需运行: unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG; 当你完成的时候。
我希望以上将使您的连接器开发更容易和。。。更有趣!

e5njpo68

e5njpo683#

我喜欢公认的答案。有一件事-环境变量不适合我。。。我正在使用confluent社区版5.3.1。。。
这就是我所做的工作。。。
我从这里安装了confluent cli:https://docs.confluent.io/current/cli/installing.html#tarball-安装
我使用命令运行合流 confluent local start 我用命令得到了连接应用程序的详细信息 ps -ef | grep connect 我将生成的命令复制到编辑器中,并添加了arg(就在java之后):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

然后我停止使用命令连接 confluent local stop connect 然后我用arg运行connect命令
短暂间歇---
vs代码开发由erichgamma-of领导 gang of four 他还写了《月 eclipse 》。vs代码正在成为一流的JavaIDEhttps://en.wikipedia.org/wiki/erich_gamma
中场休息结束---
接下来,我启动了vs代码并打开了debezium oracle connector文件夹(从这里克隆)https://github.com/debezium/debezium-incubator
然后我选择了 Debug - Open Configurations
并进入突出显示的调试配置

然后运行调试器-它会命中你的断点!!

connect命令应如下所示:

/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties

相关问题