我只是在读取一个Parquet文件,并添加一个过滤器,以匹配所有的记录,落在日期-这里2021-04-03。列不应为null,并且应在给定的日期。
输入表
+---------+-----------+-------------------+
| lat| lng| eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+
到目前为止,我已经尝试过强制转换列,使用substring\u index函数进行匹配,但是我无法在推送的过滤器中获得它。
以下是我尝试的代码:
df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
.select( 'lat', 'lng', 'eventDTLocal').filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP),'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)").explain(extended=True)
过滤器仅在数据过滤器中列出,而不在其他任何地方列出。我错过了什么?
1条答案
按热度按时间3zwjbxry1#
并不是所有的过滤器都能被按下。一般来说,大多数包含函数调用的过滤器
substring
或者unix_timestamp
不能向下推。在datasourcestrategy中实现了过滤器被下推的完整逻辑。在这种情况下,解决此限制的一种方法是存储
eventDTLocal
作为unix时间戳而不是parquet文件中的字符串,然后按特定的毫秒进行过滤。物理平面图
df
```== Physical Plan ==
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
+- *(1) ColumnarToRow
+- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: structlat:double,lng:double,eventDTLocal:bigint