我对拓扑有问题。我试着解释一下工作流程。。。我有一个每2分钟发出约500k元组的源,这些元组必须由一个喷口读取,并像单个对象(我认为是三叉戟中的一批)一样处理一次。之后,bolt/函数/what else?…必须附加一个时间戳并将元组保存到redis中。
我尝试用一个函数实现一个trident拓扑,该函数使用一个jedis对象(redis library for java)将所有元组保存到redis中,并将其放入这个函数类中,但是当我部署时,我在这个对象上收到一个notserializable异常。
我的问题是,如何实现一个在redis上写这批元组的函数?在网上阅读时,我找不到任何从函数写到redis的例子,也找不到任何在trident中使用state对象的例子(可能我不得不使用它…)
我的简单拓扑:
TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));
提前谢谢
2条答案
按热度按时间2sbarzqh1#
(由于redis相关的具体问题在其他评论中似乎已经解决,所以一般回复state)
当我们记住storm从分布式(或“分区”)数据源(通过storm“喷口”)读取数据,并行处理多个节点上的数据流时,storm中db更新的概念变得更清晰,可选地对这些数据流执行计算(称为“聚合”),并将结果保存到分布式数据存储(称为“状态”)。聚合是一个非常宽泛的术语,只是指“计算的东西”:例如,计算流上的最小值在storm中被视为以前已知的最小值与集群中某个节点当前处理的新值的聚合。
考虑到聚合和分区的概念,我们可以看看storm中的两个主要原语,它们允许在一个状态下保存某些内容:partitionpersist和persistentaggregate,第一个原语在每个集群节点的级别上运行,不与其他分区协调,感觉有点像通过dao与db交谈,第二种方法涉及到对元组进行“重新划分”(即在集群中重新分布元组,通常是沿着一些groupby逻辑),在将某些内容读取/保存到db之前进行一些计算(“聚合”),感觉有点像是在与hashmap而不是db交谈(在这种情况下,storm将db称为“mapstate”),或者“快照”(如果Map上只有一个键)。
还有一点需要注意的是,storm的精确一次语义不是通过只处理一次每个元组来实现的:这太脆弱了,因为在我们的拓扑中定义的每个元组可能有几个读/写操作,出于可伸缩性的原因,我们希望避免大规模的两阶段提交,网络分区变得更有可能。相反,storm通常会继续重放元组,直到他确定它们至少已经被完全成功地处理了一次。这与状态更新的重要关系是,storm给我们提供了允许幂等状态更新的基元(opaquemap),这样这些重放就不会损坏以前存储的数据。例如,如果我们对数字[1,2,3,4,5]进行求和,则保存在db中的结果将始终是15,即使由于某些瞬时故障而在“求和”操作中多次重放和处理它们。opaquemap对用db保存数据的格式有轻微影响。请注意,只有当我们告诉storm这样做时,这些重放和不透明逻辑才会出现,但我们通常会这样做。
如果你有兴趣阅读更多,我在这里发表了两篇关于这个主题的博客文章。
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/
最后一件事:正如上面重播内容所暗示的,storm本质上是一种非常异步的机制:我们通常有一些数据生产者在排队系统中发布事件(例如。Kafka或0mq)和风暴读从那里。因此,如问题中所建议的,从storm内分配时间戳可能具有或可能不具有期望的效果:该时间戳将反映“最近的成功处理时间”,而不是数据摄取时间,并且在重放元组的情况下,它当然将不相同。
fjnneemd2#
你为redis试过三叉戟吗。github上已经有一个代码:https://github.com/kstyrc/trident-redis.
让我知道这是否回答你的问题。