How to write my own job in Samza

mum43rcc posted on 2021-06-08 in Kafka

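Recently I have been trying to do some stream processing on the Samza framework. I successfully deployed the hello-samza example. However, when I tried to write my own job, I didn't know where to start.
I read the documentation but still don't get the key points. Could someone help me with:
What the architecture of my code should be (source code, library code, and configuration).
Which directory my code should go in.
What else I need to do to run my code.
Your suggestions would be a great help. Thank you very much!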

Answer #1 (olqngx59)

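Read more of the documentation and look at a few more of the hello-samza examples; if you are deploying to YARN, read up on that as well. The answers you are looking for are all there.
hello-samza has three jobs. Pick one and follow it through: its config, its launch script, and so on.
Here is how the hello-samza page launches the wikipedia-feed job: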

deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties

The properties file shows where the compiled job/task code lives. The source code for the wikipedia-feed job/task is here:
https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
Just modify this job, or copy it and modify the copy, to make your own job.
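To see how a properties file ties a job to its code, here is a minimal sketch that loads one with java.util.Properties and prints the two keys that matter here: task.class (the StreamTask implementation Samza instantiates) and yarn.package.path (the packaged job YARN downloads). The class JobConfigInspector is hypothetical, and the sample values are an abbreviated, illustrative version of wikipedia-feed.properties, not a verbatim copy.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of Samza): reads a job .properties file and
// reports the settings that point the job at its compiled code.
final class JobConfigInspector {

    private JobConfigInspector() {}

    // Parses properties text; java.util.Properties handles '#' comments and key=value lines.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // task.class names the StreamTask implementation to run.
    static String taskClass(Properties props) {
        return props.getProperty("task.class");
    }

    // yarn.package.path points at the packaged job archive.
    static String packagePath(Properties props) {
        return props.getProperty("yarn.package.path");
    }

    public static void main(String[] args) throws IOException {
        // Illustrative values only; the package path is a placeholder.
        String sample = String.join("\n",
            "job.factory.class=org.apache.samza.job.yarn.YarnJobFactory",
            "job.name=wikipedia-feed",
            "yarn.package.path=file:///path/to/hello-samza-dist.tar.gz",
            "task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask");
        Properties props = parse(sample);
        System.out.println("task.class        = " + taskClass(props));
        System.out.println("yarn.package.path = " + packagePath(props));
    }
}
```

When you copy the wikipedia-feed job, these are the two values you would change first: task.class to your own class, and yarn.package.path to wherever your own build puts its archive.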

Answer #2 (h4cxqtbf)

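If you follow the hello-samza instructions, you will have a fully functional ZooKeeper, Kafka, and YARN/Samza cluster running on your local machine. With that project you can run the wikipedia-feed tasks to test things out.
However, like you, I had trouble getting the right directory structure and build setup for a new task (without the cluster-management parts). So I created hello samza base, which strips out everything in hello-samza that a new task doesn't need. The README includes instructions for building new tasks.
Deployment is a bit more complicated. Read up on setting up ZooKeeper, Kafka, and YARN clusters.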

Answer #3 (gfttwv5a)

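I created my Samza jobs as a Maven project in Eclipse. Below is the pom.xml with the dependencies for version 0.9.2 (I had some version issues, so you may have some work to do there; note that the samza.version property below is set to 0.8.0):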

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.acio.samza</groupId>
      <artifactId>samzafroga</artifactId>
      <version>0.0.1</version>
      <name>samzafroga</name>
       <dependencies>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-api</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-core_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-log4j</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-shell</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-yarn_2.10</artifactId>
          <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-kv_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-kv-rocksdb_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-kafka_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.schwering</groupId>
          <artifactId>irclib</artifactId>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <dependency>
          <groupId>org.codehaus.jackson</groupId>
          <artifactId>jackson-jaxrs</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
            <version>${jettyVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-webapp</artifactId>
            <version>${jettyVersion}</version>
        </dependency>
       </dependencies>
      <properties>
        <!-- maven specific properties -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <samza.version>0.8.0</samza.version>
        <jettyVersion>7.6.16.v20140903</jettyVersion>
      </properties>
        <repositories>
        <repository>
          <id>apache-releases</id>
          <url>https://repository.apache.org/content/groups/public</url>
        </repository>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-tools Maven2 Repository</name>
          <url>https://oss.sonatype.org/content/groups/scala-tools</url>
        </repository>
      </repositories>

      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
       <build>
        <pluginManagement>
          <plugins>
            <plugin>
              <groupId>org.apache.rat</groupId>
              <artifactId>apache-rat-plugin</artifactId>
              <version>0.9</version>
              <configuration>
                <excludes>
                  <exclude>*.patch</exclude>
                  <exclude>**/target/**</exclude>
                  <exclude>*.json</exclude>
                  <exclude>.vagrant/**</exclude>
                  <exclude>.git/**</exclude>
                  <exclude>*.md</exclude>
                  <exclude>docs/**</exclude>
                  <exclude>config/**</exclude>
                  <exclude>bin/**</exclude>
                  <exclude>.gitignore</exclude>
                  <exclude>**/.cache/**</exclude>
                  <exclude>deploy/**</exclude>
                  <exclude>**/.project</exclude>
                </excludes>
              </configuration>
             </plugin>
          </plugins>
        </pluginManagement>
        <plugins>

          <!-- plugin to build the tar.gz file filled with examples -->
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.3</version>
            <configuration>
              <descriptors>
                <descriptor>src/assembly/bin.xml</descriptor>
              </descriptors>
               <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
       <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-api</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-core_2.10</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-log4j</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-shell</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-yarn_2.10</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-kv_2.10</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-kv-rocksdb_2.10</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.samza</groupId>
                <artifactId>samza-kafka_2.10</artifactId>
                <version>${samza.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.schwering</groupId>
                <artifactId>irclib</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-jaxrs</artifactId>
                <version>1.8.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
       </dependencyManagement>
    </project>

The basic code for the job is as follows:

package xxxx;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class Redirect implements StreamTask {
    // Every transformed message goes to the "samzaout" topic of the "kafka" system.
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "samzaout");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Assumes a String serde is configured for the input stream.
        String msg = (String) envelope.getMessage();

        // Transformation: wrap the message in a prefix and a suffix.
        String outmsg = "xxx-" + msg + "-xxx";
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
    }
}
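To exercise the transformation without a running cluster, one option (a suggestion, not something this answer prescribes) is to move it into a pure static method that process() delegates to. The class RedirectLogic is hypothetical; the "xxx-" + msg + "-xxx" format is taken from the Redirect task above.

```java
// Hypothetical helper: the string transformation from Redirect.process(),
// pulled out so it can be unit-tested without Samza on the classpath.
final class RedirectLogic {

    private RedirectLogic() {}

    // Wraps the incoming message the same way Redirect does.
    static String transform(String msg) {
        return "xxx-" + msg + "-xxx";
    }

    public static void main(String[] args) {
        System.out.println(transform("hello")); // prints xxx-hello-xxx
    }
}
```

Inside Redirect.process the send would then become collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, RedirectLogic.transform(msg))), keeping the Samza-specific code to a thin wrapper.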

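After compiling, package it into a jar file and put it somewhere all the Samza nodes can reach, such as a web server or HDFS.
Reference it from the properties file you have to create to launch the job. Look for examples on the project web page.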
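The key requirement above is that every node can fetch the package. A minimal pre-flight sketch, assuming the location is given as the yarn.package.path URI in the job's properties file: the hypothetical PackagePathCheck.isShared rejects file: URIs and scheme-less paths, which only work when every node has the same local file (as on a single-machine hello-samza grid), and accepts hdfs, http, and https. The accepted scheme list is an assumption for illustration, not an exhaustive list of what YARN supports. Requires Java 9+ for Set.of.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

// Hypothetical pre-flight check (not a Samza API) for yarn.package.path.
final class PackagePathCheck {

    // Schemes assumed to be reachable from every node; adjust for your cluster.
    private static final Set<String> SHARED_SCHEMES = Set.of("hdfs", "http", "https");

    private PackagePathCheck() {}

    // True if the path uses a scheme that other nodes can fetch from.
    static boolean isShared(String packagePath) {
        String scheme = URI.create(packagePath).getScheme();
        // No scheme, or file:, means a node-local path.
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isShared("hdfs://namenode:8020/jobs/my-job-dist.tar.gz")); // true
        System.out.println(isShared("file:///home/me/my-job-dist.tar.gz"));           // false
    }
}
```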

Answer #4 (crcmnpdw)

Building your own job is quite simple. First, clone hello-samza:

git clone https://git.apache.org/samza-hello-samza.git hello-samza

Next, set up the system with the following command:

bin/grid bootstrap
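Besides checking the running JVMs with jps, you can confirm that the bootstrapped services actually accept connections. This is a plain TCP probe, a different technique from jps. The ports are assumptions based on common defaults (ZooKeeper 2181, Kafka 9092, YARN ResourceManager web UI 8088); verify them against your grid's configuration. GridPortCheck is a hypothetical helper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: checks that each service accepts a TCP connection.
final class GridPortCheck {

    private GridPortCheck() {}

    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; change them if your grid is configured differently.
        Map<String, Integer> services = new LinkedHashMap<>();
        services.put("ZooKeeper", 2181);
        services.put("Kafka", 9092);
        services.put("YARN ResourceManager UI", 8088);
        services.forEach((name, port) ->
            System.out.printf("%-24s localhost:%d %s%n", name, port,
                isListening("localhost", port, 1000) ? "up" : "DOWN"));
    }
}
```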

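Use jps to make sure everything came up correctly. The next step is to remove the Apache Rat plugin from pom.xml; otherwise the project will not build inside hello-samza.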
With the plugin removed, you can add your job as a Java file in the src folder (MyTask.java) and a .properties file in the config directory (MyTask.properties).
Here is a sample empty job (MyTask.java):

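package com.samza;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTask implements StreamTask {
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "topicOut");
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {
        // Do something useful, e.g. collector.send(...) to OUTPUT_STREAM
    }
}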

Don't forget to write a .properties file as well.
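The answer doesn't show the .properties file, so here is a minimal sketch that writes one, assuming a hello-samza 0.10-era setup with YARN and a Kafka system named "kafka". The key names are standard Samza configuration keys; the values (job name my-task, input topic topicIn, the package path, and the ZooKeeper/Kafka addresses) are hypothetical placeholders. Only task.class and the system name "kafka" come from MyTask. Compare against the properties files shipped with hello-samza before relying on it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical generator for a minimal MyTask.properties; values are placeholders.
final class MyTaskConfig {

    private MyTaskConfig() {}

    static Map<String, String> minimalConfig() {
        Map<String, String> c = new LinkedHashMap<>(); // keeps keys in a readable order
        // Job: run on YARN, and where YARN fetches the packaged job from.
        c.put("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
        c.put("job.name", "my-task");
        c.put("yarn.package.path", "file:///path/to/samza-job-package-0.10.0-dist.tar.gz");
        // Task: the StreamTask class and the stream it consumes (topicIn is hypothetical).
        c.put("task.class", "com.samza.MyTask");
        c.put("task.inputs", "kafka.topicIn");
        // String serde so messages arrive as Strings.
        c.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
        // Kafka system named "kafka", matching new SystemStream("kafka", ...) in MyTask.
        c.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
        c.put("systems.kafka.samza.msg.serde", "string");
        c.put("systems.kafka.consumer.zookeeper.connect", "localhost:2181/");
        c.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
        return c;
    }

    // Renders key=value lines; none of the values above need escaping.
    static String render(Map<String, String> config) {
        StringBuilder sb = new StringBuilder();
        config.forEach((k, v) -> sb.append(k).append('=').append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        Path out = Paths.get(args.length > 0 ? args[0] : "MyTask.properties");
        Files.write(out, render(minimalConfig()).getBytes(StandardCharsets.UTF_8));
        System.out.println("wrote " + out.toAbsolutePath());
    }
}
```

Writing the lines by hand rather than with Properties.store keeps the output readable: store() would add a timestamp comment, reorder the keys, and backslash-escape every ':' in the values.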
Once your code compiles without errors, build it with Maven:

mvn clean package
mkdir -p deploy/samza
tar -xvf ./samza-job-package/target/samza-job-package-0.10.0-dist.tar.gz -C deploy/samza

After that, with your servers running (if they are not, you can start them with ./bin/grid start all), you can launch the job with:

deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/MyTask.properties

and consume the results with the Kafka console consumer:

deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicOut
