已调用自定义flink接收器,但没有数据

bnl4lu3b  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(381)

我打算实现一个定制的sink,在这里我创建了一个stubbed invoke函数,它只将接收到的数据记录到任务日志文件(目前),如下所示。

package io.name.package;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class AlertSink extends RichSinkFunction<Alert> {
    Logger LOG = LoggerFactory.getLogger(AlertSink.class);

    @Override
    public void invoke(Alert alert, Context context) throws Exception {
        LOG.info("Invoking sink for alert: ", alert.toString());
    }
}

我已经配置了如下所示的数据流。

DataStream<Alert> result = filteredMetrics
            .keyBy(
                new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
                    @Override
                    public  Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
                        return Tuple3.of(in.f0, in.f1, in.f2);
                    }
            })
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new ThresholdEvaluator());

        result.addSink(new AlertSink());

当我检查日志时,我看到接收器被调用,但显示空字符串。thresholdevaluator发出警报,但显示非空字符串。

2020-08-05 19:38:16,638 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}

我是不是漏了什么?
我还尝试在thresholdevaluator和addsink操作符之间添加map函数。mapfunction似乎可以很好地接收警报对象,但不能接收alertsink。

result.map(new MapFunction<Alert, Alert>() {
            @Override
            public Alert map(Alert value) {
                LOG.info(value.toString());
                return value;
            }
        }).addSink(new AlertSink());

(用附加日志更新)

jq6vz3qz

jq6vz3qz1#

原因是日志输出没有用于插值的占位符-正确的语法是

LOG.info("Invoking sink for alert: {}", alert.toString());

这和

LOG.info("Invoking sink for alert: " + alert.toString());

在后一种情况下,不管日志级别如何,每次都会发生字符串串联,在第一种情况下,如果只有日志级别至少为 INFO .

相关问题