package com.samza;
public class MyTask implements StreamTask {
private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka","topicOut");
public void process (IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator) throws Exception {
// Do something useful
}
}
4条答案
按热度按时间olqngx591#
阅读更多的文档,再看一些hello samza示例,如果您将其部署到yarn,请阅读更多关于它的内容。你要找的答案都在那里。
hello samza有三份工作。选择一个并遵循它,配置,启动脚本等。
下面是hello samza页面中的如何启动wikipedia提要作业
属性文件显示编译的作业/任务代码的位置。wikipedia提要作业/任务的源代码如下:
https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/wikipediafeedstreamtask.java
只是修改这个工作,或复制和修改,让你的工作。
h4cxqtbf2#
如果您按照hello samza的说明操作,您的本地计算机上将运行一个功能齐全的zookeeper、kafka和yarn/samza集群。有了这个项目,就可以运行与wikipedia提要相关的任务来测试东西。
但是,和您一样,我在为新任务(没有集群管理功能)提供正确的目录结构和构建设置时遇到了一些问题。所以,我创建了hello samza base,去掉了hello samza之外的所有不必要的新任务。我在自述文件中包含了关于构建新任务的说明。
就部署而言,这要复杂一些。读一些关于创建zookeeper、kafka和Yarn簇的文章。
gfttwv5a3#
我通过maven eclipse项目创建了samza jobs。版本0.9.2的依赖项,其中pom.xml文件中加载了此内容(我有一些版本问题,因此您可能在那里有一些工作):
作业的基本代码如下:
编译完成后,需要将其分组到一个jar文件中,并将其放置在所有samza节点、web或hdf都可以访问的位置。
从您必须创建的属性文件中引用它才能启动它。在porject网页中查找示例。
crcmnpdw4#
建立自己的工作是非常简单的。先打个招呼samza:
下一步是通过以下命令设置系统:
请确保一切顺利
jps
下一步是从pom.xml中删除ApacheRat插件,而不是在HelloSamza中构建项目。删除时,可以在src文件夹(mytask.java)中添加一个java文件作业,也可以在config目录(my.task.properties)中添加一个.properties文件
这是一个示例空作业(mytask.java)。
别忘了实现一个.properties文件。
如果您有一个无错误代码,请使用maven进行构建,如:
之后,你的服务器就启动了(如果没有,你可以通过
./bin/grid start all)
你可以通过deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/MyTask.properties
并由Kafka客户消费结果deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic outTopic