How do I read a text file and apply a schema with PySpark?

9udxz4iz · asked 2021-05-27 · Spark

The .txt file looks like this:

1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468

When I read it in and split it into 3 separate columns, I get back this (perfect):

df = spark.read.option("header"     , "false")\
               .option("inferSchema", "true" )\
               .text( "fixed-width-2.txt"    )

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
)
sorted_df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
+----+----+----+

But if I read it in again and apply a schema...

from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
df_new = spark.read.csv("fixed-width-2.txt", schema=schema)
df_new.printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)
 |-- col3: integer (nullable = true)

...the data from the file is lost:

df_new.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+

So my question is: how can I read in this text file and apply a schema?


8dtrkrch1#

When you read with that schema, col1 is declared as int, but the value 1234567813572468 exceeds the maximum int value (2147483647, since Spark's IntegerType is a 4-byte signed integer). Read it as LongType instead.

from pyspark.sql.types import StructType, StructField, LongType

schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("fixed-width-2.txt", schema=schema).show()

# +----------------+
# |            col1|
# +----------------+
# |1234567813572468|
# |1234567813572468|
# |1234567813572468|
# |1234567813572468|
# |1234567813572468|
# +----------------+
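For reference, a quick standalone check (plain Python, nothing Spark-specific) shows why the IntegerType cast cannot hold this value:

# Maximum value of a 32-bit signed integer, which is what
# Spark's IntegerType represents.
INT_MAX = 2**31 - 1  # 2147483647

value = 1234567813572468
print(value > INT_MAX)  # True -> overflows IntegerType, hence the missing data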

Using RDD API: the easier way is to use `.textFile` (which yields an RDD), then `.map` over each line, and then convert to a DataFrame with the schema.

from pyspark.sql.types import *

schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

df = spark.createDataFrame(
    spark.sparkContext.textFile("fixed-width-2.txt")
         .map(lambda x: (int(x[0:4]), int(x[4:8]), int(x[8:12]))),
    schema)

df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
+----+----+----+

df.printSchema()

root
 |-- col1: integer (nullable = true)
 |-- col2: integer (nullable = true)
 |-- col3: integer (nullable = true)
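If the file can contain blank or short lines, the bare `int(x[0:4])` slicing will raise a ValueError. A minimal defensive variant, reusing the schema defined above (the `parse_line` helper and the length filter are illustrative additions, not part of the original answer):

def parse_line(x):
    # Slice the fixed-width fields and convert each to int.
    return (int(x[0:4]), int(x[4:8]), int(x[8:12]))

rdd = (spark.sparkContext.textFile("fixed-width-2.txt")
       .filter(lambda x: len(x.strip()) >= 12)  # skip blank or short lines
       .map(parse_line))

df = spark.createDataFrame(rdd, schema)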

Using DataFrame API:

df = spark.read.option("header", "false") \
               .option("inferSchema", "true") \
               .text("fixed-width-2.txt")

sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(8, 4).alias('col3'),
)

Dynamic cast expression:

from pyspark.sql.functions import col

casting = [col(col_name).cast("int").name(col_name) for col_name in sorted_df.columns]
sorted_df = sorted_df.select(casting)

Required DataFrame:

sorted_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
+----+----+----+
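An equivalent, more compact route is a single `selectExpr` that slices and casts in one pass (a sketch using Spark SQL expressions; `selectExpr` and the SQL `substring`/`cast` functions are standard Spark APIs, and the column layout is taken from the question):

# substring() is 1-based, like Column.substr above.
sorted_df = df.selectExpr(
    "cast(substring(value, 1, 4) as int) as col1",
    "cast(substring(value, 5, 4) as int) as col2",
    "cast(substring(value, 8, 4) as int) as col3",
)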

Just in case you want to change the types afterwards:

schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

df = spark.createDataFrame(sorted_df.rdd, schema)
df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
|1234|5678|8135|
+----+----+----+
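Putting the pieces together, here is a minimal end-to-end sketch (same file and column layout as the question) that reads the text file, slices the fixed-width fields, and casts them in one select:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read each line as a single string column named "value".
df = spark.read.text("fixed-width-2.txt")

result = df.select(
    df.value.substr(1, 4).cast("int").alias("col1"),
    df.value.substr(5, 4).cast("int").alias("col2"),
    df.value.substr(8, 4).cast("int").alias("col3"),
)
result.printSchema()
result.show()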
