我想在spark中读取csv并将其转换为Dataframe,然后将其存储在hdfs中 df.registerTempTable("table_name")
###我试过:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
我得到的错误:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
在apachespark中,将csv文件作为Dataframe加载的正确命令是什么?
14条答案
按热度按时间avkwfej41#
penny的spark 2示例就是在spark2中实现它的方法。还有一个技巧:通过设置选项,对数据进行初始扫描,为您生成头文件
inferSchema
至true
在这里,那么,假设spark
是您设置的spark会话,是加载到amazon在s3上托管的所有陆地卫星图像的csv索引文件中的操作。坏消息是:这会触发对文件的扫描;对于像这个20+mb压缩csv文件这样的大文件,长距离连接可能需要30秒。记住这一点:一旦得到模式,最好手动编写模式代码。
(代码片段)apache软件许可证2.0获得许可,以避免所有歧义;我做了一些s3集成的演示/集成测试)
efzxgjgh2#
在Java1.8中,这个代码片段非常适合读取csv文件
pom.xml文件
java
bwitn5fc3#
如果您正在使用Scala2.11和Apache2.0或更高版本构建jar。
不需要创建
sqlContext
或者sparkContext
对象。只是一个SparkSession
对象满足所有需求。下面是mycode,它工作正常:
如果你在集群中运行,只需改变
.master("local")
至.master("yarn")
在定义sparkBuilder
对象spark文档包括:https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
gblwokeq4#
如果使用spark 2.0,请尝试此操作+
note:- this 为任何分隔文件工作。只需使用选项(“delimiter”,)来更改值。
希望这有帮助。
jdgnovmf5#
默认文件格式是parquet with spark.read。。文件读取csv,这就是为什么会出现异常。使用您尝试使用的api指定csv格式
y3bcpkx16#
使用spark 2.x解析csv并加载为dataframe/dataset
首先,初始化
SparkSession
对象默认情况下,它将作为spark
```val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
val df = spark.sql("SELECT * FROM csv.
hdfs:///csv/file/dir/file.csv
")"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
brgchamk7#
它的hadoop是2.6,spark是1.6,没有“databricks”包。
fgw7neuy8#
将以下spark依赖项添加到pom文件:
//Spark配置:
val spark=sparksession.builder().master(“本地”).appname(“示例应用程序”).getorcreate()
//读取csv文件:
val df=spark.read.option(“header”,“true”).csv(“文件路径”)
//显示输出
df.show()
46qrfjad9#
使用内置的spark csv,您可以使用spark>2.0的新sparksession对象轻松完成。
您可以设置多种选项。
header
:文件顶部是否包含标题行inferSchema
:是否要自动推断架构。默认值为true
. 我总是喜欢提供模式来确保正确的数据类型。mode
:解析模式,允许,dropmalformed或failfastdelimiter
:若要指定分隔符,默认值为逗号(',')mzillmmw10#
sparkcsv是核心spark功能的一部分,不需要单独的库。所以你可以举个例子
在scala中,(这适用于分隔符中的任何格式,例如“,”对于csv“,\t”对于tsv等)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
bjg7j2ky11#
使用spark 2.0,以下是如何读取csv
pgx2nnw812#
解析csv文件有很多挑战,如果文件大小更大,如果列值中有非英语/转义符/分隔符/其他字符,则会不断增加,这可能会导致解析错误。
神奇之处在于所使用的选项。为我和霍普工作的那些应该涵盖大多数边缘案例的代码如下:
希望有帮助。有关更多信息,请参阅:使用pyspark 2读取包含html源代码的csv
注意:上面的代码来自spark2api,其中csv文件读取api与spark可安装的内置包捆绑在一起。
注意:pyspark是spark的python Package 器,与scala/java共享相同的api。
wvmv3b1j13#
使用spark 2.4+,如果您想从本地目录加载csv,那么可以使用2个会话并将其加载到配置单元中。第一个会话应使用master()config创建为“local[*]”,第二个会话应使用“yarn”并启用配置单元。
下面的那个对我有用。
当你和
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
一切顺利,在Hive里创造了一张table。vsaztqbk14#
要读取系统上的相对路径,请使用system.getproperty方法获取当前目录,并进一步使用相对路径加载文件。
spark:2.4.4 scala:2.11.12