我正在使用合流restapi代理调用kafka。我正在读取一个csv文件,用那里的所有记录(大约400万条记录)创建一个对象,并向rest代理发送一个请求。我一直在找工作 OutOfMemory
例外。
确切的异常消息是: Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
我只是rest代理服务器的一个示例,作为docker容器托管。环境变量设置为: JAVA_OPTIONS=-Xmx1g
其他配置: CPU - 1 Memory - 1024
它在崩溃前处理了大约10万次。我尝试过将它扩展到4个示例,将cpu增加到3个,内存也增加到2046MB。然后它处理大约500000条记录。
在读取了csv之后,我正在批量调用kafka端点,记录为5k。在节点中写入。这是节点代码
fs.createReadStream(inputFile)
.pipe(parser({skip_lines_with_error: true}))
.on('data', (records) => {
country.push({ 'value' : {
country: records[0],
capital: records[1]
}
});
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
}).catch((reason) => {
console.log(reason);
});
country = [];
}
})
.on('end', () => {
console.log('All done!');
});
function callKafkaProxy(records) {
const urlAndRequestOptions = {
url: 'http://kafka-rest-proxy.com/topics/test-topic',
headers: {
'content-type' : 'application/vnd.kafka.json.v2+json',
'Accept' : 'application/vnd.kafka.v2+json'
}
};
let recordsObject = {records: records};
//request here is a wrapper on the http package.
return request.post(urlAndRequestOptions, recordsObject);
我觉得我缺少了一些配置,这些配置应该可以帮助解决这个问题,而不会增加>1的示例数。
任何帮助都将不胜感激。
2条答案
按热度按时间ugmeyewa1#
1它不处理背压。创建可写流,它将处理批处理过程。那就用管子吧。
然后分析这些线:
您的callkafkaproxy是异步的,这就是为什么您的country数组总是被填充的原因,不管callkafkaproxy函数的结果如何。国家阵列不断填充并不断发出请求。您可以在batch++之后通过控制台日志记录来确定。你会看到你发起了很多请求,Kafka的React会比你提出请求慢得多。
解决方案:
创建可写流。
将数据从解析器传输到它。input.pipe(解析器).pipe(yourjustcreatedkafkawritablestream)
当您准备好接收其他记录时,让您的可写流推动countries进行数组和回调。当你到达你的边缘(如果countries.length>5000),然后向Kafka提出请求,等待回复,然后才给回拨。这样,您的流将是自适应的。您应该阅读更多关于节点流及其功率的信息。但请记住,强大的功能带来了巨大的责任,在这种情况下,您必须仔细设计代码以避免此类内存泄漏。
i5desfxk2#
在zilvinas的回答的帮助下,我明白了如何利用流来批量发送数据。这里有一个解决方案: