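I want to write a Flink program to process a Kinesis stream. The Kinesis stream comes from an AWS DynamoDB stream and represents inserts into a DynamoDB table.
Each record in the stream can contain multiple insert records, and the number of insert records varies (anywhere from 1 to 10).
I want to group all insert records from all the streams within a 1-minute interval and sum the impression count (impressionCount) field. Here is a sample stream record containing two inserts: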
[
  {
    "country":"NL",
    "userOS":"mac",
    "createdOn":"2017-08-02 16:22:17.135600",
    "trafficType":"D",
    "affiliateId":"87",
    "placement":"4",
    "offerId":"999",
    "advertiserId":"139",
    "impressionCount":"1",
    "uniqueOfferCount":"0"
  },
  {
    "country":"NL",
    "userOS":"mac",
    "createdOn":"2017-08-02 16:22:17.135600",
    "trafficType":"D",
    "affiliateId":"85",
    "placement":"4",
    "offerId":"688",
    "advertiserId":"139",
    "impressionCount":"1",
    "uniqueOfferCount":"0"
  }
]
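As a reference point for the grouping described above, the following plain-Java sketch (no Flink dependency) computes what the 1-minute aggregation should produce for records like the sample: each `createdOn` is truncated to the minute, records are grouped by that minute plus a few dimension fields, and `impressionCount` is summed. The class `ImpressionMinuteAggregation`, its `Impression` holder, and grouping on only `offerId`, `affiliateId` and `country` are hypothetical simplifications; the update below keys on all dimension fields.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the desired per-minute aggregation (not Flink code).
class ImpressionMinuteAggregation {
    // Matches createdOn values such as "2017-08-02 16:22:17.135600".
    static final DateTimeFormatter CREATED_ON =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    // Minimal holder for the fields used below (assumed, not the asker's record class).
    static final class Impression {
        final String createdOn;
        final String offerId;
        final String affiliateId;
        final String country;
        final long impressionCount;

        Impression(String createdOn, String offerId, String affiliateId,
                   String country, long impressionCount) {
            this.createdOn = createdOn;
            this.offerId = offerId;
            this.affiliateId = affiliateId;
            this.country = country;
            this.impressionCount = impressionCount;
        }
    }

    // Groups by (minute, offerId, affiliateId, country) and sums impressionCount per group.
    static Map<String, Long> sumPerMinute(List<Impression> records) {
        Map<String, Long> sums = new TreeMap<>();
        for (Impression r : records) {
            // Truncating to the minute plays the role of a 1-minute tumbling window.
            LocalDateTime minute = LocalDateTime.parse(r.createdOn, CREATED_ON)
                    .truncatedTo(ChronoUnit.MINUTES);
            String key = String.join("_", minute.toString(), r.offerId, r.affiliateId, r.country);
            sums.merge(key, r.impressionCount, Long::sum);
        }
        return sums;
    }
}
```

In Flink the same result would come from keying by the dimension fields and applying a 1-minute tumbling event-time window with a sum or reduce; the map here only models the expected output.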
My code:
DataStream<List<RawImpressionLogRecord>> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
        "Impressions-Stream", new RawImpressionLogSchema(), consumerConfig));

/** CLASS: RawImpressionLogSchema **/
public class RawImpressionLogSchema implements DeserializationSchema<List<RawImpressionLogRecord>> {
    @Override
    public List<RawImpressionLogRecord> deserialize(byte[] bytes) {
        return RawImpressionLogRecord.parseImpressionLog(bytes);
    }

    @Override
    public boolean isEndOfStream(List<RawImpressionLogRecord> event) {
        return false;
    }

    @Override
    public TypeInformation<List<RawImpressionLogRecord>> getProducedType() {
        // TypeHint (org.apache.flink.api.common.typeinfo) keeps the element type,
        // which TypeExtractor.getForClass(List.class) would erase
        return TypeInformation.of(new TypeHint<List<RawImpressionLogRecord>>() {});
    }
}
/** parse method (in RawImpressionLogRecord; uses java.lang.reflect.Type,
    java.nio.charset.StandardCharsets, java.util.ArrayList/List,
    com.google.gson.reflect.TypeToken and a static Gson instance named gson) **/
public static List<RawImpressionLogRecord> parseImpressionLog(byte[] impressionLogBytes) {
    String json = new String(impressionLogBytes, StandardCharsets.UTF_8);
    Type listType = new TypeToken<ArrayList<RawImpressionLogRecord>>(){}.getType();
    // Gson returns null for empty input, so fail loudly instead of emitting null
    List<RawImpressionLogRecord> records = gson.fromJson(json, listType);
    if (records == null) {
        throw new IllegalArgumentException("Empty impression log payload");
    }
    return records;
}
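Both an event-time timestamp extractor and a per-minute grouping key depend on turning a `createdOn` value such as `2017-08-02 16:22:17.135600` into epoch milliseconds. Below is a minimal `java.time` sketch, assuming the timestamps are in UTC (the question does not state a zone; the extractor in the update uses the machine's default zone, which makes results depend on where the job runs). `CreatedOnTimestamps` and its methods are hypothetical names, and the window-start arithmetic mirrors how tumbling windows with no offset are aligned to the epoch.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; assumes createdOn is recorded in UTC.
class CreatedOnTimestamps {
    private static final DateTimeFormatter CREATED_ON =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    // Event-time timestamp in epoch milliseconds (sub-millisecond digits are truncated).
    static long toEpochMillisUtc(String createdOn) {
        return LocalDateTime.parse(createdOn, CREATED_ON)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Start of the epoch-aligned 1-minute window that contains the timestamp.
    static long oneMinuteWindowStart(long epochMillis) {
        long windowSizeMillis = 60_000L;
        return epochMillis - Math.floorMod(epochMillis, windowSizeMillis);
    }
}
```

For the sample record this gives 1501690937135 ms, and its 1-minute window starts at 1501690920000 ms (16:22:00 UTC).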
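I am able to parse the input and create kinesisStream. Is this the right approach? And how can I implement the aggregation?
Also, once I have the DataStream, how do I apply map/filter/groupBy functions to a stream of lists?
I'm new to Flink, and any help would be greatly appreciated.
Update
I tried to solve the use case above with the code below, but for some reason the reduce function is never called. Any idea what is wrong with this code?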
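On the map/filter/groupBy question: a common approach is to flatten the stream of lists first, using a `flatMap` that emits every element of each list through its `Collector` (the update's `Splitter` appears to play this role), so that `map`, `filter` and `keyBy` then operate on individual records. The sketch below shows the same shape with `java.util.stream` as an analogy only; it is not the Flink `DataStream` API, and `FlattenBatches` and `Rec` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Analogy only: java.util.stream, not the Flink DataStream API.
class FlattenBatches {
    // Minimal stand-in for one insert record.
    static final class Rec {
        final String offerId;
        final long impressionCount;

        Rec(String offerId, long impressionCount) {
            this.offerId = offerId;
            this.impressionCount = impressionCount;
        }
    }

    // Each inner list plays the role of one Kinesis record carrying 1..10 inserts.
    static List<Rec> flatten(List<List<Rec>> batches) {
        return batches.stream()
                .flatMap(List::stream) // like a flatMap that emits each list element
                .collect(Collectors.toList());
    }

    // Once flattened, filter and map work on single records.
    static List<String> offersWithImpressions(List<List<Rec>> batches) {
        return flatten(batches).stream()
                .filter(r -> r.impressionCount > 0)
                .map(r -> r.offerId)
                .collect(Collectors.toList());
    }
}
```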
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<List<ImpressionLogRecord>> rawRecords =
        env.addSource(new ImpressionLogDataSourceFunction("C:\\LogFiles\\input.txt"));

DataStream<ImpressionLogRecord> impressionLogDataStream = rawRecords
        .flatMap(new Splitter())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ImpressionLogRecord>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(ImpressionLogRecord element) {
                        return element.getCreatedOn().atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli();
                    }
                });
//impressionLogDataStream.print();

KeyedStream<ImpressionLogRecord, String> keyedImpressionLogDataStream = impressionLogDataStream
        .keyBy(impressionLogRecordForKey -> {
            // key = minute (yyyy-MM-ddTHH:mm) followed by all dimension fields
            StringBuilder groupByKey = new StringBuilder();
            groupByKey.append(impressionLogRecordForKey.getCreatedOn().toString().substring(0, 16));
            groupByKey.append("_").append(impressionLogRecordForKey.getOfferId());
            groupByKey.append("_").append(impressionLogRecordForKey.getAdvertiserId());
            groupByKey.append("_").append(impressionLogRecordForKey.getAffiliateId());
            groupByKey.append("_").append(impressionLogRecordForKey.getCountry());
            groupByKey.append("_").append(impressionLogRecordForKey.getPlacement());
            groupByKey.append("_").append(impressionLogRecordForKey.getTrafficType());
            groupByKey.append("_").append(impressionLogRecordForKey.getUserOS());
            System.out.println("Call to Group By Function===================" + groupByKey);
            return groupByKey.toString();
        });
//keyedImpressionLogDataStream.print();

DataStream<ImpressionLogRecord> aggImpressionRecord = keyedImpressionLogDataStream
        .timeWindow(Time.minutes(5))
        .reduce((prevLogRecord, currentLogRecord) -> {
            System.out.println("Calling Reduce Function-------------------------");
            ImpressionLogRecord aggregatedImpressionLog = new ImpressionLogRecord();
            aggregatedImpressionLog.setOfferId(prevLogRecord.getOfferId());
            aggregatedImpressionLog.setCreatedOn(prevLogRecord.getCreatedOn().truncatedTo(ChronoUnit.MINUTES));
            aggregatedImpressionLog.setAdvertiserId(prevLogRecord.getAdvertiserId());
            aggregatedImpressionLog.setAffiliateId(prevLogRecord.getAffiliateId());
            aggregatedImpressionLog.setCountry(prevLogRecord.getCountry());
            aggregatedImpressionLog.setPlacement(prevLogRecord.getPlacement());
            aggregatedImpressionLog.setTrafficType(prevLogRecord.getTrafficType());
            aggregatedImpressionLog.setUserOS(prevLogRecord.getUserOS());
            aggregatedImpressionLog.setImpressionCount(prevLogRecord.getImpressionCount() + currentLogRecord.getImpressionCount());
            aggregatedImpressionLog.setUniqueOfferCount(prevLogRecord.getUniqueOfferCount() + currentLogRecord.getUniqueOfferCount());
            return aggregatedImpressionLog;
        });
aggImpressionRecord.print();
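One plausible reason the message in `reduce` never prints: a window `ReduceFunction` is applied incrementally, so the first record for a key in a window is stored as-is and the function runs only when a second record with the same key arrives in that window. Because the key here combines the minute with seven dimension fields, it is quite possible that no key repeats, in which case each window emits its single record without ever calling `reduce`. The plain-Java model below (an analogy, not Flink internals; `IncrementalReduceModel` is a hypothetical name) reproduces that behaviour with `Map.merge`, which likewise invokes its merge function only when a value is already present. Two other things worth checking: with event time, a window fires only after the watermark passes its end, so nothing is emitted until later timestamps arrive or a bounded source finishes; and the snippet does not show a call to `env.execute()`, without which the job never runs.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key incremental reduction inside one window (not Flink code).
class IncrementalReduceModel {
    private final Map<String, Long> windowState = new HashMap<>();
    private int reduceCalls = 0;

    // The first value for a key is stored unchanged; the reduce step
    // (here: summing impression counts) runs only for later values of the same key.
    void add(String key, long impressionCount) {
        windowState.merge(key, impressionCount, (prev, current) -> {
            reduceCalls++;
            return prev + current;
        });
    }

    int reduceCalls() {
        return reduceCalls;
    }

    // What the window would emit when it fires: one aggregated value per key.
    Map<String, Long> fire() {
        return new HashMap<>(windowState);
    }
}
```

With two records whose keys differ, `reduceCalls()` stays at 0 and the window still emits both records; only a repeated key triggers the reduce step.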
1 answer
Working code: