FlumeJava自定义接收器和源代码

uqzxnwby  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(307)

Flume版本:-1.6
Kafka版本:-1.0.0
zookeeper版本:-3.4.10
我们很快就需要将flume与kafka和hadoop连接起来,所以我们从kafka消费者那里接收事件并将其接收到hadoop中。一切都是用conf文件配置的,直到这里一切正常。
现在我们需要检查是否可以使用定制的java代码来完成。我在互联网上尝试了很多选项来设计Kafka源和hdfs接收器。我在cloudera虚拟机上试过这个。
Kafka和zookeeper都启动了。
代码正在运行,但在生成消息时,hdfs中没有插入任何内容。
如果有人能指出我所缺少的东西,那将是非常有帮助的。
我试过的代码是。。

KafkaChannel channel = new KafkaChannel();

Map<String, String> channelParamters = new HashMap<String, String>();

channelParamters.put("brokerList", "localhost:9092");
channelParamters.put("zookeeperConnect","localhost:2181");
channelParamters.put("topic","integration");
channelParamters.put("groupId","channel");
channelParamters.put("batchSize", "15");
channelParamters.put("zookeeper.connect","localhost:2181");
channelParamters.put("clientId", "channel");
channelParamters.put("readSmallestOffset","true");
channelParamters.put("interceptors","i1");
channelParamters.put("interceptors.i1.type","host");
channelParamters.put("consumer.timeout.ms","1000");

channelParamters.put("parseAsFlumeEvent", "false");

channel.setName("KafkaSource");

Context channelContext = new Context(channelParamters);

final Map<String, String> properties = new HashMap<String, String>();

/**Sink Properties start**/

HDFSEventSink eventSink = new HDFSEventSink();

eventSink.setName("HDFSEventSink-" + "kafkaEventSink");

String hdfsBasePath = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events";

properties.put("hdfs.type", "hdfs");
properties.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
properties.put("hdfs.rollInterval ", "0");
properties.put("hdfs.rollSize ", "2048");
properties.put("hdfs.rollCount ", "0");
properties.put("hdfs.fileType ", " DataStream");
properties.put("channel", channel.getName());
properties.put("hdfs.maxOpenFiles", String.valueOf(1));

properties.put("hdfs.filePrefix ", " kafka_host");
properties.put("hdfs.fileSuffix ", " .txt");
properties.put("hdfs.idleTimeout ", "60");

/**Sink Properties end**/

Context sinkContext = new Context(properties);

eventSink.configure(sinkContext);

eventSink.setChannel(channel);

Configurables.configure(channel, channelContext);

eventSink.start();

channel.start();
gc0ot86w

gc0ot86w1#

我不清楚您的自定义java代码要实现什么。
更好的方法是使用kafka connect(apachekafka的一部分)和开源hdfs连接器。它做的正是您在这里所做的,只是要设置的配置文件、处理模式、扩展、执行自动故障切换等等。

相关问题