flume hdfs接收器未从kafka通道在hdfs中创建文件

smdnsysy  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(343)

我正在尝试实现一个简单的flume hdfs接收器,它将从kafka通道获取事件,并将它们作为文本文件写入hdfs。
建筑非常直接。这些事件从twitter流式传输到kafka主题,flume hdfs sink确实会将这些事件写入hdfs。这是Kafka问题的第二部分。
当我执行这个命令时没有错误,看起来它工作得很好,但是我无法在hdfs中看到文本文件。我无法调试或调查,因为没有在中创建日志文件 /var/log/flume/ 文件夹。我使用hortonworkssandbox2.3.1和hue来浏览文件系统。
执行Flume的命令: flume-ng agent -n KafkaSink -c conf -f tweets_sink_flume.properties flume属性文件: tweets_sink_flume.properties ```

agent name, in this case KafkaSink.

KafkaSink.sources = kafka-source-1
KafkaSink.channels = hdfs-channel-1
KafkaSink.sinks = hdfs-sink-1

kafka source properties.

KafkaSink.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
KafkaSink.sources.kafka-source-1.zookeeperConnect = sandbox.hortonworks.com:2181
KafkaSink.sources.kafka-source-1.topic = raw_json_tweets
KafkaSink.sources.kafka-source-1.batchSize = 100
KafkaSink.sources.kafka-source-1.channels = hdfs-channel-1

hdfs flume sink properties

KafkaSink.channels.hdfs-channel-1.type = memory
KafkaSink.sinks.hdfs-sink-1.channel = hdfs-channel-1
KafkaSink.sinks.hdfs-sink-1.type = hdfs
KafkaSink.sinks.hdfs-sink-1.hdfs.writeFormat = Text
KafkaSink.sinks.hdfs-sink-1.hdfs.fileType = DataStream
KafkaSink.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
KafkaSink.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true

created /user/ruben/flume/ folder in hdfs to avoid permission error issues

KafkaSink.sinks.hdfs-sink-1.hdfs.path = /user/ruben/flume/%{topic}/%y-%m-%d
KafkaSink.sinks.hdfs-sink-1.hdfs.rollCount=100
KafkaSink.sinks.hdfs-sink-1.hdfs.rollSize=0

specify the capacity of the memory channel.

KafkaSink.channels.hdfs-channel-1.capacity = 10000
KafkaSink.channels.hdfs-channel-1.transactionCapacity = 1000

以下是相关flume控制台输出的一部分:

..
16/08/26 22:20:48 INFO kafka.KafkaSource: Kafka source kafka-source-1 started.
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], begin rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping leader finder thread
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutting down
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Stopped
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutdown completed
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping all fetchers
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] All connections stopped
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared all relevant queues for this fetcher
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared the data chunks in all the consumer message iterators
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Committing all offsets after clearing the fetcher queues
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Releasing partition ownership
16/08/26 22:36:38 INFO consumer.RangeAssignor: Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f rebalancing the following partitions: ArrayBuffer(0) for topic raw_json_tweets with consumers: List(flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0)
16/08/26 22:36:38 INFO consumer.RangeAssignor: flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 attempting to claim partition 0
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 successfully owned partition 0 for topic raw_json_tweets
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f selected partitions : raw_json_tweets:0: fetched offset = -1: consumed offset = -1
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], end rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Starting
16/08/26 22:36:38 INFO utils.VerifiableProperties: Verifying properties
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/08/26 22:36:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:sandbox.hortonworks.com,port:6667 with correlation id 0 for 1 topic(s) Set(raw_json_tweets)
16/08/26 22:36:38 INFO producer.SyncProducer: Connected to sandbox.hortonworks.com:6667 for producing
16/08/26 22:36:38 INFO producer.SyncProducer: Disconnecting from sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0-0], Starting
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Added fetcher for partitions ArrayBuffer([[raw_json_tweets,0], initOffset -1 to broker id:0,host:sandbox.hortonworks.com,port:6667] )

wn9m85ua

wn9m85ua1#

默认情况下,flume的kafka源将只获取在代理启动后写入kafka的消息-它不会从一开始就使用主题。
所以要么:
一定要在flume启动后继续写这个主题
要求kafka源代码从头开始使用readsmallestoffset=true配置。

相关问题