Kafka Connect HDFS: Protobuf to Parquet

btqmn9zl posted on 2021-05-27 in Hadoop

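I am trying to use Kafka Connect HDFS, but it does not seem to work.
I have tried changing the settings around, but nothing seems to help.
This is the Protobuf message schema: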

syntax = "proto3";
package com.company;
option java_package = "com.company";
option java_outer_classname = "MyObjectData";
import public "wrappers.proto";
message MyObject {
  int64 site_id = 1;
  string time_zone = 2;
  uint64 dev_id = 3;
  uint64 rep_id = 4;
  uint64 dev_sn = 5;
  UInt64Value timestamp = 6;
  UInt32Value secs = 7;
  UInt64Value man_id = 8;
  FloatValue panv = 9;
  FloatValue outputv = 10;
  FloatValue panelc = 11;
  FloatValue ereset = 12;
  FloatValue temp = 13;
  FloatValue tempin = 14;
  FloatValue tempout = 15;
  UInt32Value sectelem = 16;
  FloatValue energytelem = 17;
  UInt32Value ecode = 18;
}

connect-standalone.properties looks like this:

bootstrap.servers=k1:9092,k2:9092,k3:9092

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject
key.converter.schemas.enable=false
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java

quickstart-hdfs.properties is:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=ObjectTopic
hadoop.conf.dir=/etc/hadoop
hdfs.url=hdfs://hdp-01:8020/user/hdfs/telems
hadoop.home=/etc/hadoop/client
flush.size=3
key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.acme.avro.MyObject

I currently get the following error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.Schema.toString(Schema.java:314)
    at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:270)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:222)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:188)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:131)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
    at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:75)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
Also, in case it matters, I am running as user hdfs.
Is this a schema problem? Nothing I change seems to affect the error message.

wbgh16ku (answer #1)

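Chances are that Can't redefine: io.confluent.connect.avro.ConnectDefault occurs because your transform is setting schema properties.
You could try AvroFormat, which also takes Connect's internal Schema and Struct objects and writes them to Avro files in HDFS.
Note that ParquetFormat uses the parquet-avro project, so the data should probably be Avro to begin with.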
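The two suggestions above (leaving out the transform and trying AvroFormat) could be tested with a sink configuration along these lines. This is only a sketch based on the quickstart-hdfs.properties from the question: every value is copied from there except format.class, and whether it actually avoids the exception has not been verified here.

```properties
# quickstart-hdfs.properties (sketch): same sink as in the question,
# but writing Avro files and without the SetSchemaMetadata transform.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=ObjectTopic
hadoop.conf.dir=/etc/hadoop
hdfs.url=hdfs://hdp-01:8020/user/hdfs/telems
hadoop.home=/etc/hadoop/client
flush.size=3

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject

# Changed: AvroFormat instead of ParquetFormat.
format.class=io.confluent.connect.hdfs.avro.AvroFormat

# Removed for this test: the transform that rewrites the schema name.
# transforms=SetSchemaName
# transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
# transforms.SetSchemaName.schema.name=com.acme.avro.MyObject
```

Changing one thing at a time (first the transform, then the format) makes it easier to tell which of the two, if either, is responsible for the error.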
Notice the stack trace:

org.apache.avro.SchemaParseException ...
...
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.Schema.toString(Schema.java:314)
    at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:270)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:222)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:188)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:131)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
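Therefore, you will need to write a Protobuf-to-Avro converter somewhere, perhaps using skeuomorph. The options are:

- Run Kafka Streams, or a similar process, between your producer and Connect (the simplest of these options).
- Modify kafka-connect-hdfs so that it can handle Protobuf.
- Modify the ProtobufConverter so that it produces a ConnectRecord of Avro data.

If all else fails, you could file an issue there and see what response you get.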
