在flink中如何跟踪扩展UserDefinedFunction的类中的自定义度量

voj3qocg  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(149)

我有一个简单的函数,处理一些字符串,这是一个json

public class ArraySizeUdf extends ScalarFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ArraySizeUdf.class);
    private final static ObjectMapper mapper = new ObjectMapper();

    public int eval(String stringJsonArray) {
        try {

            if (stringJsonArray == null) {
                return 0;
            }

            JsonNode actualObj = mapper.readTree(stringJsonArray);
            ArrayNode aa = (ArrayNode) actualObj;
            return aa.size();
        } catch (Exception e) {
            LOG.error("Error deserializing json to find size : {}", stringJsonArray);
            return -1;
        }
    }
}

字符串
我想跟踪出现的异常数量。因为我没有getRuntimeContext()的权限,我可以使用什么来跟踪我在这里定义的任何自定义指标?
我使用streamTableEnvironment.createTemporaryFunction("array_size", ArraySizeUdf.class);注册了udf
并将其用作select array_size('["some", "other"]') from table;

zc0qhyus

zc0qhyus1#

ScalarFunction可以定义一个open(FunctionContext context)方法,传入的上下文有一个getMetricGroup方法。

相关问题