Error when saving an RDD to HDFS

whhtz7ly · posted 2021-05-31 in Hadoop

I am trying to save an RDD to HDFS using Scala and I get the following error:

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, quickstart.cloudera, executor 3): java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1020)
        at java.lang.Float.parseFloat(Float.java:452)
        at scala.collection.immutable.StringLike$class.toFloat(StringLike.scala:231)
        at scala.collection.immutable.StringOps.toFloat(StringOps.scala:31)
        at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
        at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1196)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1195)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1183)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

First I read a file from HDFS, and it is read correctly. After that I try to do some transformations, such as changing the field delimiter to a pipe, and then write the result back to HDFS. Here is my code, in case anyone can help me.

val productsRDD= sc.textFile("/user/cloudera/products/products")
val products2RDD=productsRDD.map(a=>a.split(","))
case class clas1(product_id: Int,product_category_id: Int,product_name: String,product_description: String,product_price: Float,product_image: String)
val products = products2RDD.map(b => clas1(Integer.parseInt(b(0)),Integer.parseInt(b(1)),b(2).toString,b(3).toString,b(4).toFloat,b(5).toString))
val r = products.toDF()
r.registerTempTable("productsDF")
val prodDF = sqlContext.sql("select * from productsDF where product_price > 100")

/* everything goes fine until this line */

prodDF.map(c => c(0)+"|"+c(1)+"|"+c(2)+"|"+c(3)+"|"+c(4)+"|"+c(5)).saveAsTextFile("/user/cloudera/problem1/pipes1")

Fields of the DataFrame:

+---------------------+--------------+------+-----+---------+----------------+
| Field               | Type         | Null | Key | Default | Extra          |
+---------------------+--------------+------+-----+---------+----------------+
| product_id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| product_category_id | int(11)      | NO   |     | NULL    |                |
| product_name        | varchar(45)  | NO   |     | NULL    |                |
| product_description | varchar(255) | NO   |     | NULL    |                |
| product_price       | float        | NO   |     | NULL    |                |
| product_image       | varchar(255) | NO   |     | NULL    |                |
+---------------------+--------------+------+-----+---------+----------------+

I am new to Scala and would appreciate any help... Thank you!


z9gpfhce 1#

Looking at the error, java.lang.NumberFormatException: empty String, it seems you are trying to parse a number from a field whose value is an empty string, which is why you see this particular error. (The earlier lines only define lazy transformations, so the bad value is not actually parsed until the saveAsTextFile action runs.)
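One way to confirm and work around this at the RDD level is to parse the numeric fields defensively, for example with scala.util.Try. This is a sketch of my own, not from the answer, and it silently falls back to 0 when a field is empty:

import scala.util.Try

// Defensive parse: fall back to 0 / 0.0f when a numeric field is empty.
// Field positions follow the OP's split(",").
val products = products2RDD.map { b =>
  clas1(
    Try(b(0).trim.toInt).getOrElse(0),
    Try(b(1).trim.toInt).getOrElse(0),
    b(2),
    b(3),
    Try(b(4).trim.toFloat).getOrElse(0.0f),
    b(5)
  )
}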
What you can do is use coalesce after the split and before the conversion: create a DataFrame first, and use the coalesce function in Spark SQL, which lets you replace null values with a default.
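A rough sketch of that idea, loading the fields as strings first and letting coalesce supply a default before the cast (the column names, the split limit, and the default value of 0 are my assumptions, not part of the answer):

import org.apache.spark.sql.functions.{coalesce, col, length, lit, when}

// Sketch only: split(",", -1) keeps trailing empty fields instead of dropping them.
// .toDF requires the SQL implicits already imported in the OP's shell session.
val rawDF = productsRDD
  .map(_.split(",", -1))
  .map(a => (a(0), a(1), a(2), a(3), a(4), a(5)))
  .toDF("product_id", "product_category_id", "product_name",
        "product_description", "product_price", "product_image")

// Map empty strings to null, let coalesce fill in a default, then cast.
val cleanedDF = rawDF
  .withColumn("product_price",
    coalesce(when(length(col("product_price")) > 0, col("product_price")), lit("0"))
      .cast("float"))
  .withColumn("product_id", col("product_id").cast("int"))
  .withColumn("product_category_id", col("product_category_id").cast("int"))

val prodDF = cleanedDF.where(col("product_price") > 100)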


xoshrz7s 2#

Depending on your CDH version, Spark 2 has a built-in CSV reader.

case class Product(product_id: Int,product_category_id: Int,product_name: String,product_description: String,product_price: Float,product_image: String)

val productsDs = spark.read.csv("/user/cloudera/products/products").as[Product]
val expensiveProducts = productsDs.where($"product_price" > 100.0)
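Note that reading a headerless CSV this way gives columns named _c0, _c1, ..., so the .as[Product] conversion needs matching column names. A minimal sketch, assuming the file has no header row and deriving the schema from the case class (the Encoders.product call is my addition, not part of the original answer):

import org.apache.spark.sql.Encoders
import spark.implicits._

// Derive column names and types from the case class so .as[Product] can
// match the CSV columns by name (sketch only; assumes no header row).
val productSchema = Encoders.product[Product].schema

val productsDs = spark.read
  .option("header", "false")
  .schema(productSchema)
  .csv("/user/cloudera/products/products")
  .as[Product]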

If you are not using Spark 2, you should really upgrade at least some local client to point at your same YARN cluster, or use spark-csv so you do not have to deal with a fragile hand-rolled CSV parser like map(... split(",")).

Note: I do not know whether the case class will still work if a column is empty, as the error says.
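For the Spark 1.x route, a rough sketch of what loading through the Databricks spark-csv package could look like (the package coordinates and options are assumptions; start the shell with something like --packages com.databricks:spark-csv_2.10:1.5.0):

// Spark 1.x sketch using the spark-csv data source.
val productsDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("/user/cloudera/products/products")

// With no header row spark-csv names the columns C0, C1, ...; C4 is product_price here.
val expensiveProducts = productsDF.where(productsDF("C4") > 100)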
If you just want to change the delimiter, you can also write it out with the CSV writer:

expensiveProducts.write
    .option("sep", "|")
    .csv("/user/cloudera/problem1/pipes1")
