sparksql:我做得对吗?

6ioyuze2  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(436)

下面是如何在我正在使用的一个小应用程序中使用sparksql。我有两个hbase表,比如t1,t2。
我的输入是一个csv文件,我解析每一行并查询(sparksql)表t1。我将输出写入另一个文件。
现在我解析第二个文件并查询第二个表,对结果应用某些函数并输出数据。表t1包含购买详细信息,表t2包含每个用户随时间框架添加到购物车的项目列表。
输入->客户ID(csv文件中的列表)
输出->下面提到的特定格式的csv文件。
customerid,他带来的物品的详细信息,他添加到购物车的第一件物品,他添加到购物车的所有物品,直到购买为止。
输入1100条记录,需要两个小时才能完成整个过程!
我想知道我能否加快这个过程,但我被打动了。有什么帮助吗?

vtwuwzda

vtwuwzda1#

这个Dataframe方法怎么样。。。
1) 从csv创建Dataframe。
如何将csv文件读取为dataframe或类似的示例。

val csv = sqlContext.sparkContext.textFile(csvPath).map {
  case(txt) =>
    try {
      val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
      val parsedRow = reader.readNext()
      Row(mapSchema(parsedRow, schema) : _*)
    } catch {
     case e:  IllegalArgumentException =>  throw new UnsupportedOperationException("converted from Arg to Op except")
 }
    }

2) 从hbase数据(如果使用hortonworks)或phoenix创建另一个Dataframe。
3) 加入并应用函数(可以是udf,也可以是其他函数)。。等等…),生成的文件可能又是一个Dataframe
4) 将结果Dataframe与第二个表连接并以csv格式输出数据,如下面的示例中的伪代码所示。。。
应该可以准备带有自定义列和相应值的数据框,并另存为csv文件。你也可以把这种放在Spark壳里。

val df = sqlContext.read.format("com.databricks.spark.csv").
                             option("header", "true").
                             option("inferSchema","true").
                             load("cars93.csv")
    val df2=df.filter("quantity <= 4.0")
    val col=df2.col("cost")*0.453592
    val df3=df2.withColumn("finalcost",col)
    df3.write.format("com.databricks.spark.csv").
                             option("header","true").
                             save("output-csv")

希望这有帮助。。祝你好运。

相关问题