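I don't have much experience with Luigi or Python, but I'm trying to figure out why the results of a Hive query aren't being saved to the specified output file. I believe the relevant parts are that the query is executed through the query() method and the save location is defined in output():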
```python
import luigi
from luigi.contrib.hive import HiveQueryTask


class deliverableweekValues(HiveQueryTask):
    tablename = luigi.Parameter(default='basetable')
    database = luigi.Parameter(default='base_database')

    # needs to return something as output, for subsequent splitting of tasks
    def output(self):
        return luigi.LocalTarget('weeks_for_metrics.txt')

    # this determines which weeks are available to run metrics on
    def query(self):
        tmpl = """
        SELECT DISTINCT week FROM {0}.{1} ORDER BY week
        """
        qry = tmpl.format(self.database, self.tablename)
        print(qry)
        return qry


# run only the task above via the line below; ***WORKS***
luigi.run(['deliverableweekValues', '--local-scheduler'])
```
This doesn't raise an error; it just doesn't save anything to a file named weeks_for_metrics.txt. Luigi output:
```
DEBUG: Checking if deliverableweekValues(tablename=basetable, database=base_database) is complete
/usr/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
INFO: Informed scheduler that task   deliverableweekValues_base_database_basetable_96cd7fa3f7   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
SELECT DISTINCT week FROM base_database.basetable ORDER BY week
INFO: ['hive', '-f', '/tmp/tmpEaQmC4', '--hiveconf', "mapred.job.name='deliverableweekValues_base_database_basetable_96cd7fa3f7'"]
INFO: hive -f /tmp/tmpEaQmC4 --hiveconf mapred.job.name='deliverableweekValues_base_database_basetable_96cd7fa3f7'
INFO: log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
INFO: Logging initialized using configuration in file:/etc/hive/2.6.1.0-129/0/hive-log4j.properties
INFO: Query ID = jdavidson_20180326153035_c2fdf713-037f-4c79-93f6-0328fe57d208
INFO: Total jobs = 1
INFO: Launching Job 1 out of 1
INFO: Status: Running (Executing on YARN cluster with App id application_1520857775863_57954)
INFO: OK
INFO: Time taken: 30.451 seconds, Fetched: 122 row(s)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   deliverableweekValues_base_database_basetable_96cd7fa3f7   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 deliverableweekValues(tablename=basetable, database=base_database)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

True
```
However, even though the task finished without errors and fetched the correct 122 rows, the file is not in the directory:
```
ls week*
ls: cannot access week*: No such file or directory
```
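Some context on why Luigi still reports success: with the default check, a Luigi task counts as complete when every target returned by output() exists. So output() only declares where a result is *expected*; it does not redirect anything there. The log shows the Hive step simply handing the query to the Hive CLI (`hive -f /tmp/tmpEaQmC4`), and the 122 rows Hive prints on stdout never appear in the log, which suggests they are captured and discarded rather than saved. The sketch below is a simplified, Luigi-free model of that behaviour, not Luigi's actual code; `LocalTargetModel`, `TaskModel` and `QueryOnlyTask` are hypothetical names and the week values are placeholders.

```python
import os
import shutil
import tempfile


class LocalTargetModel(object):
    """Hypothetical stand-in for luigi.LocalTarget: it only records a path."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class TaskModel(object):
    """Hypothetical stand-in for luigi.Task, keeping only the completeness rule."""

    def output(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def complete(self):
        # Complete only if every declared output target exists.
        outputs = self.output()
        if not isinstance(outputs, (list, tuple)):
            outputs = [outputs]
        return all(target.exists() for target in outputs)


class QueryOnlyTask(TaskModel):
    """Mimics the question's task: run() fetches rows but never writes output()."""

    def __init__(self, workdir):
        self.workdir = workdir
        self.rows = None

    def output(self):
        return LocalTargetModel(os.path.join(self.workdir, "weeks_for_metrics.txt"))

    def run(self):
        # Rows are produced (like Hive printing to stdout) but kept only in
        # memory; nothing is ever written to self.output().path.
        self.rows = ["week_1", "week_2", "week_3"]


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    try:
        task = QueryOnlyTask(workdir)
        task.run()
        print(len(task.rows))   # 3
        print(task.complete())  # False: no weeks_for_metrics.txt was created
    finally:
        shutil.rmtree(workdir)
```

Creating the file by any means would flip `complete()` to True, and as long as it is missing the task will be scheduled and re-run on every invocation. That points at the step that runs the query, not at output(), as the place where the fix belongs.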
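The example below, from the Luigi documentation, does write its files. I have successfully written files this way using the following example code, so I think I'm missing something very basic: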
```python
import luigi


class GenerateWords(luigi.Task):

    def output(self):
        return luigi.LocalTarget('words.txt')

    def run(self):
        # write a dummy list of words to output file
        words = ['apple', 'banana', 'grapefruit']
        with self.output().open('w') as f:
            for word in words:
                f.write('{word}\n'.format(word=word))


class CountLetters(luigi.Task):

    def requires(self):
        return GenerateWords()

    def output(self):
        return luigi.LocalTarget('letter_counts.txt')

    def run(self):
        # read in file as list
        with self.input().open('r') as infile:
            words = infile.read().splitlines()

        # write each word to output file with its corresponding letter count
        with self.output().open('w') as outfile:
            for word in words:
                outfile.write(
                    '{word} | {letter_count}\n'.format(
                        word=word,
                        letter_count=len(word)
                    )
                )


# run via
luigi.run(['CountLetters', '--local-scheduler'])
```
This creates the following files:
```
ls w*
words.txt
ls letter*
letter_counts.txt
```
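The key difference is that `GenerateWords.run()` itself writes to `self.output()`, whereas the Hive task never does. A minimal sketch of that missing step, assuming the fix is to capture the rows Hive prints and save them yourself: to keep it runnable without Luigi or Hive, `write_command_output` is a hypothetical helper and a Python one-liner stands in for a command such as `["hive", "-S", "-e", query]` (`-S` is the Hive CLI's silent mode, `-e` runs a query string).

```python
import os
import subprocess
import sys
import tempfile


def write_command_output(cmd, path):
    """Run ``cmd`` and save everything it prints on stdout to ``path``.

    The data is written to a temporary file in the same directory first and
    then renamed onto ``path``, so a half-written file never appears at the
    final location. If ``cmd`` exits non-zero, subprocess.CalledProcessError
    is raised and ``path`` is left untouched.
    """
    data = subprocess.check_output(cmd)  # bytes captured from stdout
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        # On POSIX, rename() atomically replaces an existing destination file.
        os.rename(tmp_path, path)
    except Exception:
        os.remove(tmp_path)
        raise
    return path


if __name__ == "__main__":
    # Hypothetical stand-in for ["hive", "-S", "-e", "SELECT DISTINCT week ..."]
    fake_query = [sys.executable, "-c", "print('week_1'); print('week_2')"]
    out = os.path.join(tempfile.mkdtemp(), "weeks_for_metrics.txt")
    write_command_output(fake_query, out)
    with open(out) as f:
        print(f.read().splitlines())  # ['week_1', 'week_2']
```

Inside a plain `luigi.Task` (rather than `HiveQueryTask`), the same idea would be a `run()` that calls `subprocess.check_output(["hive", "-S", "-e", query])` and writes the result through `with self.output().open('w') as f:`, just as `GenerateWords` does; `LocalTarget.open('w')` already handles the write-to-temp-then-move part, and on Python 3 you may need to `.decode()` the bytes first. Another option is to keep `HiveQueryTask` and have the query write its own result, for example with Hive's `INSERT OVERWRITE LOCAL DIRECTORY '...' SELECT ...`, pointing output() at that directory; note that Hive then produces a directory of part files rather than a single .txt file.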