我尝试通过以下步骤将Dataframe加载到配置单元表中:
读取源表并将Dataframe保存为hdfs上的csv文件
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
按照“我的配置单元表列”对列进行排序“我的配置单元表列”以字符串形式显示,格式为:
val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype
val schemaList = hiveCols.split("\\|")
val hiveColumnOrder = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq
val finalDF = yearDF.selectExpr(hiveColumnOrder:_*)
我在“execquery”中读取的列的顺序与“hivecolumnorder”相同,为了确保顺序,我使用selectexpr再次选择yeardf中的列
将Dataframe保存为hdfs上的csv文件:
newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/")
一旦我保存了dataframe,我就从“hivecols”中获取相同的列,准备一个ddl来在相同的位置创建一个hive表,其值用逗号分隔,如下所示:
如果schema.tablename不存在,则创建表(col1 coldatatype、col2 coldatatype、col3 coldatatype、col4 coldatatype…col200 datatype)
以','结尾的行格式分隔字段
存储为文本文件
'位置'hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/';
在我将dataframe加载到创建的表中之后,我在这里面临的问题是,当我查询表时,我在查询中得到了不正确的输出。例如:如果在将Dataframe保存为文件之前对其应用以下查询:
finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2
我得到了正确的输出。所有值都与列正确对齐:
[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]
但是在csv文件中保存dataframe之后,在它上面创建一个表(步骤4)并对创建的表应用相同的查询我看到数据混乱并且与列不正确地Map:
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id | line_num | debit_rate | debit_rate_text | credit_rate | credit_rate_text | activity_amount | activity_amount_text | exchange_rate | exchange_rate_text | amount_cr | amount_cr_text |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598 | 2 | NULL | | 381761.4 | | 5686.76 | 5686.76 | NULL | -5686.76 | NULL | |
因此,我尝试使用另一种方法,预先创建配置单元表,并从dataframe向其中插入数据:
在上面的步骤4中运行ddl
最终创建或替换临时视图(“tmptable”)
sql(“insert into schema.table select*from tmptable”)
如果我在作业完成后运行前面提到的select查询,即使这种方法也会失败。我试着用 refresh table schema.table
以及 msckrepair table schema.table
只是想看看元数据是否有任何问题,但似乎什么也解决不了。
有谁能告诉我是什么导致了这种现象,我在这里操作数据的方式有什么问题吗?
2条答案
按热度按时间628mspwn1#
使用spark 2.3.2测试代码
与从csv文件创建spark dataframe然后将其注册为配置单元表不同,您可以轻松地运行sql命令并从csv文件创建配置单元表
正在使用
spark
对象可以作为配置单元用户运行sql命令:使用以下代码,您可以加载hdfs目录中的所有csv\u文件(或者只提供一个csv文件的路径):
最后,将spark sqlcontext对象注册为hive thriftserver:
这将在端口10000上创建thriftserver终结点。
现在您可以运行beeline并连接到thriftserver:
测试一下table
test_table
创建于my_db
数据库:此外,还可以使用thrifserver jdbc端点创建任何其他配置单元表(或任何hiveql命令)。
以下是所需的依赖项:
xnifntxz2#
我在hiveddl中使用了行格式serde:org.apache.hadoop.hive.serde2.opencsvserde。这也有','作为默认分隔符字符,我不必给任何其他分隔符。