此代码将pandas转换为flink表,然后再将转换回pandas。当我使用
filter filter
比 select
但是当我添加 group_by
以及 order_by
.
import pandas as pd
import numpy as np
f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)
df = pd.read_csv("dataBase/New/candidate.csv")
col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
'candidate_middle_name', 'candidate_last_name', 'candidate_email',
'created_date', 'last_modified_date', 'last_modified_by']
table = table_env.from_pandas(df,col)
table.filter("candidate_id > 322445")\
.filter("candidate_first_name === 'Libby'")\
.group_by("candidate_id, candidate_source_id")\
.select("candidate_id, candidate_source_id")\
.order_by("candidate_id").to_pandas()
我的错误是
Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
1条答案
按热度按时间camsedfj1#
如果您查看文档,就会发现使用表api,order by只支持批处理查询。如果切换到sql,则可以使用按升序时间属性排序的流式查询。
在无界流式查询中,按任何其他方式排序都是不可能的,因为排序需要对输入有充分的了解。