我是apachespark新手,我正在从hdfs目录中读取文件,然后根据条件进行过滤和排序。
我在hdfs目录中有两个文件第一个文件包含如下数据
Name:xxxx,currenttime:[timestamp],urlvisited:[url]
包含以下信息的第二个文件
Name:xxxx,currenttime :[timestamp],downloadfilename:[filename]
首先,我根据名称过滤数据,然后使用逗号拆分数据,然后使用currenttime字段对数据排序
到目前为止我已经试过了
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class SampleVisit {
public static void main(String[] args) {
final String name = args[0];
SparkConf sparkConf = new SparkConf().setAppName("sample");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/sample/*/",1);
JavaRDD<String> filterdata = lines.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.contains("Name:" + name);
}
});
//Returning all other values as one fields and currenttime as another field
JavaRDD<Tuple2<String,String>> stage2 = filterdata.map(new Function<String, Tuple2<String,String>>() {
public Tuple2<String, String> call(String s) throws ParseException {
String [] entries = s.split(",");
return new Tuple2(s[0]+","+s[2], s[1]);
}
});
List<Tuple2<String,String>> sorted = stage2.takeOrdered(100, new CompareValues()) ;
JavaRDD<Tuple2<String,String>> finale = ctx.parallelize(sorted);
finale.coalesce(1, true).saveAsTextFile("hdfs://localhost:9000/sampleout");
}
}
我的comparevalues.java如下所示
public class CompareValues implements Comparator<Tuple2<String,String>>, Serializable {
@Override
public int compare(Tuple2<String, String> o1, Tuple2<String, String> o2) {
long first = Long.valueOf(o1._2);
long second = Long.valueOf(o2._2);
Date firstDate = new Date(first);
Date secondDate = new Date(second);
return secondDate.compareTo(firstDate);
}
}
当我用name value作为参数运行时,所有文件都按预期运行,但结果返回第一个文件值按顺序排列的数据,然后第二个文件值按顺序排列的数据,但我希望两个文件值按顺序排列的结果有人能帮到我吗?
1条答案
按热度按时间hmtdttj41#
我认为你的问题来自于把你的rdd和
shuffle=true
. 这意味着使用散列分区器对rdd进行洗牌,并将每个项发送到适当的分区。因此,这些项是在分区内排序的,但它们通常不是。当您将其保存到文件时,每个分区都会写入一个单独的文件,以允许并发写入。如果希望结果跨所有分区排序,则需要设置一个partitioner,以确保所有“close”项都在同一分区中。
请看这里的数据分区一章以获得进一步的解释。