使用samza消费远程Kafka主题

a64a0gku  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(422)

我正在尝试修改hello samza教程:
(1) 从远程代理(即非localhost)上的kafka主题读取(2)将消息写入文件
我修改了wikipediafeedstreamtask.java,如下所示:

public class WikipediaFeedStreamTask implements StreamTask {
  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-ra\
w");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoo\
rdinator coordinator) {
      //System.out.println("Message Received!");
      //System.out.println(envelope.getMessage());
      try{
      PrintWriter writer = new PrintWriter("test.txt", "UTF-8");
      writer.println(envelope.getMessage());
      writer.println("The second line");
      writer.close();}
      catch(IOException e)
          {}
      Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envel\
ope.getMessage());
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
  }
}

这只是一个标准文件,添加了将消息写入文件的功能。
我将属性文件修改为:


# Job

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-feed

# YARN

yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task

task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask
task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews

# Serializers

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Wikipedia System

systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
systems.wikipedia.host=irc.wikimedia.org
systems.wikipedia.port=6667

# Kafka System

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=REMOTE-ZOOKEEPER-IP:2181/
systems.kafka.producer.bootstrap.servers=REMOTE-BROKER-IP:9092

# Job Coordinator

job.coordinator.system=kafka

当我运行作业(像这样)时,我会在test.txt中看到来自wikipedia流的数据。我的假设显然是错误的,即只要修改.properties文件中的kafka消费者值,samza就会被迫从该代理读取数据。那我需要改变什么呢?
在哪里指定samza应该监听的主题名?

j1dl9f46

j1dl9f461#

我看到你修改了Kafka系统的连接字符串。但是,streamtask的输入仍然引用wikipedia中的流: task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews 你应该改变 task.inputs 阅读“kafka.$yourinputstreamname”。请试一试。我想这应该能解决你的问题。

相关问题