我在画Map Map[String,String]
scala自定义项的对象输出( scala.collection.immutable.map
)到表api中的某个有效数据类型,即通过java类型( java.util.Map
)这里推荐:flink表api&sql和Map类型(scala)。然而我得到下面的错误。
你知道正确的方法吗?如果是,是否有方法将转换泛化为类型为的(嵌套)scala对象 Map[String,Any]
?
代码
scala自定义项
class dummyMap() extends ScalarFunction {
def eval() = {
val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
whatevermap.asInstanceOf[java.util.Map[java.lang.String,java.lang.String]]
}
}
下沉
my_sink_ddl = f"""
create table mySink (
output_of_dummyMap_udf MAP<STRING,STRING>
) with (
...
)
"""
错误
Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<java.util.Map>]
TableSink schema: [output_of_my_scala_udf: Map<String, String>]
谢谢!
1条答案
按热度按时间dhxwm5r41#
魏忠的原始答案。我只是个记者。谢谢小薇!
此时(flink 1.11),有两种方法在起作用:
当前:自定义项定义中的datatypehint+自定义项注册的sql
过时:重写udf定义中的getresulttype+t琰env.register琰java琰函数以进行udf注册
代码
scala自定义项
python代码