我正在尝试使用hadoop2.7上的spark2.1.1将数据从cassandra提取到一个特定的分区配置单元表中。为此,我将来自cassandra的所有数据转换为rdd,并通过rdd.todf()将其转换为Dataframe,然后传递到以下函数:
public def writeToHive(ss: SparkSession, df: DataFrame) {
df.createOrReplaceTempView(tablename)
val cols = df.columns
val schema = df.schema
// logs 358
LOG.info(s"""SELECT COUNT(*) FROM ${tablename}""")
val outdf = ss.sql(s"""INSERT INTO TABLE ${db}.${t} PARTITION (date="${destPartition}") SELECT * FROM ${tablename}""")
// Have also tried the following lines below, but yielded the same results
// var dfInput_1 = dfInput.withColumn("region", lit(s"${destPartition}"))
// dfInput_1.write.mode("append").insertInto(s"${db}.${t}")
// logs 358
LOG.info(s"""SELECT COUNT(*) FROM ${tablename}""")
// logs 423
LOG.info(s"""SELECT COUNT(*) FROM ${db}.${t} where date='${destPartition}'""")
}
在cassandra中,表中确实有358行。我在hortonworks上看到这个帖子https://community.hortonworks.com/questions/51322/count-msmatch-while-using-the-parquet-file-in-spar.html 但似乎没有解决办法。我已尝试将spark.sql.hive.metastorepartitionprunning设置为true,但在行计数中未看到任何更改。
希望您能提供任何关于行计数之间存在差异的反馈。谢谢!
编辑:错误数据进入。。。。我早该料到的
1条答案
按热度按时间nxowjjhe1#
有时数据包含非utf8字符,如日语或汉语。检查数据是否包含任何此类非utf8字符。
如果是这种情况,请以orc格式插入。默认情况下,它是文本,文本不支持非utf8字符。