正如这里所建议的https://stackoverflow.com/a/60942154/1690657 我已经使用处理器api将传入的请求存储在keyvaluestore中。每100个事件我想发送一个 POST
请求。所以我做了这个:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
keyValueStore.delete(entry.key);
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
bulkClient.postRequestBulkApi(url, bulkRequest);
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
我不确定是否添加 count
是线程安全的,或者这是实现它的正确方法。目前我也只从一个分区读取。所以我的问题是:
这个线安全吗?
这是在处理器api中发送批量post请求的好方法吗?
1条答案
按热度按时间f45qwnt81#
这个线安全吗?
是的,它是线程安全的。每个线程使用
ProcessorSupplier
创建自己的Processor
示例/对象。这是在处理器api中发送批量post请求的好方法吗?
总的来说我觉得不错。