我在hadoop中有两个csv,比如csv1,csv2。两个csv都包含两列(timestamp和somevalue),比如csv1列是t1,v1,csv2列是t2,v2。我想为每个t1=t2计算v1*v2(对于相同的时间戳),并使用sparkjavaapi将结果存储为hdfs中的文本文件。我是新来的,请有人帮帮我。提前告诉他。
isr3a4wc1#
我可以在scala中完成,也许你可以了解我正在做的事情的要点并自己实现:
scala> val df1=sc.parallelize(Seq((1001,2),(1002,3),(1003,4))).toDF("t1","v1") df1: org.apache.spark.sql.DataFrame = [t1: int, v1: int] scala> val df2=sc.parallelize(Seq((1001,3),(1002,4),(1005,4))).toDF("t2","v2") df2: org.apache.spark.sql.DataFrame = [t2: int, v2: int] scala> df1.join(df2,df1("t1")===df2("t2")) res1: org.apache.spark.sql.DataFrame = [t1: int, v1: int ... 2 more fields] scala> res1.show +----+---+----+---+ | t1| v1| t2| v2| +----+---+----+---+ |1002| 3|1002| 4| |1001| 2|1001| 3| +----+---+----+---+ scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> val result=res1.withColumn("foo",res1("v1") * res1("v2")) result: org.apache.spark.sql.DataFrame = [t1: int, v1: int ... 3 more fields] scala> result.show +----+---+----+---+---+ | t1| v1| t2| v2|foo| +----+---+----+---+---+ |1002| 3|1002| 4| 12| |1001| 2|1001| 3| 6| +----+---+----+---+---+
我希望这能解决你的问题。
1条答案
按热度按时间isr3a4wc1#
我可以在scala中完成,也许你可以了解我正在做的事情的要点并自己实现:
我希望这能解决你的问题。