Hive cannot find partitioned data written by Spark Structured Streaming

Asked by fv2wmkja on 2021-06-26, in Hive

I have a Spark Structured Streaming job that writes data to IBM Cloud Object Storage (S3):

  dataDf.
    writeStream.
    format("parquet").
    trigger(Trigger.ProcessingTime(trigger_time_ms)).
    option("checkpointLocation", s"${s3Url}/checkpoint").
    option("path", s"${s3Url}/data").
    option("spark.sql.hive.convertMetastoreParquet", false).
    partitionBy("InvoiceYear", "InvoiceMonth", "InvoiceDay", "InvoiceHour").
    start()

I can see the data using the HDFS CLI:

  [clsadmin@xxxxx ~]$ hdfs dfs -ls s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0 | head
  Found 616 items
  -rw-rw-rw- 1 clsadmin clsadmin 38085 2018-09-25 01:01 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-1e1dda99-bec2-447c-9bd7-bedb1944f4a9.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 45874 2018-09-25 00:31 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-28ff873e-8a9c-4128-9188-c7b763c5b4ae.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 5124 2018-09-25 01:10 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-5f768960-4b29-4bce-8f31-2ca9f0d42cb5.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 40154 2018-09-25 00:20 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-70abc027-1f88-4259-a223-21c4153e2a85.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 41282 2018-09-25 00:50 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-873a1caa-3ecc-424a-8b7c-0b2dc1885de4.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 41241 2018-09-25 00:40 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-88b617bf-e35c-4f24-acec-274497b1fd31.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 3114 2018-09-25 00:01 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-deae2a19-1719-4dfa-afb6-33b57f2d73bb.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 38877 2018-09-25 00:10 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-e07429a2-43dc-4e5b-8fe7-c55ec68783b3.c000.snappy.parquet
  -rw-rw-rw- 1 clsadmin clsadmin 39060 2018-09-25 00:20 s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00001-1553da20-14d0-4c06-ae87-45d22914edba.c000.snappy.parquet

However, when I try to query the data, no rows are returned:

  hive> select * from invoiceitems limit 5;
  OK
  Time taken: 2.392 seconds
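
One quick sanity check at this point (a standard Hive command, added here for reference) is to ask the metastore which partitions it knows about for this table; an empty list would mean the partition metadata is missing, rather than the files being unreadable:

  hive> SHOW PARTITIONS invoiceitems;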

My table DDL looks like this:

  CREATE EXTERNAL TABLE `invoiceitems`(
    `invoiceno` int,
    `stockcode` int,
    `description` string,
    `quantity` int,
    `invoicedate` bigint,
    `unitprice` double,
    `customerid` int,
    `country` string,
    `lineno` int,
    `invoicetime` string,
    `storeid` int,
    `transactionid` string,
    `invoicedatestring` string)
  PARTITIONED BY (
    `invoiceyear` int,
    `invoicemonth` int,
    `invoiceday` int,
    `invoicehour` int)
  ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  STORED AS INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  LOCATION
    's3a://streaming-data-landing-zone-partitioned/data'

I have also tried the correct (mixed) case for the column/partition names, and that didn't work either.
Any idea why my query can't find the data?

Update 1:

I tried setting the location to a directory containing the data without any partitions, and that still doesn't work, so I'm wondering whether this is a data format issue?

  CREATE EXTERNAL TABLE `invoiceitems`(
    `InvoiceNo` int,
    `StockCode` int,
    `Description` string,
    `Quantity` int,
    `InvoiceDate` bigint,
    `UnitPrice` double,
    `CustomerID` int,
    `Country` string,
    `LineNo` int,
    `InvoiceTime` string,
    `StoreID` int,
    `TransactionID` string,
    `InvoiceDateString` string)
  PARTITIONED BY (
    `InvoiceYear` int,
    `InvoiceMonth` int,
    `InvoiceDay` int,
    `InvoiceHour` int)
  STORED AS PARQUET
  LOCATION
    's3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/';

  hive> Select * from invoiceitems limit 5;
  OK
  Time taken: 2.066 seconds
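
Before concluding that this is a format problem, it may also be worth checking how the metastore actually resolved the table definition; DESCRIBE FORMATTED (a standard Hive command) prints the SerDe, input/output formats, and location that Hive will use when scanning:

  hive> DESCRIBE FORMATTED invoiceitems;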

Answer 1 (23c0lvtd):

Reading Snappy-compressed Parquet files

The data is stored as Snappy-compressed Parquet files:

  s3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0/part-00000-1e1dda99-bec2-447c-9bd7-bedb1944f4a9.c000.snappy.parquet

So set the 'parquet.compress'='SNAPPY' table property in the CREATE TABLE DDL statement. You can also set parquet.compression=SNAPPY in the 'Custom hive-site settings' section in Ambari for IOP or HDP.
Here is an example of using the table property in a CREATE TABLE statement in Hive:

  hive> CREATE TABLE inv_hive_parquet(
          trans_id int, product varchar(50), trans_dt date
        )
        PARTITIONED BY (year int)
        STORED AS PARQUET
        TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY');
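
For an already-existing table such as the question's invoiceitems, the same property can be attached after the fact; as a sketch using standard Hive syntax:

  hive> ALTER TABLE invoiceitems SET TBLPROPERTIES ('parquet.compress'='SNAPPY');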

Update the partition metadata of the external table

Also, for an external partitioned table, we need to refresh the partition metadata whenever an external job (in this case, the Spark job) writes partitions directly into the data folder, because Hive will not be aware of those partitions unless they are registered explicitly.
This can be done in either of the following ways:

  ALTER TABLE inv_hive_parquet RECOVER PARTITIONS;
  -- or
  MSCK REPAIR TABLE inv_hive_parquet;
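
Note that ALTER TABLE ... RECOVER PARTITIONS is the Amazon EMR Hive variant; MSCK REPAIR TABLE is the standard Hive command. If only a few partitions are involved, they can also be registered one by one; for example, as a sketch against the partition path shown earlier in the question:

  hive> ALTER TABLE invoiceitems ADD IF NOT EXISTS
        PARTITION (invoiceyear=2018, invoicemonth=9, invoiceday=25, invoicehour=0)
        LOCATION 's3a://streaming-data-landing-zone-partitioned/data/InvoiceYear=2018/InvoiceMonth=9/InvoiceDay=25/InvoiceHour=0';

Once the partitions are registered, the original select * from invoiceitems limit 5 query should be able to find the data.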
