我正在尝试对消息数据运行分析操作。作为输入,我有一个文本文件,其中有大约100万条大记录作为原始文本。这些原始数据必须转换成csv文件,以分离出单个消息的不同属性。
为了处理如此巨大的数据,处理应该是分布式的,因此我现在在本地使用apachespark。我对单词对计数感兴趣。例如:
Input: "My name is Soham . My name is John"
output: (my, name) ,2
(name , is ),2
(is, Soham ),1
(is, John),1
(soham,my_),1
我现在正在运行以下代码:
用于将原始数据转换为本地部分。regex函数实际上是将一个字符串分成四个属性。
JavaRDD<Tuple4> chats = spark_ctx.textFile("in/whatsapp_chats/WhatsApp_Chat_AB_SK.txt")
.map(v -> chat_regex_ListString(v))
.filter( chat -> chat.size()!=0)
.map( string ->
new Tuple4 <String, String, String, String> ( string.get(0),
string.get(1),
string.get(2),
string.get(3))
);
更重要的是,下面的代码实际计算字数:
chats.flatMap(v -> Arrays.stream(v._4().toString().split(" ")).iterator())
.filter(word -> !stopwords_set.contains(word))
.mapToPair( v1 -> new Tuple2<String,Long>(v1, 1L))
.reduceByKey( (v2,v3) -> v2 +v3)
.mapToPair( v4 -> new Tuple2<Long, String>(v4._2(), v4._1()))
.sortByKey(false)
.map( v5 -> v5._2().concat("," + v5._1().toString()) )
.collect(), Charset.forName("UTF-8")) ;
这就是我正在尝试的;但它没有任何意义,也不会编译
chats.flatMap(v -> IntStream.range(1, v._4().toString().split(" ").length)
.mapToObj(i -> v._4().toString().split(" ")[i1].concat(
+v._4().toString().split(" ")[i]) )
.collect(Collectors.toList()))
.mapToPair( v1 -> new Tuple2<String,Long>(v1, 1L))
.reduceByKey( (v2,v3) -> v2 +v3)
.mapToPair( v4 -> new Tuple2<Long, String>(v4._2(), v4._1()))
.sortByKey(false)
.map( v5 -> v5._2().concat("," + v5._1().toString()) )
.collect(), Charset.forName("UTF-8")) ;
暂无答案!
目前还没有任何答案,快来回答吧!