Apache Apex: how do I write data from a Kafka topic to the HDFS file system?

qzlgjiam · asked 2021-06-08 · in Kafka

I am trying to read data from a Kafka topic and write it to the HDFS file system, using the Apex Malhar example from https://github.com/apache/apex-malhar/tree/master/examples/kafka. Unfortunately, after setting the Kafka properties and the Hadoop configuration, no data is created on my HDFS 2.6.0 system. PS: the console shows no errors and everything appears to be working.
Here is the code my application uses:

  public class TestConsumer {
      public static void main(String[] args) {
          // Consumer and KafkaProperties come from the Kafka examples module
          Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
          consumerThread.start();
          ApplicationTest a = new ApplicationTest();
          try {
              a.testApplication();
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

And here is the ApplicationTest class, based on the example from Apex Malhar:

  package org.apache.apex.examples.kafka.kafka2hdfs;

  import javax.validation.ConstraintViolationException;

  import org.apache.log4j.Logger;
  import org.junit.Rule;

  import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.net.NetUtils;

  import com.datatorrent.api.LocalMode;

  import info.batey.kafka.unit.KafkaUnitRule;

  /**
   * Test the DAG declaration in local mode.
   */
  public class ApplicationTest
  {
    private static final Logger LOG = Logger.getLogger(ApplicationTest.class);

    private static final String TOPIC = "kafka2hdfs";
    private static final int zkPort = NetUtils.getFreeSocketPort();
    private static final int brokerPort = NetUtils.getFreeSocketPort();
    private static final String BROKER = "localhost:" + brokerPort;
    private static final String FILE_NAME = "test";
    private static final String FILE_DIR = "./target/tmp/FromKafka";

    // broker port must match properties.xml
    @Rule
    private static KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);

    public void testApplication() throws Exception
    {
      try {
        // run app asynchronously; terminate after results are checked
        LocalMode.Controller lc = asyncRun();
        lc.shutdown();
      } catch (ConstraintViolationException e) {
        LOG.error("constraint violations: " + e.getConstraintViolations());
      }
    }

    private Configuration getConfig()
    {
      Configuration conf = new Configuration(false);
      String pre = "dt.operator.kafkaIn.prop.";
      conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
      conf.setInt(pre + "initialPartitionCount", 1);
      conf.set(pre + "topics", TOPIC);
      conf.set(pre + "clusters", BROKER);

      pre = "dt.operator.fileOut.prop.";
      conf.set(pre + "filePath", FILE_DIR);
      conf.set(pre + "baseName", FILE_NAME);
      conf.setInt(pre + "maxLength", 40);
      conf.setInt(pre + "rotationWindows", 3);
      return conf;
    }

    private LocalMode.Controller asyncRun() throws Exception
    {
      Configuration conf = getConfig();
      LocalMode lma = LocalMode.newInstance();
      lma.prepareDAG(new KafkaApp(), conf);
      LocalMode.Controller lc = lma.getController();
      lc.runAsync();
      return lc;
    }
  }

uwopmtnx · answer #1

After runAsync() and before shutdown(), you need to wait for the expected results (otherwise the DAG exits immediately). That is what the example actually does.
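A minimal sketch of such a wait, assuming the file output operator writes its part files under FILE_DIR as configured above (the 30-second timeout and 500 ms poll interval are arbitrary choices, not taken from the original example):

  // requires: import java.io.File;
  public void testApplication() throws Exception
  {
    LocalMode.Controller lc = asyncRun();

    // poll the output directory until a part file shows up, then shut down
    File dir = new File(FILE_DIR);
    long deadline = System.currentTimeMillis() + 30_000;
    while (System.currentTimeMillis() < deadline) {
      String[] files = dir.list();
      if (files != null && files.length > 0) {
        break; // output has been written; safe to stop the DAG
      }
      Thread.sleep(500);
    }
    lc.shutdown();
  }

Without a wait like this, shutdown() is called while the Kafka input operator is still starting up, so no tuples ever reach the file writer.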
