我想在 Flink 中执行下面这段代码:
env
.readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)
.map(tp => tp)
但是我的编辑器中出现了一个类型不匹配错误:
Expected: MapFunction[Tuple2[Text, Text], NotInferedR], actual: (Nothing) => Nothing
我怎样才能解决这个问题?
这是完整的代码:
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
/**
 * Flink `Partitioner` for `OptimizedText` keys that delegates the decision to a
 * wrapped `TotalOrderPartitioner`.
 *
 * @param underlying partitioner that is asked for the target partition
 */
class OptimizedFlinkTeraPartitioner(underlying: TotalOrderPartitioner)
    extends Partitioner[OptimizedText] {

  /**
   * Returns the partition chosen by `underlying` for the text carried by `key`.
   *
   * @param key           record key whose text is handed to the wrapped partitioner
   * @param numPartitions not consulted here; the wrapped partitioner alone decides
   * @return the partition index reported by `underlying.getPartition`
   */
  override def partition(key: OptimizedText, numPartitions: Int): Int = {
    val keyText = key.getText()
    underlying.getPartition(keyText)
  }
}
/**
 * Entry point of a Flink batch job that reads key/value records through
 * `TeraInputFormat` and writes them out through `TeraOutputFormat`.
 *
 * Expected arguments: `hdfs inputPath outputPath #partitions`.
 */
object FlinkTeraSort {
  // The file imports the Java `ExecutionEnvironment`, so the DataSet operators used
  // below take Flink function objects and Java tuples, not Scala function literals.
  import org.apache.flink.api.common.functions.MapFunction
  import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

  /** Orders Hadoop `Text` values by their own `compareTo`. */
  implicit val textOrdering: Ordering[Text] = new Ordering[Text] {
    override def compare(a: Text, b: Text): Int = a.compareTo(b)
  }

  /**
   * Validates the argument count and starts the job.
   *
   * @param args `hdfs inputPath outputPath #partitions`; when the count is not 4
   *             the usage line is printed and nothing is executed
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      println("Usage: FlinkTeraSort hdfs inputPath outputPath #partitions ")
    } else {
      run(args(0), args(1), args(2), args(3).toInt)
    }
  }

  /**
   * Builds and executes the Flink plan.
   *
   * @param hdfs       file-system URI, also used as prefix for both paths
   * @param inputDir   input location appended to `hdfs`
   * @param outputDir  output location appended to `hdfs`
   * @param partitions value stored under `mapreduce.job.reduces`
   */
  private def run(hdfs: String, inputDir: String, outputDir: String, partitions: Int): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    val inputPath = hdfs + inputDir
    val outputPath = hdfs + outputDir

    val mapredConf = new JobConf()
    mapredConf.set("fs.defaultFS", hdfs)
    mapredConf.set("mapreduce.input.fileinputformat.inputdir", inputPath)
    mapredConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath)
    mapredConf.setInt("mapreduce.job.reduces", partitions)

    val partitionFile = new Path(outputPath, TeraInputFormat.PARTITION_FILENAME)
    val jobContext = Job.getInstance(mapredConf)
    TeraInputFormat.writePartitionFile(jobContext, partitionFile)

    // NOTE(review): the partitioner is constructed but not applied to the data set
    // anywhere in this object; kept so the construction itself still takes place.
    val partitioner =
      new OptimizedFlinkTeraPartitioner(new TotalOrderPartitioner(mapredConf, partitionFile))

    val data = env.readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath)

    // The Java DataSet API needs an explicit MapFunction instance; a Scala lambda
    // such as `tp => tp` is not accepted where a MapFunction is expected.
    val passThrough = new MapFunction[JTuple2[Text, Text], JTuple2[Text, Text]] {
      override def map(tp: JTuple2[Text, Text]): JTuple2[Text, Text] = tp
    }

    // Each operator returns a new data set, so the sink is attached to the mapped
    // result rather than to the source.
    val mapped = data.map(passThrough)
    mapped.output(new HadoopOutputFormat[Text, Text](new TeraOutputFormat(), jobContext))

    env.execute("TeraSort")
  }
}
(build.sbt):
// Project coordinates.
name := "terasort"

version := "0.0.1"

// Spark and Flink artifacts; `%%` appends the Scala binary version to the artifact name.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.flink" %% "flink-clients" % "1.0.3"
)

// Execute `run` in a forked JVM instead of inside the sbt process.
fork in run := true
1条答案
按热度按时间pkbketx91#
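代码导入的是 Java API 的 `org.apache.flink.api.java.ExecutionEnvironment`,它的 `map` 方法需要一个 `MapFunction` 实例,而不是 Scala 的函数字面量,因此出现类型不匹配。改用 Scala API(`import org.apache.flink.api.scala._`,并在 build.sbt 中加入 flink-scala 依赖),或者显式传入一个 `MapFunction` 实现,即可解决问题。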