Flume版本:-1.6
Kafka版本:-1.0.0
zookeeper版本:-3.4.10
我们很快就需要将flume与kafka和hadoop连接起来,所以我们从kafka消费者那里接收事件并将其接收到hadoop中。一切都是用conf文件配置的,直到这里一切正常。
现在我们需要检查是否可以使用定制的java代码来完成。我在互联网上尝试了很多选项来设计Kafka源和hdfs接收器。我在cloudera虚拟机上试过这个。
Kafka和zookeeper都启动了。
代码正在运行,但在生成消息时,hdfs中没有插入任何内容。
如果有人能指出我所缺少的东西,那将是非常有帮助的。
我试过的代码是。。
KafkaChannel channel = new KafkaChannel();
Map<String, String> channelParamters = new HashMap<String, String>();
channelParamters.put("brokerList", "localhost:9092");
channelParamters.put("zookeeperConnect","localhost:2181");
channelParamters.put("topic","integration");
channelParamters.put("groupId","channel");
channelParamters.put("batchSize", "15");
channelParamters.put("zookeeper.connect","localhost:2181");
channelParamters.put("clientId", "channel");
channelParamters.put("readSmallestOffset","true");
channelParamters.put("interceptors","i1");
channelParamters.put("interceptors.i1.type","host");
channelParamters.put("consumer.timeout.ms","1000");
channelParamters.put("parseAsFlumeEvent", "false");
channel.setName("KafkaSource");
Context channelContext = new Context(channelParamters);
final Map<String, String> properties = new HashMap<String, String>();
/**Sink Properties start**/
HDFSEventSink eventSink = new HDFSEventSink();
eventSink.setName("HDFSEventSink-" + "kafkaEventSink");
String hdfsBasePath = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events";
properties.put("hdfs.type", "hdfs");
properties.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
properties.put("hdfs.rollInterval ", "0");
properties.put("hdfs.rollSize ", "2048");
properties.put("hdfs.rollCount ", "0");
properties.put("hdfs.fileType ", " DataStream");
properties.put("channel", channel.getName());
properties.put("hdfs.maxOpenFiles", String.valueOf(1));
properties.put("hdfs.filePrefix ", " kafka_host");
properties.put("hdfs.fileSuffix ", " .txt");
properties.put("hdfs.idleTimeout ", "60");
/**Sink Properties end**/
Context sinkContext = new Context(properties);
eventSink.configure(sinkContext);
eventSink.setChannel(channel);
Configurables.configure(channel, channelContext);
eventSink.start();
channel.start();
1条答案
按热度按时间gc0ot86w1#
我不清楚您的自定义java代码要实现什么。
更好的方法是使用kafka connect(apachekafka的一部分)和开源hdfs连接器。它做的正是您在这里所做的,只是要设置的配置文件、处理模式、扩展、执行自动故障切换等等。