Kafka连接Hive集成问题

l7mqbcuq  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(251)

我正在使用kafkaconnectforhive集成来创建配置单元表以及s3上的分区。在启动connect分布式进程并进行post调用以侦听主题之后,只要主题中有一些数据,我就可以在日志中看到数据正在提交给s3,如下所示。

2017-07-13 06:59:37 INFO  AbstractCoordinator:434 - Successfully joined group connect-hive-int-1 with generation 2
2017-07-13 06:59:37 INFO  ConsumerCoordinator:219 - Setting newly assigned partitions [test_hive_int_1-0] for group connect-hive-int-1
2017-07-13 06:59:37 INFO  TopicPartitionWriter:213 - Started recovery for topic partition test_hive_int_1-0
2017-07-13 06:59:38 INFO  TopicPartitionWriter:228 - Finished recovery for topic partition test_hive_int_1-0
2017-07-13 06:59:38 INFO  NativeS3FileSystem:246 - OutputStream for key 'ashishs/topics/+tmp/test_hive_int_1/year=2017/month=07/day=13/hour=06/minute=58/97a5b3f2-e9c2-41b4-b344-eb080d048052_tmp.avro' writing to tempfile '/tmp/hadoop-root/s3/output-2343236621771119424.tmp'
2017-07-13 06:59:38 WARN  HiveMetaStore:150 - Hive database already exists: default
2017-07-13 06:59:38 INFO  TopicPartitionWriter:302 - Starting commit and rotation for topic partition test_hive_int_1-0 with start offsets {year=2017/month=07/day=13/hour=06/minute=58/=0} and end offsets {year=2017/month=07/day=13/hour=06/minute=58/=1}
2017-07-13 06:59:38 INFO  NativeS3FileSystem:280 - OutputStream for key 'ashishs/topics/+tmp/test_hive_int_1/year=2017/month=07/day=13/hour=06/minute=58/97a5b3f2-e9c2-41b4-b344-eb080d048052_tmp.avro' closed. Now beginning upload
2017-07-13 06:59:38 INFO  NativeS3FileSystem:292 - OutputStream for key 'ashishs/topics/+tmp/test_hive_int_1/year=2017/month=07/day=13/hour=06/minute=58/97a5b3f2-e9c2-41b4-b344-eb080d048052_tmp.avro' upload complete
2017-07-13 06:59:39 INFO  TopicPartitionWriter:638 - Committed s3://dev.canopydata.com/ashishs//topics/test_hive_int_1/year=2017/month=07/day=13/hour=06/minute=58/test_hive_int_1+0+0000000000+0000000001.avro for test_hive_int_1-0

但是在第一次提交之后,我得到了以下异常:

2017-07-13 06:59:39 INFO  TopicPartitionWriter:638 - Committed s3://dev.canopydata.com/ashishs//topics/test_hive_int_1/year=2017/month=07/day=13/hour=06/minute=58/test_hive_int_1+0+0000000000+0000000001.avro for test_hive_int_1-0
2017-07-13 06:59:39 INFO  WorkerSinkTask:244 - WorkerSinkTask{id=hive-int-1-0} Committing offsets
2017-07-13 06:59:39 INFO  TopicPartitionWriter:531 - Ignoring stale out-of-order record in test_hive_int_1-0. Has offset 0 instead of expected offset 4
2017-07-13 06:59:49 ERROR WorkerSinkTask:390 - Task hive-int-1-0 threw an uncaught and unrecoverable exception
java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.confluent.connect.hdfs.errors.HiveMetaStoreException: Hive MetaStore exception
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:229)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: io.confluent.connect.hdfs.errors.HiveMetaStoreException: Hive MetaStore exception
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:223)
    ... 12 more
Caused by: io.confluent.connect.hdfs.errors.HiveMetaStoreException: Hive MetaStore exception
    at io.confluent.connect.hdfs.hive.HiveMetaStore.alterTable(HiveMetaStore.java:226)
    at io.confluent.connect.hdfs.avro.AvroHiveUtil.alterSchema(AvroHiveUtil.java:58)
    at io.confluent.connect.hdfs.TopicPartitionWriter$2.call(TopicPartitionWriter.java:664)
    at io.confluent.connect.hdfs.TopicPartitionWriter$2.call(TopicPartitionWriter.java:661)
    ... 4 more
Caused by: MetaException(message:org.datanucleus.exceptions.NucleusDataStoreException: Clear request failed : DELETE FROM `PARTITION_KEYS` WHERE `TBL_ID`=?)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:39803)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:39780)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:39722)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1345)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1329)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table(HiveMetaStoreClient.java:345)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table(HiveMetaStoreClient.java:334)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
    at com.sun.proxy.$Proxy48.alter_table(Unknown Source)
    at io.confluent.connect.hdfs.hive.HiveMetaStore$6.call(HiveMetaStore.java:212)
    at io.confluent.connect.hdfs.hive.HiveMetaStore$6.call(HiveMetaStore.java:209)
    at io.confluent.connect.hdfs.hive.HiveMetaStore.doAction(HiveMetaStore.java:87)
    at io.confluent.connect.hdfs.hive.HiveMetaStore.alterTable(HiveMetaStore.java:218)
    ... 7 more
2017-07-13 06:59:49 ERROR WorkerSinkTask:391 - Task is being killed and will not recover until manually restarted

一个奇怪的观察结果是,如果我删除了这个特定的作业并用相同的配置再次提交它,那么主题中的进一步数据将毫无例外地提交给s3。就在第一次提交之后,我看到了这个例外。
我在post call中使用的负载是:

{
        "name": "hive-int-1",
        "config": {
                "connector.class": "com.qubole.streamx.s3.S3SinkConnector",
                "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
                "tasks.max": "1",
                "topics": "test_hive_int_1",
                "flush.size": "2",
                "s3.url": "s3://dev.canopydata.com/ashishs/",
                "hadoop.conf.dir": "/usr/local/streamx/config/hadoop-conf",
                "rotate.interval.ms": "60000",
                "hive.integration":"true",
                "hive.metastore.uris":"thrift://<host_fqdn>:10000",
                "schema.compatibility":"BACKWARD",
                "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
                "partition.duration.ms": "120000",
                "locale": "en",
                "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/",
                "timezone": "GMT"
        }
}

任何关于我做错了什么或者我是否遗漏了什么的建议都会很有帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题