我已经构建了一个kafka流应用程序,该应用程序将通过客户信息等丰富实时事件。生产者能够写入200k至500k以上的信息每秒的基础上负载这是好的。对于给定的流,每天的预期平均记录数约为200-220亿条记录。使用连接的kafka流丰富变得太慢,每秒只能实现大约40-50k个事件。因为我使用的是selectkey,所以它会导致重新分区,而且我还需要进行其他几个连接。生产者以avro格式写入数据。当我在没有连接的情况下执行转换时,我能够与生产者匹配。下面是我正在使用的连接的代码片段:
消息事件包括以下字段:
customerid、deviceid、httpresponse、datetime、响应率、sessionid
//get device information
final GlobalKTable<String,GenericRecord> device_info = builder.globalTable("device"); // contain about 250k records
//get customer information
final GlobalKTable<String,GenericRecord> cust= builder.table("CustomerInfo"); // contain about 40 million customer records
// customer events
KStream<String, GenericRecord> eventstream= builder.stream("CustomerEvents"); //about 250k events per second
eventstream.selectKey((key, value)-> value.get("customerid")))
.leftJoin(cust, (dataKey,dataValue) -> dataKey, (data, customerinfo) -> {
if(customerinfo!=null){
data.put("customerName", customerinfo.get("customername"));
//some other logic here
}else{
data.put("customerName", customerinfo.get("unknown"));
// some logic here
}
return data;
})
.selectKey((key,value)-> value.get("deviceid").substring(0,8))
.leftJoin(device_info, (dataKey,dataValue) -> dataKey, (data, deviceinfo) -> {
if(deviceinfo!=null) {
data.put("device_manufacturer", deviceinfo.get("manufacturer"));
} else {
data.put("device_manufacturer", "unknown");
}
return data;
})
.to("outputTopic", Produced.with(stringSerde, new JsonSerde<>(DataTransform.class)));
我注意到的几件事:
联接的引入使得跨分区的数据分布发生了倾斜,但不确定如何解决这一问题,因为键决定了消息的放置位置,例如,假设大多数数据都包含类似的设备ID
selectkey会导致重新分区,这意味着我的数据必须在代理和运行在不同服务器上的kafka流应用程序之间进行多次往返
暂无答案!
目前还没有任何答案,快来回答吧!