I am new to Kafka Connect. I am trying to replicate/fetch data from SAP S/4HANA and persist it to HDFS using Kafka Connect. So far I have tried many things by following these links:
https://github.com/sap/kafka-connect-sap
https://medium.com/@schh/confluent-platform-connect-sap-hana-to-kafka-ec8a8a5be44
My configuration is as follows:
connect-standalone.properties
bootstrap.servers=10.0.4.146:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/user1/ngdbc.jar,/home/user1/kafka-connect-hana-1.0-SNAPSHOT.jar
hana-source.properties
name=saptohive-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=saptohive
connection.url=jdbc:sap://34.169.244.241:30041/
connection.user="MYUSER"
connection.password="MYPASS"
saptohive.table.name="SAPHANADB"."MARA"
hdfs-sink.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=saptohive
hdfs.url=hdfs://10.0.1.244:8020/warehouse/tablespace/external/hive/
flush.size=3
hive.integration=true
hive.metastore.uris=thrift://10.0.1.244:9083/
hive.database=dev_ingestion_raw
schema.compatibility=BACKWARD
Error
[2020-08-31 10:58:35,969] ERROR Task saptohive-source-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcV$sp
at com.sap.kafka.connect.source.GenericSourceTask.start(GenericSourceTask.scala:34)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction0$mcV$sp
... 9 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction0$mcV$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 9 more
[2020-08-31 10:58:35,971] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
I am not sure what exactly the problem is. The whole process then hangs at:
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2020-08-31 10:58:36,186] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:170)
[2020-08-31 10:58:36,190] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:93)
[2020-08-31 10:58:36,467] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2020-08-31 10:58:37,326] INFO Trying to connect to metastore with URI thrift://10.0.1.244:9083/ (hive.metastore:376)
[2020-08-31 10:58:37,362] INFO Connected to metastore. (hive.metastore:472)
[2020-08-31 10:58:37,437] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:260)
[2020-08-31 10:58:37,523] INFO Discovered coordinator 10.0.1.33:9092 (id: 2147483646 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:607)
[2020-08-31 10:58:37,536] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2020-08-31 10:58:37,537] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2020-08-31 10:58:37,547] INFO Successfully joined group connect-hdfs-sink with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2020-08-31 10:58:37,550] INFO Setting newly assigned partitions [saptohive-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2020-08-31 10:58:37,562] INFO Started recovery for topic partition saptohive-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2020-08-31 10:58:37,570] INFO Finished recovery for topic partition saptohive-0 (io.confluent.connect.hdfs.TopicPartitionWriter:223)
All the files, i.e. connect-standalone.properties, hana-source.properties, and hdfs-sink.properties, as well as the two .jar files, ngdbc.jar and kafka-connect-hana-1.0-SNAPSHOT.jar, are in the same directory.
The command I am using is:
connect-standalone connect-standalone.properties hana-source.properties hdfs-sink.properties
I would like to know what I am doing wrong. Any help would be greatly appreciated. Thanks.
1 Answer
It turned out that the Kafka and Scala versions in our cluster were older than the ones the SAP HANA Kafka connector was built with. I created a new cluster with Kafka version 2.x.x, and it worked.
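For context, the missing class in the stack trace, scala.runtime.java8.JFunction0$mcV$sp, ships inside scala-library only from Scala 2.12 onward, so a connector compiled against Scala 2.12 cannot load on a Connect worker whose Kafka distribution bundles Scala 2.11 (as older Kafka 0.x/1.x installs often did). A minimal sketch of how to verify the mismatch before rebuilding anything, assuming $KAFKA_HOME points at your Kafka installation directory:

# The core Kafka jar name encodes the Scala version it was built with,
# e.g. kafka_2.12-2.5.0.jar means Scala 2.12, Kafka 2.5.0.
ls $KAFKA_HOME/libs/kafka_*.jar

# Check whether the bundled Scala runtime contains the class the
# connector needs; no output means it is missing (Scala older than 2.12).
unzip -l $KAFKA_HOME/libs/scala-library-*.jar | grep 'JFunction0$mcV$sp'

If the check shows Scala 2.11, either upgrade the Kafka cluster (as done here) or rebuild kafka-connect-sap against the matching Scala version.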