我正在运行cdh4.4与Spark0.9.0从cloudera包裹。
我有一堆avro文件是通过pig的avrostorage自定义项创建的。我想在spark中加载这些文件,使用avro文件上的通用记录或模式。到目前为止,我已经试过了:
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)
val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader = new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema
val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext) {
buf += dataFileReader.next
}
sc.parallelize(buf)
这适用于一个文件,但无法扩展-我正在将所有数据加载到本地ram中,然后从那里跨spark节点分发数据。
2条答案
按热度按时间rjjhvcjd1#
这对我很有用:
fdbelqdn2#
回答我自己的问题: