无法使用kafka connect从sap hana获取记录

kq0g1dla  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(739)

我是kafka connect的新手,我正在尝试从sap s/4 hana复制/获取数据,并使用kafka connect将其持久化到hdfs上。到目前为止,我已经通过以下链接尝试了很多事情:
https://github.com/sap/kafka-connect-sap
https://medium.com/@schh/confluent-平台-连接-sap-hana-to-kafka-ec8a8a5be44
我的配置如下:
connect-standalone.properties属性

bootstrap.servers=10.0.4.146:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/user1/ngdbc.jar,/home/user1/kafka-connect-hana-1.0-SNAPSHOT.jar

hana-source.properties文件

name=saptohive-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=saptohive
connection.url=jdbc:sap://34.169.244.241:30041/
connection.user="MYUSER"
connection.password="MYPASS"
saptohive.table.name="SAPHANADB"."MARA"

hdfs-sink.properties属性

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=saptohive
hdfs.url=hdfs://10.0.1.244:8020/warehouse/tablespace/external/hive/
flush.size=3
hive.integration=true
hive.metastore.uris=thrift://10.0.1.244:9083/
hive.database=dev_ingestion_raw
schema.compatibility=BACKWARD

错误

[2020-08-31 10:58:35,969] ERROR Task saptohive-source-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
    java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcV$sp
            at com.sap.kafka.connect.source.GenericSourceTask.start(GenericSourceTask.scala:34)
            at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcV$sp
            ... 9 more
    Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction0$mcV$sp
            at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
            at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
            ... 9 more
    [2020-08-31 10:58:35,971] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

我不确定到底是什么问题。整个过程停留在:

(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2020-08-31 10:58:36,186] INFO AvroDataConfig values:
        schemas.cache.config = 1000
        enhanced.avro.schema.support = false
        connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:170)
[2020-08-31 10:58:36,190] INFO Hadoop configuration directory  (io.confluent.connect.hdfs.DataWriter:93)
[2020-08-31 10:58:36,467] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2020-08-31 10:58:37,326] INFO Trying to connect to metastore with URI thrift://10.0.1.244:9083/ (hive.metastore:376)
[2020-08-31 10:58:37,362] INFO Connected to metastore. (hive.metastore:472)
[2020-08-31 10:58:37,437] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:260)
[2020-08-31 10:58:37,523] INFO Discovered coordinator 10.0.1.33:9092 (id: 2147483646 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:607)
[2020-08-31 10:58:37,536] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2020-08-31 10:58:37,537] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2020-08-31 10:58:37,547] INFO Successfully joined group connect-hdfs-sink with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2020-08-31 10:58:37,550] INFO Setting newly assigned partitions [saptohive-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2020-08-31 10:58:37,562] INFO Started recovery for topic partition saptohive-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2020-08-31 10:58:37,570] INFO Finished recovery for topic partition saptohive-0 (io.confluent.connect.hdfs.TopicPartitionWriter:223)

所有文件,即connect-standalone.properties、hana-source.properties、hdfs-sink.properties和两个“.jar”文件,即ngdbc.jar和kafka-connect-hana-1.0-snapshot.jar,都在同一目录下。
我使用的命令是:

connect-standalone connect-standalone.properties hana-source.properties hdfs-sink.properties

我要知道我做错了什么。任何帮助都将不胜感激。谢谢。

8aqjt8rx

8aqjt8rx1#

实际上,我们集群中的kafka和scala版本比用于构建sap-hana-kafka连接器的版本旧。用kafka版本2.x.x创建了一个新的集群,并且成功了。

相关问题