PySpark: matching a row against the rows of another table in order to classify rows in Databricks

Posted by dgiusagp on 2023-10-15 in Spark

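How can I use the row values of the Combinations table to classify the values of the Clients table?
I decided to create a Combinations table that enumerates all combinations of the main row (the Clients table).
I plan to check whether a client row matches a row of the Combinations table, in order to classify it as Sector B (Combinations table).
I have this process, but Databricks returns an error: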

for i,j in select_df.iterrows():
      for u,v in dfCombinacionesDias.iterrows():
          if (
              (select_df["MONDAY"][i] == registro["LUNES"][u]) 
              and (select_df["TUESDAY"][i] == registro["MARTES"][u]) 
              and (select_df["WEDNESDAY"][i] == registro["MIERCOLES"][u]) 
              and (select_df["THURSDAY"][i] == registro["JUEVES"][u]) 
              and (select_df["FRIDAY"][i] == registro["VIERNES"][u]) 
              and (select_df["SATURDAY"][i] == registro["SABADO"][u]) 
              and (select_df["SUNDAY"][i] == registro["DOMINGO"][u])
          ):
              Sector = "B"
          else:
              Sector = "A"
        
vSubSeq = "('{}','{}')".format(select_df["IDClient"][i],Sector)
sqlInsertSequence = "Insert into {0}.{1} values {2}".format(dSCHEMA, Table, vSubSeq,vdataDeltaPath)
print(sqlInsertSequence)
dfTables = spark.sql(sqlInsertSequence)

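I have attached images of the different tables (Clients, Combinations and Sectors).

I think I need a for loop over the rows of one table (the Combinations table) to compare them against a row of the Clients table; if there is a match, I save the value in a new table (the Sectors table), and obviously there would be another for loop over the Clients table. But is there an algorithm that helps with this kind of comparison against a lookup table?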

Answer #1, by nnt7mjpx

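Idea

I assume that the "x" in the posted data sample works like a boolean flag. So why not replace it with True, and the empty cells with False? After that, we can apply logical operators to the data directly. For example, what does it mean that a client's days do not fit the "Sector B" pattern? Schematically, it means that any(client_days and not sector_b) is True, as in the following model: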

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
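# dtype=bool makes & and ~ act as logical AND / NOT rather than bitwise integer operations
client_days = pd.Series([0,1,0,0,1,0,0], index=week_days, dtype=bool)
sector_b = pd.Series([1,0,1,0,1,0,0], index=week_days, dtype=bool)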

assert any(client_days & ~sector_b)
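
The same check can be written in plain Python, which may help when reasoning about the rule before moving to pandas or Spark. This is only a sketch: the helper name fits_sector is made up for illustration, and the two lists reuse the sample values from the model above.

```python
def fits_sector(client_days, sector_days):
    """Return True when every active client day is also active in the sector."""
    # A client falls outside the sector as soon as one of its days is not a sector day.
    return not any(c and not s for c, s in zip(client_days, sector_days))


# Order of the flags: mon tue wed thu fri sat sun
client_days = [0, 1, 0, 0, 1, 0, 0]
sector_b = [1, 0, 1, 0, 1, 0, 0]

# Tuesday is a client day but not a Sector B day, so this client does not fit.
print(fits_sector(client_days, sector_b))
```

Note that this rule treats a client as fitting the sector when its days are a subset of the sector's days, which is not the same as requiring the two rows to be identical.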

How to implement it in pandas

pandas 1.5.1
Let's model this idea in pandas, as if we could apply toPandas to the original data:

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,0,1,0,0],
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(
    data=[[1,0,1,0,1,0,0]], 
    index=pd.Index(['Sector B'], name='sector'),
    columns=week_days,
    dtype=bool,
)

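In this case we can use the @ (dot) operator, i.e. the scalar product, keeping in mind that for boolean data addition and multiplication correspond to OR and AND: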

answer = (clients @ ~sectors.loc['Sector B']).map({True: 'A', False: 'B'})
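
If the matrix product feels too implicit, the same result can probably be obtained with an element-wise AND followed by a row-wise any. The sketch below is an assumed equivalent, not part of the original answer; it uses three made-up client rows in the same layout so that it runs on its own.

```python
import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()

# Three illustrative clients (made-up rows, same layout as the data above)
clients = pd.DataFrame(
    [[0, 1, 0, 0, 1, 0, 0],
     [1, 0, 1, 0, 1, 0, 0],
     [1, 0, 0, 0, 0, 0, 0]],
    index=pd.Index([1, 2, 3], name='Client'),
    columns=week_days,
    dtype=bool,
)
sector_b = pd.Series([1, 0, 1, 0, 1, 0, 0], index=week_days, dtype=bool)

# The Series is aligned with the DataFrame columns, so each client row is
# AND-ed with the days that Sector B excludes; any True means "outside B".
outside_b = (clients & ~sector_b).any(axis=1)
answer = outside_b.map({True: 'A', False: 'B'})
print(answer)
```

Here client 1 lands in "A" because Tuesday is not a Sector B day, while clients 2 and 3 land in "B".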

PySpark implementation

pyspark 3.4.1
Suppose that for some reason we cannot use toPandas. Let's reorganize the data as if they were PySpark DataFrames:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())

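How can we implement the idea while restricted to this data type? First, the sector data is small, so we can extract it as an ordered sequence (in the code below, a Python list). Next, we can apply map for the logical AND and then reduce for the logical OR, which yields True for the "Sector A" case and False otherwise. Finally, we apply when from pyspark.sql.functions to map the values: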

from pyspark.sql.functions import lit, when
from functools import reduce

client_data = clients_sdf[week_days]
sector_b = [*sectors_sdf.where('sector == "Sector B"')[week_days].first()]
not_in_B = map(lambda x, y: x & lit(not y), client_data, sector_b)
is_in_sector_A = reduce(lambda x, y: x | y, not_in_B)
client_sector = when(is_in_sector_A, 'A').otherwise('B')
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')

Output:

>>> answer.show()
+------+------+
|Client|Sector|
+------+------+
|     1|     A|
|     2|     B|
|     3|     B|
|     4|     B|
|     5|     B|
|     6|     B|
|     7|     B|
|     8|     B|
|     9|     A|
|    10|     B|
+------+------+

General case

This is only a sketch of how it might look in the general case. Suppose we have this data:

import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()
data = [
    [0,1,0,1,0,0,0],    # changed to fit a new Sector A
    [1,0,1,0,1,0,0],
    [1,0,1,0,0,0,0],
    [1,0,0,0,0,0,0],
    [1,0,0,0,1,0,0],
    [0,0,1,0,1,0,0],
    [0,0,0,0,1,0,0],
    [0,0,1,0,0,0,0],
    [1,1,1,1,1,1,1],    # fit Sector C
    [1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
    data,
    index=1 + pd.Index(range(len(data)), name='Client'),
    columns=week_days,
    dtype=bool
)
sectors = pd.DataFrame(     # add Sector A, Sector C
    data=[[0,1,0,1,0,1,0], [1,0,1,0,1,0,0], [1,1,1,1,1,1,1]], 
    index=pd.Index(['Sector A', 'Sector B', 'Sector C'], name='sector'),
    columns=week_days,
    dtype=bool,
)

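Here we can see 3 sectors, presumably listed in descending order of priority, which we may want to represent by their last letter in the final frame.
Let's implement it in pandas: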

isin_sector = ~(clients @ ~sectors.T)

answer = (
    isin_sector
    .apply(lambda column: column.map({True: column.name[-1]}))
    .agg(lambda row: row.dropna().iloc[0], axis='columns')
)

display(answer)
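
The aggregation above takes the first non-missing value in each row, so it fails for a client that fits no sector at all. A possible variant that leaves such clients as NaN is sketched below; it is an assumption-laden alternative rather than the original author's method, the sample rows are made up, and first_match is a hypothetical name.

```python
import pandas as pd

week_days = 'mon tue wed thu fri sat sun'.split()

# Made-up sample: client 3 fits neither sector
clients = pd.DataFrame(
    [[0, 1, 0, 1, 0, 0, 0],
     [1, 0, 1, 0, 0, 0, 0],
     [0, 0, 0, 0, 0, 0, 1]],
    index=pd.Index([1, 2, 3], name='Client'),
    columns=week_days,
    dtype=bool,
)
sectors = pd.DataFrame(
    [[0, 1, 0, 1, 0, 1, 0],
     [1, 0, 1, 0, 1, 0, 0]],
    index=pd.Index(['Sector A', 'Sector B'], name='sector'),
    columns=week_days,
    dtype=bool,
)

# Same membership matrix as above: True where the client's days fit the sector
isin_sector = ~(clients @ ~sectors.T)

# idxmax returns the first column holding True (sectors are in priority order);
# rows without any True are masked out afterwards, because idxmax would
# otherwise report the first sector for them.
first_match = isin_sector.idxmax(axis=1).str[-1]
answer = first_match.where(isin_sector.any(axis=1))
print(answer)
```

Whether unmatched clients should stay empty or receive a default sector depends on the business rule, which the question does not state.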

Now in PySpark, trying to avoid the pandas API. Here, when applying coalesce, I rely on the fact that Python dictionaries preserve insertion order:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, coalesce, when
from functools import reduce

spark = SparkSession.builder.getOrCreate()

clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())

client_data = clients_sdf[week_days]

def is_in_sector(sector):
    '''sector : a boolean sequence'''
    return ~reduce(lambda x, y: x | y, 
                   map(lambda x, y: x & lit(not y), 
                       client_data, sector))

sectors = {
    (rest:=rec.asDict()).pop('sector')[-1]: is_in_sector(rest.values())
    for rec in sectors_sdf.collect()
}
client_sector = coalesce(
    *(when(is_in_sec, sec_name) for sec_name, is_in_sec in sectors.items())
)
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
answer.show()

Answer #2, by bxgwgixi

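Quoting the question: "I have this process, but Databricks returns an error."
For example, you are using registro, which does not appear to be defined anywhere in the code excerpt you provided.
You are also not making effective use of Databricks. Iterating over the rows of a DataFrame is inefficient, especially in nested loops.
Instead, you can use Spark's DataFrame APIs (see the Apache Spark API reference) to get the desired result more efficiently.
As an alternative that does *not* use nested loops, you can:

  • Rename the columns of the Combinations DataFrame to match those of the Clients DataFrame.
  • Join the Clients DataFrame and the Combinations DataFrame on all the day columns.

After the join, any row of the Clients DataFrame that has a match in the Combinations DataFrame is classified as "B". Rows without a match are "A".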

from pyspark.sql.functions import col, lit, when

# Assuming you have loaded your data into two DataFrames: df_clients and df_combinations
day_cols = ["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"]

# Step 1: Rename columns in df_combinations to match df_clients
df_combinations = df_combinations.withColumnRenamed("LUNES", "MONDAY")\
                                 .withColumnRenamed("MARTES", "TUESDAY")\
                                 .withColumnRenamed("MIERCOLES", "WEDNESDAY")\
                                 .withColumnRenamed("JUEVES", "THURSDAY")\
                                 .withColumnRenamed("VIERNES", "FRIDAY")\
                                 .withColumnRenamed("SABADO", "SATURDAY")\
                                 .withColumnRenamed("DOMINGO", "SUNDAY")

# Keep one row per combination and add a marker column ("matched" is an illustrative name).
# The join keys come from df_clients and are never null there, so they cannot signal a match.
df_combinations = df_combinations.select(day_cols).dropDuplicates().withColumn("matched", lit(True))

# Step 2: Left-join df_clients with df_combinations on all day columns
df_joined = df_clients.join(df_combinations, on=day_cols, how="left_outer")

# Step 3: "Sector" is "B" when a matching combination exists, otherwise "A"
df_result = df_joined.withColumn("Sector", when(col("matched").isNotNull(), "B").otherwise("A"))

# Step 4: If you want to store the result in another table
df_result.select("IDClient", "Sector").write.format("delta").save("/path/to/save/location")

This operates on the Spark DataFrames with DataFrame transformations and actions instead of explicit loops.

Answer #3, by ddrv8njm

The answer is as follows:

import pandas as pd

classification_results = []

for i, row_client in select_df.iterrows():
    Sector = "A"  # Initialize Sector as A
    # Loop through each row in the Combinations table
    for u, row_combination in dfCombinacionesDias.iterrows():
        if (
            (row_client["MONDAY"] == row_combination["LUNES"]) and
            (row_client["TUESDAY"] == row_combination["MARTES"]) and
            (row_client["WEDNESDAY"] == row_combination["MIERCOLES"]) and
            (row_client["THURSDAY"] == row_combination["JUEVES"]) and
            (row_client["FRIDAY"] == row_combination["VIERNES"]) and
            (row_client["SATURDAY"] == row_combination["SABADO"]) and
            (row_client["SUNDAY"] == row_combination["DOMINGO"])
        ):
            Sector = "B"
            break  

    # Append the classification result to the list
    classification_results.append({"IDClient": row_client["IDClient"], "Sector": Sector})

# Create a DataFrame from the classification_results list
sector_df = pd.DataFrame(classification_results)

print(sector_df)

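Explanation: this snippet classifies the values of the Clients table according to matching rows in the Combinations table. It iterates over every row of the Clients table and checks, day by day, whether the Combinations table contains a matching row. If a match is found, the client is classified as Sector "B"; otherwise it is classified as Sector "A". The classification results are stored in the sector_df DataFrame.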
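
For larger tables, the same exact-match classification can be expressed without nested loops by merging the two pandas DataFrames. The following is a minimal sketch under stated assumptions: the DataFrame and column names are taken from the question, but the sample rows, the 0/1 encoding and the day_map helper are made up for illustration.

```python
import pandas as pd

# Spanish-to-English column mapping (names taken from the question)
day_map = {
    "LUNES": "MONDAY", "MARTES": "TUESDAY", "MIERCOLES": "WEDNESDAY",
    "JUEVES": "THURSDAY", "VIERNES": "FRIDAY", "SABADO": "SATURDAY",
    "DOMINGO": "SUNDAY",
}
days = list(day_map.values())

# Made-up sample data; 1 marks an active day
select_df = pd.DataFrame(
    [[101, 1, 0, 1, 0, 1, 0, 0],
     [102, 0, 1, 0, 0, 1, 0, 0],
     [103, 1, 0, 0, 0, 0, 0, 0]],
    columns=["IDClient"] + days,
)
dfCombinacionesDias = pd.DataFrame(
    [[1, 0, 1, 0, 1, 0, 0],
     [1, 0, 0, 0, 0, 0, 0],
     [1, 0, 0, 0, 0, 0, 0]],  # duplicated on purpose
    columns=list(day_map),
)

# Deduplicate the combinations so the left merge cannot multiply client rows
combos = dfCombinacionesDias.rename(columns=day_map)[days].drop_duplicates()

# indicator=True adds a "_merge" column telling whether a match was found
merged = select_df.merge(combos, on=days, how="left", indicator=True)
merged["Sector"] = (merged["_merge"] == "both").map({True: "B", False: "A"})

sector_df = merged[["IDClient", "Sector"]]
print(sector_df)
```

With this sample, clients 101 and 103 match a combination and are classified as "B", while client 102 is classified as "A".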
