storm拓扑配置

up9lanfz  于 2021-06-24  发布在  Storm
关注(0)|答案(7)|浏览(373)

如何为storm拓扑提供自定义配置?例如,如果我构建了一个连接到mysql集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接到的服务器,我该如何做呢?我的首选是使用配置文件,但我担心的是该文件本身没有部署到集群,因此它不会运行(除非我对集群如何工作的理解有缺陷)。到目前为止,我所看到的在运行时将配置选项传递到storm拓扑的唯一方法是通过命令行参数,但是当您获得大量参数时,这种方法会很混乱。
有一个想法是利用shell脚本将文件读入一个变量,并将该变量的内容作为字符串传递给拓扑,但如果可能的话,我希望有更干净的东西。
还有人遇到过这种情况吗?如果是,你是怎么解决的?
编辑:
它似乎需要提供更多的澄清。我的设想是,我有一个拓扑,我希望能够部署在不同的环境中,而不必重新编译它。通常,我会创建一个配置文件,其中包含数据库连接参数之类的内容,并将其传入。我想知道在Storm中怎样做那样的事。

5cnsuln7

5cnsuln71#

我通过在代码中提供配置解决了这个问题:

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, SOME_OPTS);

我试图提供一个特定的拓扑结构 storm.yaml 但它不起作用。纠正我,如果你让它工作,使用风暴.yaml。
更新:
对于任何想知道某些选项是什么的人,这是来自storm邮件列表上的satish duggana:
config.topology\u worker\u childopts:可以覆盖拓扑的worker\u childopts的选项。您可以配置任何java选项,如内存、gc等
你的情况可能是

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1g");
mrphzbgm

mrphzbgm2#

我面临着和你一样的问题,下面是我棘手的解决办法:
使用一个简单的java文件作为配置文件,比如 topo_config.java ,它看起来像:

package com.xxx
public class topo_config {
    public static String zk_host = "192.168.10.60:2181";
    public static String kafka_topic = "my_log_topic";
    public static int worker_num = 2;
    public static int log_spout_num = 4;
    // ...
}

这个文件放在我的configure文件夹中,然后写一个脚本,比如 compile.sh 将其复制到正确的包并进行编译,如下所示:

cp config/topo_config.java src/main/java/com/xxx/
mvn package

直接实现配置:

Config conf = new Config();
conf.setNumWorkers(topo_config.worker_num);
goqiplq2

goqiplq23#

其思想是,在构建拓扑时,创建喷口和螺栓的示例(以及其他内容),这些示例被序列化并分发到集群中的正确位置。如果要配置喷口或喷口的行为,请在提交拓扑之前创建拓扑时进行配置,并通过在喷口或喷口上设置示例变量进行配置,从而驱动所需的可配置行为。

qni6mghb

qni6mghb4#

实际上,最适合您的方法是将配置存储在一个可变的键值存储(s3、redis等)中,然后将其拉入以配置一个数据库连接,然后使用该连接(我假设您已经计划限制与数据库通信的频率,以便获取此配置的开销不是什么大问题)。这种设计允许您动态更改数据库连接,甚至不需要重新部署拓扑。

xlpyo6sf

xlpyo6sf5#

我们已经看到了同样的问题,并通过在每个拓扑中添加以下内容来解决它

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true");

还验证了使用NimbusUI它显示如下。

topology.worker.childopts   -Xmx4096m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true
vlf7wbxs

vlf7wbxs6#

可以指定随拓扑一起提交的配置(通常通过yaml文件)。在我们自己的项目中,我们如何管理这一点是因为我们有单独的用于开发的配置文件和用于生产的配置文件,在其中我们存储服务器、redis和db ip以及端口等。然后,当我们运行命令来构建jar并将拓扑提交给storm时,它包含了正确的配置文件,具体取决于您的部署环境。螺栓和喷口只需从stormconfMap中读取它们所需的配置,该Map在螺栓的prepare()方法中传递给它们。
从http://storm.apache.org/documentation/configuration.html :
每个配置都有一个在storm代码库的defaults.yaml中定义的默认值。您可以通过在nimbus和supervisors的类路径中定义storm.yaml来覆盖这些配置。最后,您可以定义在使用stormsubmitter时随拓扑一起提交的特定于拓扑的配置。但是,特定于拓扑的配置只能覆盖前缀为“topology”的配置。
storm 0.7.0及更高版本允许您在每个螺栓/每个喷口的基础上覆盖配置。
你还会看到http://nathanmarz.github.io/storm/doc/backtype/storm/stormsubmitter.html submitjar和submittopology被传递一个名为conf的Map。
希望这能让你开始。

zf9nrax1

zf9nrax17#

我也面临同样的问题,我通过在集群中配置nfs解决了这个问题,我将配置文件放在共享位置,这样所有集群机器都可以使用它。

相关问题