from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
class MRAvro(MRJob):
# Converts each AVRO record into one JSON record per line
HADOOP_INPUT_FORMAT = 'org.apache.avro.mapred.AvroAsTextInputFormat'
# Reads each JSON line into
INPUT_PROTOCOL = JSONProtocol
def mapper(self, avro_record, _):
# TODO
def reducer(self, key, values):
# TODO
2条答案
按热度按时间p8ekf7hl1#
正如chiron所解释的,您需要指定hadoop输入格式。这可以通过设置
HADOOP_INPUT_FORMAT
mrjob中的选项在配置中,您需要确保
AvroAsTextInputFormat
在集群上可用;从v0.5.3开始,您可以使用--libjar
或者在mrjob配置文件中配置libjars(在v0.5.3未发布时;请参见上的讨论--libjar
在功能请求中)。我不知道一个简单的方法来集成本地测试与avro(
HADOOP_INPUT_FORMAT
被本地跑步者忽略)。一种解决方案是使用apacheavro工具的tojson方法转换测试数据。否则,您可以使用avro或fastavro库在python中编写自己的函数,为本地执行准备数据。
xt0899hw2#
您需要告诉hadoop您的hadoop作业的“输入格式”是什么格式:
但我不知道你是怎么做的。如果您使用的是纯hadoop,那么我以前的解决方案是有效的。