val filteredRdd = df.rdd.zipWithIndex().collect { case (r, i) if 10 >= start && i <=20 => r }
val newDf = sqlContext.createDataFrame(filteredRdd, df.schema)
print("- Total number of rows = " + str(sp.initDataCount))
print("- Limit = " + str(sp.limit))
print("- Number of pages = " + str(sp.pages))
print("- Number of rows in the last page = " + str(sp.numberOfRowsInLastPage))
while (sp.page < sp.pages-1):
df_page = sp.next()
nbrRows = df_page.count()
print(" Page " + str(sp.page) + '/' + str(sp.pages) + ": Number of rows = " + str(nbrRows))
-输出:
- Total number of rows = 4521
- Limit = 753
- Number of pages = 7
- Number of rows in the last page = 3
Page 0/7: Number of rows = 753
Page 1/7: Number of rows = 753
Page 2/7: Number of rows = 753
Page 3/7: Number of rows = 753
Page 4/7: Number of rows = 753
Page 5/7: Number of rows = 753
Page 6/7: Number of rows = 3
from pyspark.sql import Window
from pyspark.sql.functions import row_number
offset = 20
num_rows = 10
# Sort the DataFrame based on a column
sorted_df = df.sort("your_sort_column")
# Create a window spec
window_spec = Window.orderBy("your_sort_column")
# Add a row number column using the window spec
df_with_row_number = sorted_df.withColumn(
"row_number", row_number().over(window_spec)
)
# Filter the DataFrame based on the row number column to apply offset and limit
result_df = df_with_row_number.filter(
(df_with_row_number.row_number > offset)
& (df_with_row_number.row_number <= offset + num_rows)
)
# Perform an action to trigger the computation and display the result
result_df.show()
下面是一个等价的例子:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY your_sort_column) as row_number
FROM your_table
) t
WHERE row_number > 20 AND row_number <= 30;
5条答案
按热度按时间cgyqldqp1#
已经6年了,不知道那时候能不能
我会在答案上添加一个顺序id,并搜索offset和offset + limit之间的寄存器
在纯Spark SQL查询中,偏移量为10,限制为10
在Pyspark上,情况会非常相似
它的灵活性和速度足够快,即使是大量的数据
kgsdhlau2#
如果dataframe中有重复的行,karthik's answer将失败。'except'将删除df1中所有在df2中的行。
5lhxktic3#
到目前为止,spark sql还不支持offset。你可以使用
DataFrames
方法来分页。示例:如果您希望使用分页限制10进行迭代,可以执行以下操作:
如果你想在第一次运行时执行
LIMIT 50, 100
,你可以执行以下操作:希望这有帮助!
irtuqstp4#
请在下面找到一个有用的PySpark(Python 3和Spark 3)类,名为SparkPaging,它抽象了分页机制:https://gitlab.com/enahwe/public/lib/spark/sparkpaging
下面是用法:
SparkPaging
用于对 Dataframe 和数据集进行分页的
示例
-初始化示例1:
通过指定一个限制来接近。
-初始化示例2:
方法通过指定页数(如果有休息,页数将增加)。
-初始化示例3:
通过指定一个限制来接近。
-初始化示例4:
方法通过指定页数(如果有休息,页数将增加)。
-重置:
-迭代示例:
-输出:
628mspwn5#
@Khanis Rok方法使用
monotonically_increasing_id()
,不保证连续编号,因此在此场景中使用它不是正确的函数。相反,您可以使用
row_number()
函数和Window规范来分配连续的行号。下面是Python中的一个例子:
下面是一个等价的例子: