运行拓扑时storm中的nullpointerexception

z18hc3ub  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(394)

我要走了 NullPointerException s在 backtype.storm.utils.DisruptorQueue.consumeBatchToCursor 方法运行拓扑时,特别是在螺栓中。喷口已正式执行。
storm的疑难解答页面说,这可能是由于多个线程在outputcollector上发出方法造成的。但是,我看不出这与我的案子有什么关系。
下面是喷口的代码:

(defspout stub-spout ["stub-spout"]
  [conf context collector]
  (spout
    (nextTuple []
      (let [channel-value (<!! storm-async-channel)]
        (emit-spout! collector [channel-value])))
    (ack [id]
      ))))

对于螺栓:

(defbolt stub-bolt ["stub-bolt"] [tuple collector]
  (println "Invocation!")
  (let [obj (get tuple "object")
        do-some-calculations (resolve 'calclib/do-some-calculations)
        new-obj (do-some-calculations obj)]
    (emit-bolt! collector new-obj)))

经过调查后发现 resolve 返回null(我需要 resolve 在运行时,在位于 calclib ).
但代码在本地集群中运行正常。为什么会这样?
如有任何建议,我们将不胜感激。谢谢!

j1dl9f46

j1dl9f461#

我想我找到了解决办法。螺栓定义更改为准备好的螺栓:

(defbolt stub-bolt ["stub-bolt"] 
  {:prepare true}
  [conf context collector]
  (let [f (load "/calclib/core")
        do-some-calculations (resolve 'calclib/do-some-calculations)]
    (bolt
      (execute [tuple]  
        (let [obj (get tuple "object")
              new-obj (do-some-calculations obj)]
          (emit-bolt! collector new-obj))))))

关键是呼叫 load . 我不知道有没有更优雅的方法。

相关问题