我刚刚开始使用apache flink进行流处理,问题是我接收到一个json流,如下所示:
{
token_id: “tok_afgtryuo”,
ip_address: “128.123.45.1“,
device_fingerprint: “abcghift”,
card_hash: “hgtyuigash”,
“bin_number”: “424242”,
“last4”: “4242”,
“name”: “Seu Jorge”
}
有人问我是否能遵守以下业务规则:
如果在过去10秒内此ip的令牌数>5,则拒绝
如果最后一分钟此ip的令牌数>15,则拒绝
如果最近一小时内此ip的令牌数>60,则拒绝
我上了2节课, main
当我创建一个示例来调用 Window
具有不同参数的函数,以避免重复代码:
主.java
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//This DataStream Would be Converting the Json to a Token Object
DataStream<Token> baseStream =
env.addSource(new SocketTextStreamFunction("localhost",
9999,
"\n",
1))
.map(new MapTokens());
// 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
DataStreamSink<String> response1 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
5, "seconds").print();
//2 -Decline if number of tokens > 15 for this IP in last minute
DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
62, "minutes").print();
//3- Decline if number of tokens > 60 for this IP in last hour
DataStreamSink<String> response3 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
60, "Hours").print();
env.execute("Job2");
}
在另一个类中,我对规则进行所有逻辑运算,计算ip地址出现的次数,如果超过时间窗口中允许的数字,我将返回一条包含一些信息的消息:
规则制定者.java
public class RuleMaker {
public DataStream<String> getStreamKeyCount(DataStream<Token> stream,
String tokenProp,
Time time,
Integer maxPetitions,
String ruleType){
return
stream
.flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
@Override
public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
String tokenSelection = "";
switch (tokenProp)
{
case "ip":
tokenSelection = token.getIpAddress();
break;
case "device":
tokenSelection = token.getDeviceFingerprint();
break;
case "cardHash":
tokenSelection = token.getCardHash();
break;
}
collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
}
})
.keyBy(0)
.timeWindow(time)
.process(new MyProcessWindowFunction(maxPetitions, ruleType));
}
//Class to process the elements from the window
private class MyProcessWindowFunction extends ProcessWindowFunction<
Tuple3<String, Integer, String>,
String,
Tuple,
TimeWindow
> {
private Integer _maxPetitions;
private String _ruleType;
public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
this._maxPetitions = maxPetitions;
this._ruleType = ruleType;
}
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {
Integer counter = 0;
for (Tuple3<String, Integer, String> element : iterable) {
counter += element.f1++;
if(counter > _maxPetitions){
out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " + _ruleType + " token: " + element.f0 );
counter = 0;
}
}
}
}
}
到目前为止,我认为这段代码是可行的,但我是apache flink的乞丐,如果你能告诉我我尝试使用这段代码的方式是否有问题,并为我指出正确的方向,我会非常感激。
谢谢。
1条答案
按热度按时间rn0zuynd1#
一般的方法看起来非常好,尽管我认为表api足够强大,可以帮助您(更简洁)支持现成的json。
如果你想坚持使用datastreamapi,在
getStreamKeyCount
,切换tokenProp
应该通过将密钥提取程序传递给getStreamKeyCount
只有一个地方可以添加新规则。然后调用变成