storm 0.10.0重用拓扑设计?

whitzsjs  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(317)

下面的设计能在Storm中完成吗?
让我们以下面的wordcount示例为例https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/wordcounttopology.java 我正在把单词生成器的喷口改成文件读取器的喷口
这个字计数拓扑的设计是1。一行一行地读文件并造句。把句子分成三个单词。bolt添加唯一的单词并给出单词及其对应的计数
因此,拓扑结构在某种程度上描述了一个文件在计算它所拥有的唯一单词时需要采用的流。
如果我有两个文件file1和file2,其中一个应该能够调用同一个拓扑,并创建此拓扑的两个示例来运行相同的字数。
为了跟踪字数计数是否确实已完成,一旦文件被处理,字数计数拓扑的示例应具有已完成状态。
在当前的storm设计中,我发现拓扑是实际的示例,所以它就像一个任务。
需要使用不同的拓扑名称进行两个不同的调用,如
对于文件1 StormSubmitter.submitTopology("WordCountTopology1", conf,builder.createTopology()); 对于文件2 StormSubmitter.submitTopology("WordCountTopology2", conf,builder.createTopology()); 更不用说使用storm客户端上传jar了
storm jar stormwordcount-1.0.0-jar-with-dependencies.jar com.company.WordCount1Main.App "server" "filepath1" storm jar stormwordcount-1.0.0-jar-with-dependencies.jar com.company.WordCount2Main.App "server" "filepath2" 另一个问题是,一旦处理完文件,拓扑就不会完成。他们一直活着,在我们对他们发出杀戮之前 storm kill "WordCountTopology" 我了解到,在流媒体世界中,消息来自kafka这样的消息队列,没有消息的结尾,但是在实体/消息固定的文件世界中,消息的结尾是如何相关的。
是否有一个api可以执行以下操作?
//创建拓扑,使用storm上传相应的jar一次 StormSubmitter.submitTopology("WordCountTopology", conf,builder.createTopology()); 一旦上传了应用程序代码,就用agruments//示例化拓扑,创建一个拓扑示例并给出一个状态跟踪器 JobTracker tracker = StormSubmitter.runTopology("WordCountTopology", conf, args); //如果当前作业已完成或未完成,则可以查询风暴 JobStatus status = StormSubmitter.getTopologyStatus(conf, tracker);

pgvzfuti

pgvzfuti1#

文章中提到的字数拓扑结构并不能公正地解释风暴的威力和力量。因为storm是一个流处理器,所以它需要一个流;句号。根据定义,文件是静态文件。我很同情storm的开发人员,他们如何通过一个简单的hello world来展示拓扑概念和非流技术(如文件)的应用。所以对于那些正在学习风暴的新手来说,我当时是一个很难理解如何用这个例子来发展的人。这个示例只是展示storm概念如何工作的一种方式,而不是一个真正的word应用程序,它说明了文件将如何出现或需要如何处理。
下面是一个解决方案。
由于拓扑一直在运行,因此它们可以计算任意时间段的字数,即在一个文件中或在任何时间段的所有文件中。
为了允许不同的文件进来,我们需要一个流嘴。因此,您自然需要kafka消息代理或类似的代理来接收流中的文件。根据文件的大小和消息代理设置的限制(即kafka,具有1 mb的文件限制),我们可以选择将文件本身作为有效负载或文件的引用发送,在这种情况下,您需要一个分布式文件系统来存储文件,即hadoop dfs或nas。
然后我们使用Kafka喷口来读取这些文件,而不是filespout。
我们现在有以下问题1。文件2的字数。每个文件的字数3。字数统计的运行状态,直到它被处理4。我们什么时候知道一个文件是否已处理或完成
跨文件字数
使用所提供的示例,这是示例所针对的用例,因此如果我们继续流式处理文件,并且在每个文件中读取行、拆分单词并发送给其他bolt,bolt将独立于它来自哪个文件来计算单词。
一只棕色的狐狸跳了起来。。。从前一只狐狸。。。
野狐。。。从前的福克斯(不需要,因为它来在文件1)。。。
每个文件的字数为了做到这一点,我们现在需要将要附加fileid的字的字段分组。所以现在这个例子需要修改,为它拆分的每个单词包含一个fileid。一只棕色的狐狸跳了起来。。。从前一只狐狸。。。
因此,按单词分组的字段将是(取消噪声单词)
file1\u快速file1\u棕色file1\u狐狸
文件2\u一次文件2\u一次文件2\u福克斯
由于所有这些计数都在bolt的内存中,我们不知道eof,因此无法获取状态,除非有人进入bolt,或者我们定期将计数发送到另一个数据存储,在那里我们可以查询它。这正是我们需要做的,我们需要定期将内存中的bolt计数持久化到hbase、elastic、mongodb等数据存储中
当我们知道一个文件是否被处理或完成时也许这是流媒体世界中最难回答的问题,基本上流处理器并不知道steam已经完成,因为从它的Angular 来看,流是文件进入,它需要将每个文件分割成字,并在相应的螺栓中计数。所以他们不知道在它到达每个演员之前或之后发生了什么。这一切都需要由应用程序开发人员来完成。一种方法是在读取每个文件时,我们计算总字数并发送消息文件1:总字数:1000文件2:总字数:2000
现在,当我们进行字数计算并在每个文件中找到不同的字数时,在我们说一个文件完成之前,单个字数和总字数应该匹配。所有这些都是自定义逻辑,我们需要先编写,然后才能说它是完整的。
因此,在essential storm中,它提供了以各种方式进行流处理的框架。应用程序开发人员的工作是根据其拥有的设计进行开发,并根据用例实现自己的逻辑。它没有提供开箱即用的应用程序用例,也没有很好的参考实现,我认为我们需要构建它,因为它不是一个商业产品,依赖于社区来支持。

ozxc1zmp

ozxc1zmp2#

对于两次重用同一拓扑,有两种可能:
1) 为文件spiut使用构造函数参数,并用两次不同的参数示例化同一拓扑:

private StormTopology createMyTopology(String filename) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("File Spout", new FileSpout(filename));
    // add further spouts and bolts etc.
    return builder.createTopology();
}

public static void main(String[] args) {
    String file1 = "/path/to/file1";
    String file2 = "/path/to/file2";
    Config c = new Config();
    if(useFile1) {
        StormSubmitter.submitTopology("T1", c, createMyTopology(file1));
    } else {
        StormSubmitter.submitTopology("T1", c, createMyTopology(file2));
    }
}

2) 作为一种替代方法,您可以在 open() 方法。

public class FileSpout extends IRichSpout {
    @Override
    public void open(Map conf, ...) {
        String filenmae = (String)conf.get("FILENAME");
        // ...
    }
    // other methods omitted
}

    public static void main(String[] args) {
    String file1 = "/path/to/file1";
    String file2 = "/path/to/file2";
    Config c = new Config();
    if(useFile1) {
        c.put("FILENAME", file1);
    } else {
        c.put("FILENAME", file2);
    }

    // assembly topology...

    StormSubmitter.submitTopology("T", c, builder.createTopology());
}

第二个问题:storm中没有自动终止拓扑的api。你可以用 TopologyInfo 并监控喷口发射的元组数。如果它在一段时间内没有改变,您可以假设整个文件都已被读取,然后终止拓扑。

Config cfg = new Config();
// set NIMBUS_HOST and NIMBUS_THRIFT_PORT in cfg
Client client = NimbusClient.getConfiguredClient(cfg).getClient();
TopologyInfo info = client.getTopologyInfo("topologyName");
// get emitted tuples...
client.killTopology("topologyName");

相关问题