gson 使用Google云数据流在Apache beam中进行Json验证

rekjcdws  于 2022-11-06  发布在  Go
关注(0)|答案(1)|浏览(184)

我正在尝试使用Apache beam Java SDK编写一个过滤器转换,我需要过滤掉无效的Json消息。
如果我为每个元素验证创建一个新的Gson对象,实现工作正常。但是我想避免为每个元素创建Gson对象(吞吐量为1 K/秒),并验证json。
我在开始创建一个常量Gson对象,并在静态块中初始化它。这种方法不起作用。不确定为什么同一个对象不能用于解析多个元素,因为我们在处理过程中没有改变对象的状态?

// Gson object declared as constant
private static final Gson gsonObj=new Gson();

// Initialized GSon object during class loading before main method invocation
static {
    gsonObj = new Gson();
}

....

/*
enum to validate json messages.
 */
enum InputValidation implements SerializableFunction<String, Boolean> {
    VALID {
        @Override
        public Boolean apply(String input) {
            try {
                gsonObj.fromJson(input, Object.class);
                return true;
            } catch(com.google.gson.JsonSyntaxException ex) {
                return false;
            }
        }
    }
}
huwehgph

huwehgph1#

请使用TupleTag而不是'enum InputValidation implements'来筛选出记录。请使用以下代码来筛选出无法解析的json行。

Pipeline p = Pipeline.create(options);

TupleTag<String> successParse = new TupleTag<String>();
TupleTag<String> failParse = new TupleTag<String>();

private static final Gson gsonObj=new Gson();

PCollectionTuple = input.apply(ParDo.of(new DoFn<String, String>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            gsonObj.fromJson(c.element(), Object.class);
            c.output(successParse,c.element());
        } catch {
            c.output(failParse,c.element());
        }
    }
}).withOutputTags(successParse, TupleTagList.of(failParse)));

上面的一段代码在我的情况下工作,最佳解决方案过滤出的记录。
下面是官方文档示例。

相关问题