在java-flink作业中使用python用户定义的函数,或者用python用户定义的函数传递flink与java的转换结果,以应用一些机器学习的东西:
我知道从pyflink你可以这样做:
table_env.register_java_function("hash_code", "my.java.function.HashCode")
但是我需要这样做,但是从java添加python函数,或者如何将java转换的结果直接传递给python udf flink作业?
我希望这些问题不会太疯狂,但我需要知道是否存在以某种方式将flink数据流api与以java为主语言的python表api进行通信的方法?这意味着从java我需要做:source->transformations->sink,但是这些转换中的一些可以触发python函数,或者python函数将等待一些java转换完成以处理流结果。
我希望有人能理解我的意图。
谨致问候!
2条答案
按热度按时间sulc1iza1#
flink 1.10中添加了对python UDF(用户定义函数)的支持——请参阅pyflink:在flink的表api中介绍python对UDF的支持。例如,可以执行以下操作:
有关更多示例等,请参阅上面链接的博客文章或稳定文档。
在flink 1.11(预计下周发布)中,添加了对矢量化python UDF的支持,带来了与pandas、numpy等的互操作性。此版本还包括在sql ddl和sql客户端中对python UDF的支持。有关文档,请参阅主文档。
听起来您想从java调用python。有状态函数api更全面地支持这一点——请参阅远程函数。但是要从JavaDataStreamAPI调用python,我认为您唯一的选择是使用Flink1.11中添加的SQLDDL支持。见flip-106和文件。
flip-106有这样一个例子:
您应该能够将其转换为使用datastreamapi。
bkhjykvo2#
此集成的示例:假设flink 1.11是当前版本,则pom.xml中需要此依赖关系。
创建环境:
从要执行的转换开始,例如:
最后呢
python函数的一个例子
FunctionName
进入function.py
脚本: