Kafka有一个接口, Processor
,它的实现是有状态的。《开发人员指南》中给出的示例实现是:
public class WordCountProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, Long> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
// call this processor's punctuate() method every 1000 time units.
this.context.schedule(1000);
// retrieve the key-value store named "Counts"
kvStore = (KeyValueStore) context.getStateStore("Counts");
}
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Long oldValue = kvStore.get(word);
if (oldValue == null) {
kvStore.put(word, 1L);
} else {
kvStore.put(word, oldValue + 1L);
}
}
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator<String, Long> iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue<String, Long> entry = iter.next();
context.forward(entry.key, entry.value.toString());
}
iter.close();
// commit the current processing progress
context.commit();
}
@Override
public void close() {
// close the key-value store
kvStore.close();
}
}
这个 init
方法初始化 WordCountProcessor
的内部状态,例如检索键值存储。其他方法,比如 process
以及 close
,使用此状态。
我不清楚该怎么做 reify
clojure中的这样一个接口。我们该如何传递 init
至 process
, close
等等。?
使用闭包?
我的一个想法是使用闭包:
(let [ctx (atom nil)]
(reify Processor
(close [this]
;; Do something w/ ctx
)
(init [this context]
(reset! ctx context))
(process [this k v]
;; Do something w/ ctx
)
(punctuate [this timestamp]
;; Do something w/ ctx
)))
令人烦恼的是,我们得从 ProcessorContext
对象,因此键值存储代码将在需要键值存储的所有方法中重复。
我不认为这是一个(一般)方法,虽然在个案的基础上,我们可以取代 ctx
具有方法所需的更特定状态的原子。
有更好的办法吗?
1条答案
按热度按时间jutyujz01#
关闭一个原子将是主要的方法。你原来的类有两个场,所以你可以关闭两个原子来获得相同的效果
如果这仍然太乏味,那么你可以添加一些方便的函数,也可以关闭原子
另一种选择是使用gen类,但是认为使用reify会更好。