Cassandra Scala: saving results into a temp table with toDF

6tr1vspr · published 2022-11-05 in Cassandra

I am trying to save some analysis results into a TempTable with toDF, but I get the following error: ":215: error: value toDF is not a member of Double". I am reading data from a Cassandra table and doing some calculations on it, and I would like to save those results into a temp table. I am new to Scala, can anyone help me? My code:

case class Consumo(consumo:Double, consumo_mensal: Double, mes:   org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double);

object Analysegridata{

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1").setAppName("LiniarRegression")
  .set("spark.cassandra.connection.port", "9042")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true");
val sc = new SparkContext(conf);

val ssc = new StreamingContext(sc, Seconds(1))

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val checkpointDirectory = "/var/lib/cassandra/data"
ssc.checkpoint(checkpointDirectory)   // set checkpoint directory

// val context = StreamingContext.getOrCreate(checkpointDirectory)    
 import sqlContext.implicits._
 JavaSparkContext.fromSparkContext(sc);

 def rddconsumo(rddData: Double): Double = {

 val  rddData: Double = {  
  implicit val data = conf
  val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect

 def goto(cs: Array[Double]): Double = {

   var consumo = 0.0;
   var totaldias = 0;
   var soma_pf = 0.0;
   var somamc = 0.0;
   var tempo_gasto = 0.0;
   var consumo_mensal = 0.0;
   var i=0
for (i <- 0 until grid.length) {      
   val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE");
   val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY");
   val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH");
   val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR");
   val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH");
   val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3")

 def convert_minutos (minuto : Int) : Double ={
  minuto/60
 }
   dia.foreach (i =>  {

   def adSum(potencia: Array[Double]) = { 

    var i=0;
      while (i < potencia.length) {
       soma_pf  += potencia(i);
       i += 1;
       soma_pf;
       println("Potemcia =" + soma_pf)  
    }  
 }
   def tempo(minutos: Array[Int]) = { 
       var i=0;
       while (i < minutos.length) {
         somamc  += convert_minutos(minutos(i))  
         i += 1;
        somamc 
     } 
  }

  def tempogasto(horas: Array[Int]) = { 
      var i=0;
      while (i < horas.length) {
       tempo_gasto = horas(i) + somamc;
       i += 1;
       tempo_gasto;
       println("Temo que o aparelho esteve ligado =" + tempo_gasto)
     } 
 }

 def consumof(dia: Array[Int]) = { 
     var i=0;
     while (i < dia.length) {
       consumo = soma_pf * tempo_gasto;
       i += 1;
       consumo; 
       println("Consumo diario =" + consumo)       
     } 
  }
})

mes.foreach (i => {

 def totaltempo(dia: Array[Int]) = { 
     var i = 0;
     while(i < dia.length){
       totaldias += dia(i);
       i += 1;
       totaldias;
       println("Numero total de dias =" + totaldias)  
     }
 }
 def consumomensal(mes: Array[Int]) = { 
     var i = 0;
     while(i < mes.length){
      consumo_mensal = consumo * totaldias;
      i += 1;
     consumo_mensal;
     println("Consumo Mensal =" + consumo_mensal);
   }
} 
})

}
    consumo; 
    totaldias;
    consumo_mensal; 
    soma_pf;
    tempo_gasto;
    somamc

}

 rddData

  }
   rddData.toDF().registerTempTable("rddData")
 }
      ssc.start()
      ssc.awaitTermination()

error: value toDF is not a member of Double
ctehm74n 1#

It is not entirely clear what exactly you are trying to do (this is a lot of code; please try to provide a *minimal* example), but there are a couple of obvious problems (a small sketch follows the list):
1. rddData has type Double, while it looks like it should be an RDD[Double] (a distributed collection of doubles). Trying to save a single Double value as a table does not make much sense, and indeed it does not work: toDF can be called on an RDD, not on an arbitrary type, and in particular not on Double, which is exactly what the compiler is telling you.
2. collect() on the data: if you load an RDD, transform it with some operations and then want to save it as a table, you should not be calling collect() on that RDD. collect() ships all of the data (which is distributed across the cluster) to the single "driver" machine (the program running this code). After that you can no longer take advantage of the cluster, and you are no longer working with an RDD, so you cannot use toDF to turn the data into a DataFrame.
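Below is a minimal sketch of that approach, keeping the data as an RDD end to end. The keyspace/table name smartgrids.analyzer and the column n_pf1 are taken from the question; the Reading case class, the app name and the final SQL query are purely illustrative assumptions and will need to be adapted to the real schema.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

// Illustrative row type; wrapping a single consumo column is an assumption.
case class Reading(consumo: Double)

object AnalyseSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setAppName("AnalyseSketch")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // brings toDF into scope for RDDs of case classes

    // Stay in RDD land: no collect(), so the data remains distributed on the cluster.
    val readings = sc.cassandraTable("smartgrids", "analyzer")
      .select("n_pf1")                               // column name assumed from the question
      .map(row => Reading(row.getDouble("n_pf1")))   // an RDD[Reading], not a single Double

    // toDF is available on an RDD of case classes, which is why calling it on a Double fails.
    readings.toDF().registerTempTable("rddData")

    // The temp table can then be queried with SQL.
    sqlContext.sql("SELECT SUM(consumo) AS total FROM rddData").show()
  }
}

If a single aggregated number is needed on the driver, compute it with an aggregation like the query above (or an RDD reduce) instead of collecting the whole table first.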
