大数据

8ehkhllq  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(333)

我正在尝试对消息数据运行分析操作。作为输入,我有一个文本文件,其中有大约100万条大记录作为原始文本。这些原始数据必须转换成csv文件,以分离出单个消息的不同属性。
为了处理如此巨大的数据,处理应该是分布式的,因此我现在在本地使用apachespark。我对单词对计数感兴趣。例如:

Input:  "My name is Soham . My name is John"

output: (my, name) ,2  
        (name , is ),2
        (is, Soham ),1 
        (is, John),1
        (soham,my_),1

我现在正在运行以下代码:
用于将原始数据转换为本地部分。regex函数实际上是将一个字符串分成四个属性。

JavaRDD<Tuple4> chats = spark_ctx.textFile("in/whatsapp_chats/WhatsApp_Chat_AB_SK.txt")
                .map(v -> chat_regex_ListString(v))
                .filter( chat -> chat.size()!=0)
                .map( string ->
                        new Tuple4 <String, String, String, String> ( string.get(0),
                                string.get(1),
                                string.get(2),
                                string.get(3))

                );

更重要的是,下面的代码实际计算字数:

chats.flatMap(v -> Arrays.stream(v._4().toString().split(" ")).iterator())
                        .filter(word -> !stopwords_set.contains(word))
                        .mapToPair( v1 -> new Tuple2<String,Long>(v1, 1L))
                        .reduceByKey( (v2,v3) -> v2 +v3)
                        .mapToPair( v4 -> new Tuple2<Long, String>(v4._2(), v4._1()))
                        .sortByKey(false)
                        .map( v5 -> v5._2().concat("," + v5._1().toString()) )
                        .collect(), Charset.forName("UTF-8")) ;

这就是我正在尝试的;但它没有任何意义,也不会编译

chats.flatMap(v -> IntStream.range(1, v._4().toString().split(" ").length)
                             .mapToObj(i -> v._4().toString().split(" ")[i1].concat( 
                                         +v._4().toString().split(" ")[i]) )
                             .collect(Collectors.toList()))
                        .mapToPair( v1 -> new Tuple2<String,Long>(v1, 1L))
                        .reduceByKey( (v2,v3) -> v2 +v3)
                        .mapToPair( v4 -> new Tuple2<Long, String>(v4._2(), v4._1()))
                        .sortByKey(false)
                        .map( v5 -> v5._2().concat("," + v5._1().toString()) )
                        .collect(), Charset.forName("UTF-8")) ;

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题