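I have just implemented a Trident DRPC function to process incoming messages, and I am trying to persist the number of tuples processed by the last stage of the topology as Trident state. Here is my topology: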
topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));
When I try to submit this topology to Storm, I get the following error:
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more
If I remove the last two calls from the topology, it submits successfully. They are:
.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));
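After the aggregate() step runs, I want to group the tuples by the "portfolioUrn" field and persist the count to MongoDB. I don't understand why the final groupBy().persistentAggregate() section causes this error. Can you help me find the cause?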
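To make the intent concrete, here is a minimal plain-Java sketch of the per-portfolioUrn count I am after. It does not use Storm at all: the class name PortfolioCountSketch, the helper countByPortfolioUrn, and the sample URNs are hypothetical, and an in-memory map stands in for the MongoDB-backed state.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PortfolioCountSketch {

    // Hypothetical helper: counts how many tuples carry each portfolioUrn.
    // This mirrors the result I expect from grouping by "portfolioUrn" and
    // applying a count; the map is only a stand-in for the persisted state.
    static Map<String, Long> countByPortfolioUrn(List<String> portfolioUrns) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String urn : portfolioUrns) {
            // Start at 1 for a new key, otherwise add 1 to the running count.
            counts.merge(urn, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample values, one entry per tuple leaving aggregate().
        List<String> urns = Arrays.asList("urn:A", "urn:B", "urn:A");
        System.out.println(countByPortfolioUrn(urns)); // prints {urn:A=2, urn:B=1}
    }
}
```

In the real topology the running count per key would have to live in the state backend rather than in a local map, which is the part that fails at submission time.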
1 Answer
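After some research, I found this page, which appears to describe a similar situation. Nathan Marz said that DRPC topologies do not support partitioned persistence (as of 2013), and I believe the same applies to my case. I think (without having fully verified it) that DRPC topologies in Storm 1.2.1 may not support state persistence at all.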