检测kafka streams 2.0中的废弃进程

oo7oh9g9  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(195)

我有这样一个例子:用户收集订单作为订单行。我用kafka主题实现了这一点,kafka主题包含有顺序更改的事件,它们被合并,存储在本地键值存储中,并作为第二个主题以顺序版本进行广播。
我需要以某种方式对废弃的订单做出React-这些订单已经启动,但至少在过去x小时内没有变化。
简单的解决方案是每y分钟扫描一次本地存储,然后将订单状态更改为“已放弃”的事件后处理。似乎我不能访问存储不是从处理器。。。但它也不是很优雅的编码。欢迎提出任何建议。
--编辑
我不能只将puctuation添加到merge/validation transformer,因为它的输出不同,应该路由到其他地方,比如在这个图像上(单个kafka streams应用程序):

所以“废弃订单处理器/转换器”对于它的输入来说是不可操作的(这里唯一的触发器是时间)。另一件事是,在这种情况下(如在图像上),我的转换器在初始化时得到forwardingdisabledprocessorcontext,所以我不能以标点符号发出任何消息。我可以传递kafkatemplatebean并生成新消息,但整个处理器/转换器只是空壳,只能访问本地存储。。。
这是我使用的代码片段:

public class AbandonedOrdersTransformer implements ValueTransformer<OrderEvent, OrderEvent> {

    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;

        stateStore = (KeyValueStore)processorContext.getStateStore(KafkaConfig.OPENED_ORDERS_STORE);

        //main scheduler
        this.context.schedule(TimeUnit.MINUTES.toMillis(5), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {

            KeyValueIterator<String, Order> iter = this.stateStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Order> entry = iter.next();
                if(OrderStatuses.NEW.equals(entry.value.getStatus()) &&
                    (timestamp - entry.value.getLastChanged().getTime()) > TimeUnit.HOURS.toMillis(4)) {

                    //SEND ABANDON EVENT "event"

                    context.forward(entry.key, event);
                }
            }
            iter.close();
            context.commit();
        });
    }

    @Override
    public OrderEvent transform(OrderEvent orderEvent) {
        //do nothing
        return null;
    }

    @Override
    public void close() {
        //do nothing
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题