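I am trying to read data from a Kafka topic and write it to the HDFS file system, using the Apex Malhar example from [https://github.com/apache/apex-malhar/tree/master/examples/kafka]. Unfortunately, after setting the Kafka properties and the Hadoop configuration, no data is created in my HDFS 2.6.0 system. PS: the console shows no errors, and everything seems to run fine.
Here is the code my application uses: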
public class TestConsumer {
  public static void main(String[] args) {
    Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
    consumerThread.start();
    ApplicationTest a = new ApplicationTest();
    try {
      a.testApplication();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
Here is the ApplicationTest class, taken from the Apex Malhar example:
package org.apache.apex.examples.kafka.kafka2hdfs;

import org.apache.log4j.Logger;
import javax.validation.ConstraintViolationException;
import org.junit.Rule;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import com.datatorrent.api.LocalMode;
import info.batey.kafka.unit.KafkaUnitRule;

/**
 * Test the DAG declaration in local mode.
 */
public class ApplicationTest
{
  private static final Logger LOG = Logger.getLogger(ApplicationTest.class);
  private static final String TOPIC = "kafka2hdfs";
  private static final int zkPort = NetUtils.getFreeSocketPort();
  private static final int brokerPort = NetUtils.getFreeSocketPort();
  private static final String BROKER = "localhost:" + brokerPort;
  private static final String FILE_NAME = "test";
  private static final String FILE_DIR = "./target/tmp/FromKafka";

  // broker port must match properties.xml
  @Rule
  private static KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);

  public void testApplication() throws Exception
  {
    try {
      // run app asynchronously; terminate after results are checked
      LocalMode.Controller lc = asyncRun();
      lc.shutdown();
    } catch (ConstraintViolationException e) {
      LOG.error("constraint violations: " + e.getConstraintViolations());
    }
  }

  private Configuration getConfig()
  {
    Configuration conf = new Configuration(false);
    String pre = "dt.operator.kafkaIn.prop.";
    conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
    conf.setInt(pre + "initialPartitionCount", 1);
    conf.set(pre + "topics", TOPIC);
    conf.set(pre + "clusters", BROKER);
    pre = "dt.operator.fileOut.prop.";
    conf.set(pre + "filePath", FILE_DIR);
    conf.set(pre + "baseName", FILE_NAME);
    conf.setInt(pre + "maxLength", 40);
    conf.setInt(pre + "rotationWindows", 3);
    return conf;
  }

  private LocalMode.Controller asyncRun() throws Exception
  {
    Configuration conf = getConfig();
    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new KafkaApp(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    return lc;
  }
}
1 Answer
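After runAsync() and before shutdown(), you need to wait for the expected results; otherwise the DAG exits immediately, before any data has been written. That is what the original example actually does.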
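One way to do that waiting is sketched below: a small polling helper that blocks until an output file shows up or a timeout expires. This is only an illustration, not the code from the Apex Malhar example. The class name OutputWaiter, the method waitForOutput, and the timeout values are hypothetical, and the sketch assumes the output directory is on the local file system (as FILE_DIR is in the test above) and that output file names start with the configured baseName.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of Apex Malhar): polls a local directory
// until a file whose name starts with the given prefix appears.
final class OutputWaiter {

  private OutputWaiter() {
  }

  /**
   * Returns true as soon as a matching file exists in dir,
   * or false if none appears before timeoutMillis has elapsed.
   */
  static boolean waitForOutput(Path dir, String prefix, long timeoutMillis, long pollMillis)
      throws IOException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      if (hasMatchingFile(dir, prefix)) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      Thread.sleep(pollMillis);
    }
  }

  // The directory may not exist yet if the operator has not written anything.
  private static boolean hasMatchingFile(Path dir, String prefix) throws IOException {
    if (!Files.isDirectory(dir)) {
      return false;
    }
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      for (Path entry : entries) {
        if (entry.getFileName().toString().startsWith(prefix)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

In testApplication(), a call such as OutputWaiter.waitForOutput(java.nio.file.Paths.get(FILE_DIR), FILE_NAME, 60000, 1000) would go between asyncRun() and lc.shutdown(), so that the DAG keeps running until something has been written or the (assumed) 60-second limit is reached.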