在javaflink作业中使用python用户定义函数

aamkag61  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(710)

在java-flink作业中使用python用户定义的函数,或者用python用户定义的函数传递flink与java的转换结果,以应用一些机器学习的东西:
我知道从pyflink你可以这样做:

table_env.register_java_function("hash_code", "my.java.function.HashCode")

但是我需要这样做,但是从java添加python函数,或者如何将java转换的结果直接传递给python udf flink作业?
我希望这些问题不会太疯狂,但我需要知道是否存在以某种方式将flink数据流api与以java为主语言的python表api进行通信的方法?这意味着从java我需要做:source->transformations->sink,但是这些转换中的一些可以触发python函数,或者python函数将等待一些java转换完成以处理流结果。
我希望有人能理解我的意图。
谨致问候!

sulc1iza

sulc1iza1#

flink 1.10中添加了对python UDF(用户定义函数)的支持——请参阅pyflink:在flink的表api中介绍python对UDF的支持。例如,可以执行以下操作:

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
my_table.select("add(a, b)")

有关更多示例等,请参阅上面链接的博客文章或稳定文档。
在flink 1.11(预计下周发布)中,添加了对矢量化python UDF的支持,带来了与pandas、numpy等的互操作性。此版本还包括在sql ddl和sql客户端中对python UDF的支持。有关文档,请参阅主文档。
听起来您想从java调用python。有状态函数api更全面地支持这一点——请参阅远程函数。但是要从JavaDataStreamAPI调用python,我认为您唯一的选择是使用Flink1.11中添加的SQLDDL支持。见flip-106和文件。
flip-106有这样一个例子:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

您应该能够将其转换为使用datastreamapi。

bkhjykvo

bkhjykvo2#

此集成的示例:假设flink 1.11是当前版本,则pom.xml中需要此依赖关系。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>

创建环境:

private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private StreamTableEnvironment tableEnv = getTableAPIEnv(env);

/*this SingleOutputStreamOperator will contains the result of the consumption from the  defined source*/
private SingleOutputStreamOperator<Event> stream; 

public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().getConfiguration().setString("python.files", path/function.py);
        tableEnv.getConfig().getConfiguration().setString("python.client.executable", path/python);
        tableEnv.getConfig().getConfiguration().setString("python.executable", path/python);
        tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
/*pass here the function.py and the name of the function into the python script*/
        tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
        return tableEnv;
    }

从要执行的转换开始,例如:

SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));

/*The result of that ProcessFunction `createUserProfile()` will be sent into the Python function to update some values of the profile and return them back into a defined function in Flink with Java: map function for example*/
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) x -> {
  /*you custom code here to do the mapping*/
});
profiles.addSink(new yourCustomSinkFunction());

/*this function will process the Event and create the EventProfile class for this example but you can also use another operators (map, flatMap, etc)*/
 private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
        return stream.process(new UserProfileProcessFunction());
    }

/*This function will receive a SingleOutputStreamOperator and sent each record to the Python function trough the TableAPI and returns a Row of String(you can change the Row type) that will be mapped back into EventProfile class*/
@FunctionHint(output = @DataTypeHint("ROW<a STRING>"))
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
        Table events = tableEnv.fromDataStream(rowInput,
                $("id"), $("noOfHits"), $("timestamp"))
                .select("FunctionName(id, noOfHits, timestamp)");
        return tableEnv.toAppendStream(events, Row.class);
    }

最后呢

env.execute("Job Name");

python函数的一个例子 FunctionName 进入 function.py 脚本:

@udf(
    input_types=[
        DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
    ],
    result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
    # function code here
    return f"{id}|{noOfHits}|{timestamp}"

相关问题