Kafka流加入表演

kgsdhlau  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(183)

我已经构建了一个kafka流应用程序,该应用程序将通过客户信息等丰富实时事件。生产者能够写入200k至500k以上的信息每秒的基础上负载这是好的。对于给定的流,每天的预期平均记录数约为200-220亿条记录。使用连接的kafka流丰富变得太慢,每秒只能实现大约40-50k个事件。因为我使用的是selectkey,所以它会导致重新分区,而且我还需要进行其他几个连接。生产者以avro格式写入数据。当我在没有连接的情况下执行转换时,我能够与生产者匹配。下面是我正在使用的连接的代码片段:
消息事件包括以下字段:
customerid、deviceid、httpresponse、datetime、响应率、sessionid

//get device information
        final GlobalKTable<String,GenericRecord> device_info = builder.globalTable("device"); // contain about 250k records

//get customer information
         final GlobalKTable<String,GenericRecord> cust= builder.table("CustomerInfo"); // contain about 40 million customer records

// customer events
        KStream<String, GenericRecord>  eventstream= builder.stream("CustomerEvents"); //about 250k events per second

     eventstream.selectKey((key, value)-> value.get("customerid")))
                .leftJoin(cust, (dataKey,dataValue) -> dataKey, (data, customerinfo) -> {

        if(customerinfo!=null){

        data.put("customerName", customerinfo.get("customername"));
        //some other logic here

        }else{
        data.put("customerName", customerinfo.get("unknown"));
        // some logic here
            }
        return data;
        })
        .selectKey((key,value)-> value.get("deviceid").substring(0,8))
                .leftJoin(device_info, (dataKey,dataValue) -> dataKey, (data, deviceinfo) -> {

                    if(deviceinfo!=null) {

                        data.put("device_manufacturer", deviceinfo.get("manufacturer"));

                    } else {
                        data.put("device_manufacturer", "unknown");
                    }
                    return  data;
                })
        .to("outputTopic", Produced.with(stringSerde, new JsonSerde<>(DataTransform.class)));

我注意到的几件事:
联接的引入使得跨分区的数据分布发生了倾斜,但不确定如何解决这一问题,因为键决定了消息的放置位置,例如,假设大多数数据都包含类似的设备ID
selectkey会导致重新分区,这意味着我的数据必须在代理和运行在不同服务器上的kafka流应用程序之间进行多次往返

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题