Kafka Connect HDFS sink saving data in Parquet format

Asked by vsaztqbk on 2021-06-07, in Kafka

Using the Kafka Connect HDFS sink, I can write Avro data to a Kafka topic and have it saved in Hive/HDFS.
I am trying to save the data in Parquet file format using the format class:

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

My quickstart-hdfs.properties is as follows:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_parquet
hdfs.url=hdfs://localhost:9000
flush.size=3
hive.metastore.uris=thrift://10.15.167.109:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
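
For reference, here is one way to publish matching Avro test data with Confluent's kafka-avro-console-producer (the topic and Schema Registry URL are taken from the config above; the record schema is a made-up example, not from the original post):

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic test_hdfs_parquet \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"TestRecord","fields":[{"name":"f1","type":"string"}]}'
# then type one JSON record per line, e.g. {"f1": "value1"}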

When I publish data to Kafka, the table is created in Hive and the test_hdfs_parquet directory is created in HDFS, but the sink fails to save the data in Parquet format, failing with the exception below:

java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:178)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
        at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-03-13 11:48:41,148] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Answer 1 (by ux6nzvsh):

You seem to be hitting this known issue. org.apache.avro.Schema.getLogicalType() was only added in Avro 1.8.0, so the NoSuchMethodError indicates that the parquet-avro JAR on the classpath was compiled against a newer Avro than the avro JAR actually being loaded.
The solution mentioned there is to use the Avro 1.7.7 libraries, meaning a parquet-avro build that matches Avro 1.7.7, along with the other Avro JARs.
Or you can try compiling hdfs-connect from source and updating all of the JARs.
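
If you go the JAR-replacement route, it helps to first check which Avro and Parquet JARs the connector actually loads. A quick sketch, assuming a standard Confluent Platform and Apache Hadoop layout (adjust CONFLUENT_HOME and HADOOP_HOME for your install):

# List the Avro/Parquet JARs bundled with the HDFS connector
ls "$CONFLUENT_HOME"/share/java/kafka-connect-hdfs/ | grep -iE 'avro|parquet'

# An older Avro can also be pulled in from the Hadoop classpath; check there too
ls "$HADOOP_HOME"/share/hadoop/common/lib/ | grep -i avro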
