apachespark和scala中数据集的java移动平均

v7pvogib  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(312)

我必须使用apache spark和scala作为编程语言在数据集上执行以下任务:
从hdfs读取数据集。一些采样线如下所示:

deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613

按设备id对数据进行分组。因此,我们现在有了deviceid=>(字节,eventdate)的Map
对于每个设备,按事件日期对集合进行排序。我们现在为每个设备提供了一组基于事件日期的有序字节。
从这个有序集合中选取最后30天的字节。
使用30的时间段查找最后一个日期的移动平均字节数。
使用30的时间段查找最终日期字节的标准偏差。
返回结果中的两个值(mean-kstddev)和(mean+kstddev)[假设k=3]
我使用的是ApacheSpark1.3.0。实际的数据集更宽,最终必须在10亿行上运行。
以下是数据集的数据结构。

package com.testing
case class DeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}

deviceanalyzer类如下所示:

package com.testing
import com.testing.DeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
    val sc = new SparkContext(sparkConf)
    val logFile = args(0)
    val deviceAggregateLogs = sc.textFile(logFile).map(DeviceAggregates.parseLogLine).cache()
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    deviceIdsMap.foreach(
         // I am stuck here !!
    })
    sc.stop()
  }
}

但我仍然坚持这个算法的实际实现。

6pp0gazn

6pp0gazn1#

我有一个非常粗糙的实现来完成这项工作,但它不符合标准。对不起,我对scala/spark非常陌生,所以我的问题很基本。以下是我现在拥有的:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      val sortedAggregates = Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(sortedAggregates) // This does not work - returns an empty array !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray  // This should be sortedAggregates.map (but does not compile)
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}

如果有人能提出以下改进建议,我将不胜感激:
对sorting.quicksort的调用无效。也许我说的不对。
我想使用spark mllib类multivariatestatisticalsummary来计算统计值。
为此,我需要将所有中间值保留为rdd,以便可以直接使用rdd方法来完成这项工作。
最后,我还需要将结果写入hdfs,rdd类上提供了一个方法来执行此操作,这也是我希望将所有内容保留为rdd的另一个原因。

相关问题