我正在使用 clj-kafka
,我正在努力 core.async
它在repl中的接口。
我收到了一些消息,但我的结构感觉不对:我要么无法停止接收消息,要么必须启动 go
再次执行例程以接收更多消息。
以下是我的尝试:
(defn consume [topic]
(let [consume-chan (chan)]
(with-resource [c (consumer config)]
shutdown
(go (doseq [m (messages c "test")]
(>! chan message) ;; should I check the return value?
)))
consume-chan)) ;; is it the right place to return a channel ?
(def consume-chan (consume "test"))
;;(close! consume-chan)
(go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already
(def cons-ch (go
(with-resource [c (consumer config)]
shutdown
(doseq [m (messages c "test")]
(>! consume-chan m))))) ;; should I check something here ?
;;(close! cons-ch)
(def go-ch
(go-loop []
(if-let [km (<! consume-chan)]
(do (println "Got a value in this loop:" km)
(recur))
(do (println "Stop recurring - channel closed")))))
;;(close! go-ch)
如何使用 core.async
接口?
1条答案
按热度按时间wbrvyc0a1#
我会这么做:
>!
以及<!
如果通道已关闭,则返回nil,因此请确保发生这种情况时循环退出—这样您就可以通过关闭通道从外部轻松结束循环。使用try/catch检查go块中的异常,并将任何异常作为返回值,这样它们就不会丢失。
检查读取值是否存在异常,以捕获通道内的任何内容。
go块返回一个通道,块内代码的返回值(如上面的异常)将放在通道上。检查这些通道是否存在异常,可能是为了重新触发。
您现在可以这样写入通道:
你读的是这样的:
现在您将有两个频道,所以使用类似
alts!
或者alts!!
检查循环是否退出。完成后关闭频道。