The .txt file looks like this:
1234567813572468
1234567813572468
1234567813572468
1234567813572468
1234567813572468
When I read it in and split it into 3 separate columns, I get this back (perfect):
df = spark.read.option("header", "false")\
    .option("inferSchema", "true")\
    .text("fixed-width-2.txt")
sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(9, 4).alias('col3'),
)
sorted_df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
+----+----+----+
However, if I read it in again and apply a schema...
from pyspark.sql.types import *
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
df_new = spark.read.csv("fixed-width-2.txt", schema=schema)
df_new.printSchema()
root
|-- col1: integer (nullable = true)
|-- col2: integer (nullable = true)
|-- col3: integer (nullable = true)
...the data from the file is lost:
df_new.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
+----+----+----+
So my question is: how do I read in this text file and apply a schema?
1 Answer
When you read with that schema, col1 is parsed as an int, and the value 1234567813572468 exceeds the maximum int value, so it comes back null. Read it as a LongType instead.
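For example (a minimal sketch, assuming the same fixed-width-2.txt file from the question; long_schema is just an illustrative name):

from pyspark.sql.types import StructType, StructField, LongType

# Read each whole 16-digit line as a single LongType column;
# the value fits in a long, so it is no longer nulled out.
long_schema = StructType([StructField('col1', LongType(), True)])
spark.read.csv("fixed-width-2.txt", schema=long_schema).show()

Alternatively, parse the fixed-width slices yourself so each 4-digit piece fits comfortably in an int: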
from pyspark.sql.types import *

schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])

# Slice each line into three 4-character pieces and convert them to int
# before the schema is applied.
df = spark.createDataFrame(
    spark.sparkContext.textFile("fixed_width.csv")
        .map(lambda x: (int(x[0:4]), int(x[4:8]), int(x[8:12]))),
    schema)
df.show()
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
+----+----+----+
df.printSchema()
root
|-- col1: integer (nullable = true)
|-- col2: integer (nullable = true)
|-- col3: integer (nullable = true)
Using the DataFrame API:
df = spark.read.option("header", "false")\
    .option("inferSchema", "true")\
    .text("path")
sorted_df = df.select(
    df.value.substr(1, 4).alias('col1'),
    df.value.substr(5, 4).alias('col2'),
    df.value.substr(9, 4).alias('col3'),
)
Dynamic cast expression:
from pyspark.sql.functions import col

# Cast every column to int, keeping the original column names.
casting = [col(col_name).cast("int").name(col_name) for col_name in sorted_df.columns]
sorted_df = sorted_df.select(casting)
The required dataframe:
sorted_df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
+----+----+----+
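If the list comprehension feels opaque, the same cast can be written with the standard withColumn/cast methods (a small equivalent sketch, not part of the original answer):

from pyspark.sql.functions import col

# Cast each string column to int, one column at a time.
for c in sorted_df.columns:
    sorted_df = sorted_df.withColumn(c, col(c).cast("int"))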
Just in case you want to change the types:
schema = StructType([StructField('col1', IntegerType(), True),
                     StructField('col2', IntegerType(), True),
                     StructField('col3', IntegerType(), True)])
# Rebuild the dataframe from the existing rows with an explicit schema.
df = spark.createDataFrame(sorted_df.rdd, schema)
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
|1234|5678|1357|
+----+----+----+
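For what it's worth, the slicing and casting can also be collapsed into a single select (a minimal sketch assuming the same 4-4-4 layout as above; substring is the standard function from pyspark.sql.functions):

from pyspark.sql import functions as F

df = spark.read.text("fixed-width-2.txt")
parsed = df.select(
    F.substring("value", 1, 4).cast("int").alias("col1"),
    F.substring("value", 5, 4).cast("int").alias("col2"),
    F.substring("value", 9, 4).cast("int").alias("col3"),
)
parsed.printSchema()  # col1, col2, col3 all come out as integer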