Is there a way to split a Spark Dataset according to a given logic?

f4t66c6m  posted 2021-05-27 in Spark

I am looking for a way to split a Spark Dataset, similar to the logic shown below.

>>> import pandas as pd
>>> import numpy as np

>>> df1 = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd'])
>>> df1
          a         b         c         d
0 -0.398502 -1.083682  0.850632 -1.443868
1 -2.124333  1.093590 -0.338162 -1.414035
2  0.753560  0.600687 -0.998277 -2.094359
3 -0.635962 -0.291226  0.428961  1.158153
4 -0.900506 -0.545226 -0.448178 -0.567717
5  0.112911  0.351649  0.788940  2.071541
6 -0.358625  0.500367  1.009819 -1.139761
7  1.003608  0.246925  0.225138 -0.586727
8  0.355274 -0.540685  1.482472  0.364989
9  3.089610 -1.415088 -0.072107 -0.203137
>>>

>>> mask = df1.applymap(lambda x: x < -0.7)
>>>
>>> mask
       a      b      c      d
0  False   True  False   True
1   True  False  False   True
2  False  False   True   True
3  False  False  False  False
4   True  False  False  False
5  False  False  False  False
6  False  False  False   True
7  False  False  False  False
8  False  False  False  False
9  False   True  False  False
>>> mask.any(axis=1)
0     True
1     True
2     True
3    False
4     True
5    False
6     True
7    False
8    False
9     True
dtype: bool
>>> df1 = df1[~mask.any(axis=1)]
>>> df1
          a         b         c         d
3 -0.635962 -0.291226  0.428961  1.158153
5  0.112911  0.351649  0.788940  2.071541
7  1.003608  0.246925  0.225138 -0.586727
8  0.355274 -0.540685  1.482472  0.364989
>>>

In Spark I tried df.filter, but that only picks out the matching rows. In my case I need to filter out (remove) data at 3-4 levels, while the example above shows only one level of this kind of filtering.


kgqe7b3p1#

In a Spark application, preserving order is very hard because of the assumptions behind the RDD abstraction. The best approach is to translate the pandas logic into the Spark API, as I have done here. Unfortunately, I don't think you can apply the same filter condition to every column in one go, so I had to expand the mask by hand into an operation over multiple columns. This Databricks blog post is helpful for anyone transitioning from pandas to Spark.

import pandas as pd
import numpy as np
np.random.seed(1000)
df1 = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd'])
mask = df1.applymap(lambda x: x < -0.7)
df2 = df1[~mask.any(axis=1)]   # keep rows where no column falls below -0.7

The result we are after is:

          a         b         c         d
1 -0.300797  0.389475 -0.107437 -0.479983
5 -0.334835 -0.099482  0.407192  0.919388
6  0.312118  1.533161 -0.550174 -0.383147
8 -0.326925 -0.045797 -0.304460  1.923010

So in Spark we create a DataFrame from the pandas DataFrame and use filter to get the correct result set:

df1_spark = sqlContext.createDataFrame(df1).repartition(10)
df2_spark = df1_spark.filter(\
   (df1_spark.a > -0.7)\
 & (df1_spark.b > -0.7)\
 & (df1_spark.c > -0.7)\
 & (df1_spark.d > -0.7)\
 )

This gives us the correct result (note that the order is not preserved):

df2_spark.show()
+-------------------+--------------------+--------------------+-------------------+
|                  a|                   b|                   c|                  d|
+-------------------+--------------------+--------------------+-------------------+
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314|  0.919387539204449|
| 0.3121180100663634|  1.5331610653579348| -0.5501738650283003|-0.3831474108842978|
|-0.3007966727870205|  0.3894745542873072|-0.10743730169089667|-0.4799830753607686|
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845|  1.923010130400007|
+-------------------+--------------------+--------------------+-------------------+
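
A possible refinement (a sketch, not part of the original answer): instead of spelling out each column by hand, the same conjunction can be built programmatically with functools.reduce over df1_spark.columns, which scales better when there are many columns:

from functools import reduce
from pyspark.sql.functions import col

# AND together "column > -0.7" for every column; equivalent to the hand-written filter above
keep_condition = reduce(lambda acc, c: acc & c,
                        [col(c) > -0.7 for c in df1_spark.columns])
df2_spark = df1_spark.filter(keep_condition)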

If you really do need to build the mask with pandas, you have to keep the index of the original pandas DataFrame and remove individual records from Spark by creating a broadcast variable and filtering on an index column. An example follows; YMMV.
Add the index:

df1['index_col'] = df1.index

df1
          a         b         c         d  index_col
0 -0.804458  0.320932 -0.025483  0.644324          0
1 -0.300797  0.389475 -0.107437 -0.479983          1
2  0.595036 -0.464668  0.667281 -0.806116          2
3 -1.196070 -0.405960 -0.182377  0.103193          3
4 -0.138422  0.705692  1.271795 -0.986747          4
5 -0.334835 -0.099482  0.407192  0.919388          5
6  0.312118  1.533161 -0.550174 -0.383147          6
7 -0.822941  1.600083 -0.069281  0.083209          7
8 -0.326925 -0.045797 -0.304460  1.923010          8
9 -0.078659 -0.582066 -1.617982  0.867261          9

Turn the mask into a Spark broadcast variable:

myIdx = sc.broadcast(df2.index.tolist())

Create and modify the DataFrame using the Spark API:

# rows whose index_col is NOT in the kept index list (i.e. the records to drop)
df1_spark.rdd.filter(lambda row: row and row['index_col'] not in myIdx.value).collect()
# rows whose index_col IS in the kept index list become the filtered DataFrame
df2_spark = df1_spark.rdd.filter(lambda row: row and row['index_col'] in myIdx.value).toDF()

df2_spark.show()
+-------------------+--------------------+--------------------+-------------------+---------+
|                  a|                   b|                   c|                  d|index_col|
+-------------------+--------------------+--------------------+-------------------+---------+
|-0.3007966727870205|  0.3894745542873072|-0.10743730169089667|-0.4799830753607686|        1|
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314|  0.919387539204449|        5|
| 0.3121180100663634|  1.5331610653579348| -0.5501738650283003|-0.3831474108842978|        6|
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845|  1.923010130400007|        8|
+-------------------+--------------------+--------------------+-------------------+---------+
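
A DataFrame-level alternative (a sketch, assuming the index_col setup above): stay out of the RDD API and filter with isin() against the list of indices kept by the pandas mask:

from pyspark.sql.functions import col

# keep only the rows whose index_col appears in the pandas-derived index list
df2_spark = df1_spark.filter(col('index_col').isin(myIdx.value))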

z4iuyo4d2#

For filtering at three levels, I think the whole solution lies with the df.except command.
Assume the data below in office_people.csv:

+---+------------------+--------------------+--------------------+------+---------------+
| id|         full_name|           job_title|               email|gender|     ip_address|
+---+------------------+--------------------+--------------------+------+---------------+
|  1|   Walter Lawrence|    Graphic Designer|wlawrence0@answer...|  Male| 179.89.185.194|
|  2|     Mark Reynolds| Structural Engineer|  mreynolds1@ihg.com|  Male| 23.192.227.122|
|  3|     Gregory Jones|     Design Engineer|   gjones2@prweb.com|  Male|  75.232.148.42|
|  4|      Ashley Clark|     Staff Scientist|  aclark3@unicef.org|Female| 18.103.212.244|
|  5|    Dorothy Harvey|     Design Engineer|   dharvey4@ucoz.com|Female|  180.119.92.27|
|  6|       Laura Allen|      Tax Accountant|lallen5@shutterfl...|Female|  194.60.142.75|
|  7|    Richard Knight|     Staff Scientist|rknight6@sourcefo...|  Male|   5.25.210.201|
|  8| Gregory Carpenter|Payment Adjustmen...|gcarpenter7@googl...|  Male|  92.16.231.195|
|  9|     Sean Thompson|             Teacher|sthompson8@twitpi...|  Male|    4.216.52.79|
| 10|  Frances Stephens|Human Resources M...|fstephens9@artist...|Female| 181.11.246.116|
| 11|      Louis Little|  Nurse Practicioner|  llittlea@lycos.com|  Male|209.135.198.222|
| 12|     Frances Perry|    Quality Engineer|    fperryb@youtu.be|Female| 173.162.64.208|
| 13|    Russell Hanson|    Web Developer II|rhansonc@wikimedi...|  Male|   57.81.25.130|
| 14|  Michelle Wallace|    Technical Writer|  mwallaced@time.com|Female|    56.17.86.56|
| 15|   Keith Patterson|VP Product Manage...|kpattersone@globo...|  Male| 252.146.42.238|
| 16|   Kathleen Howard|      Programmer III|khowardf@accuweat...|Female| 235.163.98.206|
| 17|Russell Cunningham|        VP Marketing|rcunninghamg@berk...|  Male| 72.197.113.247|
| 18|       Henry Dixon|       Developer III|    hdixonh@fema.gov|  Male| 63.144.255.192|
| 19|    Martha Jackson| Electrical Engineer|mjacksoni@cyberch...|Female|167.209.159.139|
| 20|        Sean Kelly|Systems Administr...|   skellyj@opera.com|  Male|  6.183.241.141|
+---+------------------+--------------------+--------------------+------+---------------+
only showing top 20 rows

Dataset<Row> people_non_Engineer = people.except(people_Engineer); simply splits off the "else" part of the filter (I did it in Java, though):

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class starthere {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Example App");

       JavaSparkContext sc = new JavaSparkContext(conf);

       // SQLContext sqlContext = new SQLContext(sc);

       SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL Example")
                .config("spark.some.config.option", "some-value")
                .config("spark.sql.warehouse.dir", "file:///C:/tmp/") 
                .getOrCreate();

       StructType customSchema = new StructType(new StructField[] {
                new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
                new StructField("full_name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("job_title", DataTypes.StringType, true, Metadata.empty()),
                new StructField("email", DataTypes.StringType, true, Metadata.empty()),
                new StructField("gender", DataTypes.StringType, true, Metadata.empty()),
                new StructField("ip_address", DataTypes.StringType, true, Metadata.empty())
            });

        Dataset<Row> people  =  spark.read()
                .format("com.databricks.spark.csv")
                .schema(customSchema)    
                .option("header", "true")
                .load("office_people.csv");

        people.show();
        Dataset<Row> people_Engineer = people.filter(col("job_title").contains("Engineer"));
        people_Engineer.show();
        Dataset<Row> people_non_Engineer = people.except(people_Engineer);
        people_non_Engineer.show();
        Dataset<Row> people_Sys_Admin = people.filter(col("job_title").contains("Systems Admin"));
        Dataset<Row> people_non_tech = people_non_Engineer.except(people_Sys_Admin);
        people_non_tech.show();
        Dataset<Row> people_Tech_Writer = people.filter(col("job_title").contains("Technical Writer"));
        Dataset<Row> people_non_tech_people = people_non_tech.except(people_Tech_Writer);
        people_non_tech_people.show();
        }
}
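
For reference, a rough PySpark equivalent of the Java chain above (a sketch; subtract() stands in for Dataset.except, and the session setup and CSV options are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Python Spark SQL Example").getOrCreate()
people = spark.read.option("header", "true").csv("office_people.csv")

# peel off each "level" with filter, then keep the remainder with subtract (except in the Java API)
people_engineer = people.filter(col("job_title").contains("Engineer"))
people_non_engineer = people.subtract(people_engineer)
people_non_tech = people_non_engineer.subtract(
    people.filter(col("job_title").contains("Systems Admin")))
people_non_tech_people = people_non_tech.subtract(
    people.filter(col("job_title").contains("Technical Writer")))
people_non_tech_people.show()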
