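I am trying to test the classic WordCount example using Flink and the Python batch API. My problem is that after changing the data source from env.from_elements() to env.read_text() (to run a larger test case), an error occurs. The code below shows my implementation.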
[...]
if __name__ == "__main__":
    env = get_environment()
    input_file = 'file:///workfile.txt/'
    if len(sys.argv) != 1 and len(sys.argv) != 3:
        sys.exit("Usage: ./bin/pyflink.sh WordCount[ - <text path> <result path>]")
    if len(sys.argv) == 3:
        data = env.read_text(sys.argv[1])
    else:
        #data = env.from_elements("hello","world","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello")
        data = env.read_text(input_file)
    result = data \
        .flat_map(Tokenizer()) \
        .group_by(1) \
        .reduce_group(Adder(), combinable=True)
    if len(sys.argv) == 3:
        result.write_csv(sys.argv[2])
    else:
        result.output()
[...]
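When I run the code above, a file permission error is thrown. More specifically, I get the following message:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/workfile.txt does not exist or the user running Flink ('user') has insufficient permissions to access it.
PS: I searched for a solution but could not find one. If this question has already been answered, I would appreciate a pointer to it.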
1 Answer
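I assume "workfile.txt" is meant to be a relative path. However, you cannot combine a relative path with a scheme ("file://"): in 'file:///workfile.txt/' the part after the scheme is read as the absolute path /workfile.txt, which is why Flink reports file:/workfile.txt as missing.
Please provide the full absolute path and it should work.
Note that relative paths generally do not work with the Python API, because we execute the script in a temporary location.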
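As a minimal sketch of that advice, the helper below turns an absolute local path into a file:// URI and rejects relative paths up front, so the mistake shows up before the job is submitted. It uses only the Python standard library. The name to_file_uri and the example path are hypothetical and not part of Flink; env.read_text is the call from the question and appears only in a comment.

```python
from pathlib import Path


def to_file_uri(path_str):
    """Return a file:// URI for an absolute local path.

    Hypothetical helper (not part of Flink): relative paths are rejected
    because the script may run from a temporary location, so resolving
    them against the current working directory would be unreliable.
    """
    path = Path(path_str)
    if not path.is_absolute():
        raise ValueError("expected an absolute path, got: %r" % (path_str,))
    return path.as_uri()


# Intended use with the code from the question (path is an assumption):
#   data = env.read_text(to_file_uri("/home/user/workfile.txt"))
```

Failing early on a relative path is a deliberate choice here: silently resolving it would only move the "file does not exist" error to a directory the user did not expect.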