flink的嵌套输出

daolsyd0  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(683)

我正在使用flinksql处理一个kafka流,其中每个消息都从kafka中提取,使用flinksql进行处理并推回到kafka中。我想要一个嵌套输出,其中输入是平坦的,输出是嵌套的。例如,我的输入是

{'StudentName':'ABC','StudentAge':33}

并希望输出为

{'Student':{'Name':'ABC','Age':33}}

我试图在这里搜索和一些类似的链接,但找不到这样的。使用ApacheFlinkSQLAPI是否可以这样做?如果需要,可以使用用户定义的函数,但希望避免这样做。

yrefmtwq

yrefmtwq1#

你可以这样做:

SELECT 
  MAP ['Student', MAP ['Name', StudentName, 'Age', StudentAge]] 
FROM 
  students;

我在这里找到了map函数,但是我必须在sql客户机中进行实验来找出语法。

4uqofj5v

4uqofj5v2#

我可以通过从flink udf返回一张Map来达到同样的效果。udf中的eval()函数将返回一个Map,而flinksql查询将使用student作为别名调用udf:
udf应该是这样的

public class getStudent extends ScalarFunction {
        public Map<String, String> eval(String name, Integer age) {
            Map<String, String> student = new HashMap<>();
            student.put("Name", name);
            student.put("Age", age.toString());
            return student;
        }
    }

flinksql查询保持如下:

Select getStudent(StudentName, StudentAge) as `Student` from MyKafkaTopic

在尝试从flinksql中获取列表时,也可以对列表执行同样的操作

相关问题