How to implement paginated queries in Spark SQL

anhgbhbe · posted 2023-04-21 in Apache
Follow (0) | Answers (5) | Views (516)

Does anyone know how to paginate in a Spark SQL query?
I need to use Spark SQL, but I don't know how to do pagination.
I tried:

select * from person limit 10, 10

cgyqldqp1#

The question is 6 years old by now, so I don't know whether this was possible back then.
I would add a sequential id to the rows and then select the records between offset and offset + limit.
In pure Spark SQL, with an offset of 10 and a limit of 10:

WITH count_person AS (
    SELECT *, monotonically_increasing_id() AS count FROM person)
-- the generated ids start at 0, so rows 11..20 have ids 10..19
SELECT * FROM count_person WHERE count >= 10 AND count < 20

In PySpark it is very similar:

import pyspark.sql.functions as F

offset = 10
limit = 10
df = df.withColumn('_id', F.monotonically_increasing_id())
# between() is inclusive on both ends, so the upper bound is offset + limit - 1
df = df.where(F.col('_id').between(offset, offset + limit - 1))

It is flexible and fast enough, even on large volumes of data.


kgsdhlau2#

If there are duplicate rows in the dataframe, karthik's answer will fail: `except` removes all rows from df1 that are also present in df2.

val start = 10
val end = 20
val filteredRdd = df.rdd.zipWithIndex().collect { case (r, i) if i >= start && i <= end => r }
val newDf = sqlContext.createDataFrame(filteredRdd, df.schema)
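If you are on PySpark, a rough equivalent of the same zipWithIndex approach might look like this (just a sketch, assuming an existing DataFrame `df` and SparkSession `spark`; the `start`/`end` bounds are illustrative only):

# Sketch of the zipWithIndex approach in PySpark (assumes `df` and `spark` already exist)
start, end = 10, 20

indexed = df.rdd.zipWithIndex()                            # pair each Row with a stable, consecutive index
page_rdd = (indexed
            .filter(lambda pair: start <= pair[1] <= end)  # keep only the requested slice
            .map(lambda pair: pair[0]))                    # drop the index, keep the Row
page_df = spark.createDataFrame(page_rdd, df.schema)
page_df.show()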

5lhxktic3#

So far, Spark SQL does not support OFFSET. You can paginate with DataFrame methods instead.
Example: if you want to iterate with a page size of 10, you can do the following:

DataFrame df1;
long count = df.count();
int limit = 10;
while (count > 0) {
    df1 = df.limit(limit);
    df1.show();            // will print the first 10 rows, then the next 10, etc.
    df = df.except(df1);
    count = count - limit;
}
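For anyone using PySpark rather than the Java API, the same loop might look roughly like this (a sketch only, assuming an existing DataFrame `df`; `exceptAll` is used here so that duplicate rows are not all dropped at once, which addresses the concern raised above):

# Sketch of the same paging loop in PySpark (assumes an existing DataFrame `df`)
count = df.count()
limit = 10
remaining = df
while count > 0:
    page = remaining.limit(limit)
    page.show()                            # prints up to `limit` rows per iteration
    remaining = remaining.exceptAll(page)  # remove only the rows already shown
    count -= limit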

If you want something like LIMIT 50, 100 on the first run, you can do the following:

df1 = df.limit(50);
df2 = df.except(df1);
df2.limit(100);       // required result

Hope this helps!


irtuqstp4#

Please find below a useful PySpark (Python 3 and Spark 3) class named SparkPaging that abstracts the pagination mechanism: https://gitlab.com/enahwe/public/lib/spark/sparkpaging
Here is how to use it:

SparkPaging

A class for paging DataFrames and datasets.

Examples

- Init example 1:

Approach by specifying a limit.

sp = SparkPaging(initData=df, limit=753)

- Init example 2:

Approach by specifying a number of pages (the page count is incremented by one if there is a remainder).

sp = SparkPaging(initData=df, pages=6)

- Init example 3:

Approach by specifying a limit.

sp = SparkPaging()
sp.init(initData=df, limit=753)

- Init example 4:

Approach by specifying a number of pages (the page count is incremented by one if there is a remainder).

sp = SparkPaging()
sp.init(initData=df, pages=6)

- Resetting:

sp.reset()

- Iteration example:

print("- Total number of rows = " + str(sp.initDataCount))
print("- Limit = " + str(sp.limit))
print("- Number of pages = " + str(sp.pages))
print("- Number of rows in the last page = " + str(sp.numberOfRowsInLastPage))
while (sp.page < sp.pages-1):
    df_page = sp.next()
    nbrRows = df_page.count()
    print("  Page " + str(sp.page) + '/' + str(sp.pages) + ": Number of rows = " + str(nbrRows))

- Output:

- Total number of rows = 4521
- Limit = 753
- Number of pages = 7
- Number of rows in the last page = 3
    Page 0/7: Number of rows = 753
    Page 1/7: Number of rows = 753
    Page 2/7: Number of rows = 753
    Page 3/7: Number of rows = 753
    Page 4/7: Number of rows = 753
    Page 5/7: Number of rows = 753
    Page 6/7: Number of rows = 3

628mspwn5#

@Khanis Rok's approach uses monotonically_increasing_id(), which does not guarantee consecutive numbering, so it is not the right function for this scenario.
Instead, you can use the row_number() function with a Window specification to assign consecutive row numbers.
Here is an example in Python:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

offset = 20
num_rows = 10

# Sort the DataFrame based on a column
sorted_df = df.sort("your_sort_column")

# Create a window spec
window_spec = Window.orderBy("your_sort_column")

# Add a row number column using the window spec
df_with_row_number = sorted_df.withColumn(
    "row_number", row_number().over(window_spec)
)

# Filter the DataFrame based on the row number column to apply offset and limit
result_df = df_with_row_number.filter(
    (df_with_row_number.row_number > offset)
    & (df_with_row_number.row_number <= offset + num_rows)
)

# Perform an action to trigger the computation and display the result
result_df.show()

And here is the equivalent in Spark SQL:

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (ORDER BY your_sort_column) as row_number
  FROM your_table
) t
WHERE row_number > 20 AND row_number <= 30;
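As a side note, recent Spark releases (3.4 and later, if I remember correctly) also accept an OFFSET clause directly in SQL, so a page can be expressed without the window function. A minimal sketch, assuming a SparkSession `spark` and that the `person` table from the question has a sortable `id` column (that column is an assumption):

# Sketch only: relies on the OFFSET clause (Spark 3.4+); `person` and its `id` column are assumptions
page_df = spark.sql("""
    SELECT *
    FROM person
    ORDER BY id
    LIMIT 10 OFFSET 20
""")
page_df.show()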
