hive 迭代多个查询并将其存储在pyspark Dataframe 中

2ekbmq32  于 2022-11-05  发布在  Hive
关注(0)|答案(1)|浏览(228)

我在hive中有一个表,我想在循环中的一个条件下查询它,并将结果动态存储在多个pysark Dataframe 中。

基本查询

g1 = """
    select * from db.hive_table where group =  1
"""

group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()

总共有80个组,当前分别为Group = 2、Group = 3等运行上述代码。

我无用的迭代代码


# changes the geometry type to obj

df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
         group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
         group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
         group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
         group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
         group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
         group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
         group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80,

# num_list=[1,2,3,4,5,5,6,6]

for d in df_list:
    for i in range(1,80):
         gi = """
        select * from db.hive_table where group =  $i
        """

        group_i = spk.sql(gi)
        print(group_i.show(3))
        print(group_i.printSchema())
        print((group_i.count(), len(group_i.columns)))
        return group_i = group_i.toPandas()

请求帮助指导我解决这个问题,并帮助我增加我的编码知识。
先谢谢你。

fykwrbwg

fykwrbwg1#

使用列表

python/pyspark不允许你动态创建变量名,但是你可以创建一个 Dataframe 列表,比如sdf_list[0].show()sdf_list[1].toPandas()

sdf_list = []

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_list.append((i, filtered_sdf))  # (<filter/group identifier>, <spark dataframe>)
    del filtered_sdf

现在,sdf_list具有可以使用列表索引访问的spark Dataframe 的列表。例如,第一个 Dataframe 可以使用[0]访问,并且打印将验证它是 Dataframe 。

print(sdf_list[0])

# (1, DataFrame[col1: bigint, dt: date, col3: bigint])

# (<filter/group identifier>, <spark dataframe>)

该列表可以被迭代,并且其中的所有 Dataframe 可以被单独使用,

for (i, sdf) in sdf_list[:2]:
    print("dataframe {0}'s count:".format(i), sdf.count())

# dataframe 1's count: 20

# dataframe 2's count: 30

您可以随意使用它。

sdf_list[0][1].count()  # [0] returns the tuple - (<sdf identifier>, <sdf>)

# 20

sdf_list[0][1].show(2)

# etc...

假设你也希望所有的spark Dataframe 都是Pandas Dataframe ,如果你希望它是动态的,你需要创建一个 Dataframe 列表,或者只使用索引访问spark Dataframe 。


# using indices

group1_pdf = sdf_list[0][1].toPandas()

# creating list of pandas dataframes

pdf_list = []

for (i, sdf) in sdf_list:
    pdf_list.append((i, sdf.toPandas()))  # (<filter/group identifier>, <pandas dataframe>)

type(pdf_list)

# list

type(pdf_list[0])

# tuple

type(pdf_list[0][1])

# pandas.core.frame.DataFrame

使用字典

我们也可以使用字典来存储 Dataframe ,并使用键来跟踪它,因此,键可以作为 Dataframe 的名称。

sdf_dict = {}

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_dict['group'+str(i)] = filtered_sdf
    del filtered_sdf

字典中会有可以使用键访问的 Dataframe ,让我们简单地打印前两个键,然后检查我们有什么值。

list(sdf_dict.keys())[:2]

# ['group1', 'group2']

sdf_dict['group1']

# DataFrame[col1: bigint, dt: date, col3: bigint]

sdf_dict['group1'].count()

# 20

您可以选择迭代dict键并使用spark Dataframe 。

for sdf_key in list(sdf_dict.keys())[:2]:
    print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())

# group1's record count: 20

# group2's record count: 30

您可以查看type()以获得更好的理解。

type(sdf_dict)

# dict

type(sdf_dict['group1'])

# pyspark.sql.dataframe.DataFrame

转换为Pandas Dataframe 很简单


# single df manually

group1_pdf = sdf_dict['group1'].toPandas()

# with iteration

pdf_dict = {}

for sdf_key in sdf_dict.keys():
    pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()

type(pdf_dict)

# dict

type(pdf_dict['group1'])

# pandas.core.frame.DataFrame

相关问题