我已经实现了spring批处理远程分区,现在我要把100亿个id分区,分为多个分区,这些id会从elastic获取,然后推到分区中,然后再推到kafka中
@重写公共Map<string,executioncontext>分区(int gridsize){
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
AtomicInteger partitionNumber = new AtomicInteger(1);
try {
for(int i=0;i<n;i++){
List<Integer> ids = //fetch id from elastic
map.put("partition" + partitionNumber.getAndIncrement(), context);
}
System.out.println("Partitions Created");
} catch (IOException e) {
e.printStackTrace();
}
return map;
}
我不能一次获取和推送Map中的所有ID,否则,我将耗尽内存。我希望ID被推送到队列中,然后获取下一个ID。
这可以通过Spring批来完成吗?
1条答案
按热度按时间eeq64g8w1#
如果您想使用分区,您必须找到一种方法,用给定的键对输入数据集进行分区。没有分区键,就不能真正使用分区(有或没有spring批处理)。
如果您的id是由一个可以划分为多个分区的序列定义的,那么您不必获取100亿个id,对它们进行分区,并将每个分区(即每个分区的所有id)放在worker的执行上下文中。您可以做的是找到max id,创建id的范围并将它们分配给不同的worker。例如:
分区1:0-10000
分区2:10001-20000
等
如果您的id不是由序列定义的,并且不能按范围进行分区,那么您需要找到另一个键(或复合键),该键允许您根据另一个条件对数据进行分区。否则,(远程)分区不是您的选择。