Processor API: bulk POST requests for events stored in a KeyValueStore

Posted by yqkkidmi on 2021-06-04 in Kafka

As suggested here https://stackoverflow.com/a/60942154/1690657, I used the Processor API to store incoming requests in a KeyValueStore. I want to send one POST request for every 100 events, so I wrote this:

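import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;

public class BulkProcessor implements Processor<byte[], UserEvent> {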

    private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;

    private BulkAPIClient bulkClient;

    private String storeName;

    private ProcessorContext context;

    private int count;

    @Autowired
    public BulkProcessor(String storeName, BulkAPIClient bulkClient) {
        this.storeName = storeName;
        this.bulkClient = bulkClient;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
        count = 0;
        // to check every 15 minutes if there are any remainders in the store that are not sent yet
        this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (count > 0) {
                sendEntriesFromStore();
            }
        });
    }

    @Override
    public void process(byte[] key, UserEvent value) {
        int userGroupId = Integer.valueOf(value.getUserGroupId());
        ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
        if (userEventArrayList == null) {
            userEventArrayList = new ArrayList<>();
        }
        userEventArrayList.add(value);
        keyValueStore.put(userGroupId, userEventArrayList);
        count++; // count every buffered event; without this the threshold below is never reached
        if (count >= 100) {
            sendEntriesFromStore();
        }
    }

    private void sendEntriesFromStore() {
        // try-with-resources closes the iterator even if a request throws
        try (KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
                BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
                if (bulkRequest.getLocation() != null) {
                    URI url = bulkClient.buildURIPath(bulkRequest);
                    bulkClient.postRequestBulkApi(url, bulkRequest);
                }
                // delete only after the request was attempted, so an exception from the POST leaves the batch in the store
                keyValueStore.delete(entry.key);
            }
        }
        count = 0;
    }

    @Override
    public void close() {
    }
}

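I'm not sure whether adding count is thread-safe, or whether this is the right way to implement it. At the moment I also only read from one partition. So my questions are:
Is this thread-safe?
Is this a good way to send bulk POST requests in the Processor API?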

Answer 1 (f45qwnt8)

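Is this thread-safe?
Yes, it is thread-safe. Each thread uses the ProcessorSupplier to create its own Processor instance, so the count field is never shared between threads.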
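
The per-instance point can be illustrated without Kafka. The following is a simplified plain-Java model, not the Kafka Streams API: PerInstanceStateDemo, CountingProcessor and runWorkers are hypothetical names, and java.util.function.Supplier stands in for ProcessorSupplier. Because every call to get() returns a fresh object, each worker thread has its own count field and needs no synchronization on it.

```java
import java.util.function.Supplier;

public class PerInstanceStateDemo {

    /** Hypothetical stand-in for a Processor with a plain, unsynchronized counter. */
    public static final class CountingProcessor {
        private int count;

        public void process(String value) {
            count++; // only ever touched by the thread that owns this instance
        }

        public int count() {
            return count;
        }
    }

    /** Runs one processor instance per thread and returns the per-instance counts. */
    public static int[] runWorkers(Supplier<CountingProcessor> supplier, int workers, int eventsPerWorker) {
        CountingProcessor[] processors = new CountingProcessor[workers];
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            CountingProcessor processor = supplier.get(); // fresh instance per worker
            processors[i] = processor;
            threads[i] = new Thread(() -> {
                for (int n = 0; n < eventsPerWorker; n++) {
                    processor.process("event-" + n);
                }
            });
            threads[i].start();
        }
        int[] counts = new int[workers];
        try {
            for (int i = 0; i < workers; i++) {
                threads[i].join(); // join makes the worker's writes visible to this thread
                counts[i] = processors[i].count();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        for (int c : runWorkers(CountingProcessor::new, 4, 1000)) {
            System.out.println(c); // each worker counted only its own events
        }
    }
}
```

If the supplier returned one shared instance instead, the unsynchronized count++ would race between threads. That is why a supplier should hand out a new object on every call.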
Is this a good way to send bulk POST requests in the Processor API?
Overall, I think it looks fine.
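
For reference, the count-and-flush pattern from the question can be sketched in plain Java. This is a minimal model under stated assumptions, not Kafka Streams code: BatchBufferDemo and BatchBuffer are hypothetical names, a LinkedHashMap stands in for the KeyValueStore, a BiConsumer stands in for the POST call, and flushIfPending() plays the role of the scheduled punctuation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class BatchBufferDemo {

    /** Hypothetical buffer: groups events by key and flushes after a fixed number of events. */
    public static final class BatchBuffer {
        private final Map<Integer, List<String>> store = new LinkedHashMap<>(); // stand-in for the state store
        private final int threshold;
        private final BiConsumer<Integer, List<String>> sender; // stand-in for the POST call
        private int count;

        public BatchBuffer(int threshold, BiConsumer<Integer, List<String>> sender) {
            this.threshold = threshold;
            this.sender = sender;
        }

        /** Mirrors process(): buffer the event, then flush once the threshold is reached. */
        public void add(int groupId, String event) {
            store.computeIfAbsent(groupId, k -> new ArrayList<>()).add(event);
            count++;
            if (count >= threshold) {
                flush();
            }
        }

        /** Mirrors the scheduled punctuation: send leftovers that never reached the threshold. */
        public void flushIfPending() {
            if (count > 0) {
                flush();
            }
        }

        private void flush() {
            store.forEach(sender); // one request per group
            store.clear();
            count = 0;
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        BatchBuffer buffer = new BatchBuffer(3, (groupId, events) -> sent.add(groupId + ":" + events));
        buffer.add(1, "a");
        buffer.add(2, "b");
        buffer.add(1, "c"); // third event reaches the threshold and triggers a flush
        buffer.add(2, "d"); // stays buffered until the next flush
        buffer.flushIfPending();
        System.out.println(sent);
    }
}
```

The sketch does not handle a sender that fails. A real processor would have to decide whether a failed batch stays in the store for a later retry.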
