spark-load csv文件作为Dataframe?

vuktfyat  于 2021-05-30  发布在  Hadoop
关注(0)|答案(14)|浏览(319)

我想在spark中读取csv并将其转换为Dataframe,然后将其存储在hdfs中 df.registerTempTable("table_name") ###我试过:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

我得到的错误:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在apachespark中,将csv文件作为Dataframe加载的正确命令是什么?

avkwfej4

avkwfej41#

penny的spark 2示例就是在spark2中实现它的方法。还有一个技巧:通过设置选项,对数据进行初始扫描,为您生成头文件 inferSchematrue 在这里,那么,假设 spark 是您设置的spark会话,是加载到amazon在s3上托管的所有陆地卫星图像的csv索引文件中的操作。

/*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

坏消息是:这会触发对文件的扫描;对于像这个20+mb压缩csv文件这样的大文件,长距离连接可能需要30秒。记住这一点:一旦得到模式,最好手动编写模式代码。
(代码片段)apache软件许可证2.0获得许可,以避免所有歧义;我做了一些s3集成的演示/集成测试)

efzxgjgh

efzxgjgh2#

在Java1.8中,这个代码片段非常适合读取csv文件
pom.xml文件

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
bwitn5fc

bwitn5fc3#

如果您正在使用Scala2.11和Apache2.0或更高版本构建jar。
不需要创建 sqlContext 或者 sparkContext 对象。只是一个 SparkSession 对象满足所有需求。
下面是mycode,它工作正常:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

如果你在集群中运行,只需改变 .master("local").master("yarn") 在定义 sparkBuilder 对象
spark文档包括:https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

gblwokeq

gblwokeq4#

如果使用spark 2.0,请尝试此操作+

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")

For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")

For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

note:- this 为任何分隔文件工作。只需使用选项(“delimiter”,)来更改值。
希望这有帮助。

jdgnovmf

jdgnovmf5#

默认文件格式是parquet with spark.read。。文件读取csv,这就是为什么会出现异常。使用您尝试使用的api指定csv格式

y3bcpkx1

y3bcpkx16#

使用spark 2.x解析csv并加载为dataframe/dataset

首先,初始化 SparkSession 对象默认情况下,它将作为 spark ```
val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;

使用以下任何一种方式加载csv作为 `DataFrame/DataSet` ###1. 以程序化的方式进行

val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")


#### 更新:从这里添加所有选项,以防将来链接中断

路径:文件的位置。类似于spark,它可以接受标准hadoop globbing表达式。
标题:设置为true时,第一行文件将用于命名列,并且不会包含在数据中。所有类型都假定为字符串。默认值为false。
分隔符:默认情况下,列使用分隔符分隔,但分隔符可以设置为任何字符
引号:默认情况下引号字符为“,但可以设置为任何字符。引号内的分隔符将被忽略
转义:默认情况下,转义字符为,但可以设置为任何字符。忽略转义引号字符
parserlib:默认情况下,可以将“commons”设置为“univocity”,以便使用该库进行csv解析。
模式:确定解析模式。默认情况下,它是允许的。可能的值为:
permissive:尝试分析所有行:为缺少的标记插入空值,并忽略多余的标记。
dropmalformed:删除比预期标记少或多的行或与架构不匹配的标记
failfast:如果遇到任何格式不正确的行字符集,将以runtimeexception终止:默认为“utf-8”,但可以设置为其他有效的字符集名称
inferschema:自动推断列类型。它需要额外传递一次数据,默认情况下为false注解:跳过以该字符开头的行。默认值为“#”。通过将其设置为null来禁用注解。
nullvalue:指定一个表示空值的字符串,任何与此字符串匹配的字段都将在dataframe中设置为空
dateformat:指定一个字符串,指示读取日期或时间戳时要使用的日期格式。自定义日期格式遵循java.text.simpleDataFormat中的格式。这同时适用于datetype和timestamptype。默认情况下,它是null,这意味着尝试通过java.sql.timestamp.valueof()和java.sql.date.valueof()解析时间和日期。

### 2. 您也可以用这种sql方法

val df = spark.sql("SELECT * FROM csv.hdfs:///csv/file/dir/file.csv")

依赖项:

"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,


## Spark版本<2.0

val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");

依赖项:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

brgchamk

brgchamk7#

它的hadoop是2.6,spark是1.6,没有“databricks”包。

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
fgw7neuy

fgw7neuy8#

将以下spark依赖项添加到pom文件:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

//Spark配置:
val spark=sparksession.builder().master(“本地”).appname(“示例应用程序”).getorcreate()
//读取csv文件:
val df=spark.read.option(“header”,“true”).csv(“文件路径”)
//显示输出
df.show()

46qrfjad

46qrfjad9#

使用内置的spark csv,您可以使用spark>2.0的新sparksession对象轻松完成。

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

您可以设置多种选项。 header :文件顶部是否包含标题行 inferSchema :是否要自动推断架构。默认值为 true . 我总是喜欢提供模式来确保正确的数据类型。 mode :解析模式,允许,dropmalformed或failfast delimiter :若要指定分隔符,默认值为逗号(',')

mzillmmw

mzillmmw10#

sparkcsv是核心spark功能的一部分,不需要单独的库。所以你可以举个例子

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

在scala中,(这适用于分隔符中的任何格式,例如“,”对于csv“,\t”对于tsv等) val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")

bjg7j2ky

bjg7j2ky11#

使用spark 2.0,以下是如何读取csv

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
pgx2nnw8

pgx2nnw812#

解析csv文件有很多挑战,如果文件大小更大,如果列值中有非英语/转义符/分隔符/其他字符,则会不断增加,这可能会导致解析错误。
神奇之处在于所使用的选项。为我和霍普工作的那些应该涵盖大多数边缘案例的代码如下:


### Create a Spark Session

spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error

html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

希望有帮助。有关更多信息,请参阅:使用pyspark 2读取包含html源代码的csv
注意:上面的代码来自spark2api,其中csv文件读取api与spark可安装的内置包捆绑在一起。
注意:pyspark是spark的python Package 器,与scala/java共享相同的api。

wvmv3b1j

wvmv3b1j13#

使用spark 2.4+,如果您想从本地目录加载csv,那么可以使用2个会话并将其加载到配置单元中。第一个会话应使用master()config创建为“local[*]”,第二个会话应使用“yarn”并启用配置单元。
下面的那个对我有用。

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

   }

当你和 spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar 一切顺利,在Hive里创造了一张table。

vsaztqbk

vsaztqbk14#

要读取系统上的相对路径,请使用system.getproperty方法获取当前目录,并进一步使用相对路径加载文件。

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

spark:2.4.4 scala:2.11.12

相关问题