多个apache flink windows验证

f87krz0w  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(401)

我刚刚开始使用apache flink进行流处理,问题是我接收到一个json流,如下所示:

{

  token_id: “tok_afgtryuo”,

  ip_address: “128.123.45.1“,

  device_fingerprint: “abcghift”,

  card_hash: “hgtyuigash”,

  “bin_number”: “424242”,

  “last4”: “4242”,

  “name”: “Seu Jorge”

}

有人问我是否能遵守以下业务规则:
如果在过去10秒内此ip的令牌数>5,则拒绝
如果最后一分钟此ip的令牌数>15,则拒绝
如果最近一小时内此ip的令牌数>60,则拒绝
我上了2节课, main 当我创建一个示例来调用 Window 具有不同参数的函数,以避免重复代码:
主.java

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //This DataStream Would be  Converting the Json to a Token Object
        DataStream<Token> baseStream =
                env.addSource(new SocketTextStreamFunction("localhost",
                        9999,
                        "\n",
                        1))
                        .map(new MapTokens());

        // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
       DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
               5, "seconds").print();

        //2 -Decline if number of tokens > 15 for this IP in last minute
        DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                62, "minutes").print();

        //3- Decline if number of tokens > 60 for this IP in last hour
        DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                60, "Hours").print();

        env.execute("Job2");
    }

在另一个类中,我对规则进行所有逻辑运算,计算ip地址出现的次数,如果超过时间窗口中允许的数字,我将返回一条包含一些信息的消息:
规则制定者.java

public class RuleMaker {

    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                String tokenProp,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){

        return
               stream
                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    @Override
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {

                         String tokenSelection = "";
                        switch (tokenProp)
                        {
                            case "ip":
                                tokenSelection = token.getIpAddress();
                                break;
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                                break;
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                                break;
                        }
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                    }
                })
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }

    //Class to process the elements from the window
    private class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            String,
            Tuple,
            TimeWindow
            > {

        private Integer _maxPetitions;
        private String  _ruleType;

        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;
        }

        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {

            Integer counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                counter += element.f1++;
                if(counter > _maxPetitions){
                    out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                    counter = 0;
                }
            }
        }
    }
}

到目前为止,我认为这段代码是可行的,但我是apache flink的乞丐,如果你能告诉我我尝试使用这段代码的方式是否有问题,并为我指出正确的方向,我会非常感激。
谢谢。

rn0zuynd

rn0zuynd1#

一般的方法看起来非常好,尽管我认为表api足够强大,可以帮助您(更简洁)支持现成的json。
如果你想坚持使用datastreamapi,在 getStreamKeyCount ,切换 tokenProp 应该通过将密钥提取程序传递给 getStreamKeyCount 只有一个地方可以添加新规则。

public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                            KeySelector<Token, String> keyExtractor,
                                            Time time, 
                                            Integer maxPetitions, 
                                            String ruleType){

    return stream
         .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
            .keyBy(0)
            .timeWindow(time)
            .process(new MyProcessWindowFunction(maxPetitions, ruleType));
}

然后调用变成

DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream, 
    Token::getIpAddress, Time.minutes(1), 62, "minutes");

相关问题