我正试图在python中使用apache-flink有状态函数构建一个项目,但我似乎无法让它工作。我缩小了问题的范围,认为当我通过protobuf模式向有状态函数发送请求时,序列化程序似乎无法将消息序列化到我期望的类中。我想做的是:
import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event
functions = StatefulFunctions()
@functions.bind("namespace/funcname")
def funcname(context, session: Event):
print("hello world")
handler = RequestReplyHandler(functions)
if __name__ == '__main__':
inputFile = open("my_file.json", "r")
for line in inputFile:
data = json.loads(line).get('properties')
if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
request = Event()
request.prop1 = data["prop1"]
request.prop2 = data["prop2"]
request = request.SerializeToString()
handler(request)
这是我的协议模式:
syntax = "proto3";
package mypackage;
message Event {
string prop1 = 1;
string prop2 = 2;
}
我做错什么了?
1条答案
按热度按时间woobm2wo1#
这是因为requestreply处理程序不接受直接protobuf消息。flink运行时发送一个名为
ToFunction
并接收类型为的响应FromFunction
. 此负载包含调用方消息以及持久值和其他元信息。如果不能直接调用函数,比如在测试中,我建议您这样做,而不要使用处理程序。