我打算实现一个定制的sink,在这里我创建了一个stubbed invoke函数,它只将接收到的数据记录到任务日志文件(目前),如下所示。
package io.name.package;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class AlertSink extends RichSinkFunction<Alert> {
Logger LOG = LoggerFactory.getLogger(AlertSink.class);
@Override
public void invoke(Alert alert, Context context) throws Exception {
LOG.info("Invoking sink for alert: ", alert.toString());
}
}
我已经配置了如下所示的数据流。
DataStream<Alert> result = filteredMetrics
.keyBy(
new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
return Tuple3.of(in.f0, in.f1, in.f2);
}
})
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new ThresholdEvaluator());
result.addSink(new AlertSink());
当我检查日志时,我看到接收器被调用,但显示空字符串。thresholdevaluator发出警报,但显示非空字符串。
2020-08-05 19:38:16,638 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}
我是不是漏了什么?
我还尝试在thresholdevaluator和addsink操作符之间添加map函数。mapfunction似乎可以很好地接收警报对象,但不能接收alertsink。
result.map(new MapFunction<Alert, Alert>() {
@Override
public Alert map(Alert value) {
LOG.info(value.toString());
return value;
}
}).addSink(new AlertSink());
(用附加日志更新)
1条答案
按热度按时间jq6vz3qz1#
原因是日志输出没有用于插值的占位符-正确的语法是
这和
在后一种情况下,不管日志级别如何,每次都会发生字符串串联,在第一种情况下,如果只有日志级别至少为
INFO
.