How to filter rows whose key does not exist in a large DataFrame

ztmd8pv5 asked on 2021-05-27 in Spark

Suppose I have a streaming DataFrame A and a large static DataFrame B. A typically holds fewer than 10,000 records, while B is far larger, on the order of millions of rows.
Both A and B have a "key" column. I want to keep only the rows of A whose key does not exist in B. What is the best way to do this?
So far I have tried A.join(B, Seq("key"), "left_anti"), but the performance is not good enough. Is there any way to speed this up?
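
For reference, here is a minimal, self-contained sketch of this stream-static left anti join. The Kafka source, paths, and schema are assumptions for illustration only, not details from the actual job:

import org.apache.spark.sql.SparkSession

object StreamStaticAntiJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-static-left-anti-join")
      .getOrCreate()

    // Streaming side A: small (< 10k records per micro-batch).
    // The Kafka source and key extraction are placeholders.
    val a = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")
      .option("subscribe", "keys")
      .load()
      .selectExpr("CAST(value AS STRING) AS key")

    // Static side B: millions of rows, also with a "key" column.
    val b = spark.read
      .option("header", "true")
      .csv("hdfs:///path/to/lookup")
      .select("key")

    // Keep only streaming rows whose key is NOT present in the static table.
    val missing = a.join(b, Seq("key"), "left_anti")

    missing.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/anti-join")
      .start()
      .awaitTermination()
  }
}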
Physical plan:

== Physical Plan ==
SortMergeJoin [domainName#461], [domain#147], LeftAnti
:- *(5) Sort [domainName#461 ASC NULLS FIRST], false, 0
:  +- StreamingDeduplicate [domainName#461], state info [ checkpoint = hdfs://MTPrime-CO4-fed/MTPrime-CO4-0/projects/BingAdsAdQuality/Test/WhoIs/WhoIsStream/checkPoint/state, runId = 9d09398b-efda-41cb-ab77-1b5550cd5da9, opId = 0, ver = 63, numPartitions = 400], 0
:     +- Exchange hashpartitioning(domainName#461, 400)
:        +- Union
:           :- *(2) Project [value#460 AS domainName#461]
:           :  +- *(2) Filter isnotnull(value#460)
:           :     +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#460]
:           :        +- MapPartitions <function1>, obj#459: java.lang.String
:           :           +- MapPartitions <function1>, obj#436: MTInterfaces.Fraud.RiskEntity
:           :              +- DeserializeToObject newInstance(class scala.Tuple3), obj#435: scala.Tuple3
:           :                 +- Exchange RoundRobinPartitioning(600)
:           :                    +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#142, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#143, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._3, true, false) AS _3#144]
:           :                       +- *(1) MapElements <function1>, obj#141: scala.Tuple3
:           :                          +- *(1) MapElements <function1>, obj#132: scala.Tuple3
:           :                             +- *(1) DeserializeToObject createexternalrow(Body#60.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, EventTime#37, true, false), Timestamp#48L, Offset#27L, Partition#72.toString, PartitionKey#84.toString, Publisher#96.toString, SequenceNumber#108L, StructField(Body,StringType,true), StructField(EventTime,TimestampType,true), StructField(Timestamp,LongType,true), StructField(Offset,LongType,true), StructField(Partition,StringType,true), StructField(PartitionKey,StringType,true), StructField(Publisher,StringType,true), StructField(SequenceNumber,LongType,true)), obj#131: org.apache.spark.sql.Row
:           :                                +- *(1) Project [cast(body#608 as string) AS Body#60, enqueuedTime#612 AS EventTime#37, cast(enqueuedTime#612 as bigint) AS Timestamp#48L, cast(offset#610 as bigint) AS Offset#27L, partition#609 AS Partition#72, partitionKey#614 AS PartitionKey#84, publisher#613 AS Publisher#96, sequenceNumber#611L AS SequenceNumber#108L]
:           :                                   +- Scan ExistingRDD[body#608,partition#609,offset#610,sequenceNumber#611L,enqueuedTime#612,publisher#613,partitionKey#614,properties#615,systemProperties#616]
:           +- *(4) Project [value#453 AS domainName#455]
:              +- *(4) Filter isnotnull(value#453)
:                 +- *(4) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#453]
:                    +- *(4) MapElements <function1>, obj#452: java.lang.String
:                       +- MapPartitions <function1>, obj#436: MTInterfaces.Fraud.RiskEntity
:                          +- DeserializeToObject newInstance(class scala.Tuple3), obj#435: scala.Tuple3
:                             +- ReusedExchange [_1#142, _2#143, _3#144], Exchange RoundRobinPartitioning(600)
+- *(8) Project [domain#147]
   +- *(8) Filter (isnotnull(rank#284) && (rank#284 = 1))
      +- Window [row_number() windowspecdefinition(domain#147, timestamp#151 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#284], [domain#147], [timestamp#151 DESC NULLS LAST]
         +- *(7) Sort [domain#147 ASC NULLS FIRST, timestamp#151 DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(domain#147, 400)
               +- *(6) Project [domain#147, timestamp#151]
                  +- *(6) Filter isnotnull(domain#147)
                     +- *(6) FileScan csv [domain#147,timestamp#151] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://MTPrime-CO4-fed/MTPrime-CO4-0/projects/BingAdsAdQuality/Test/WhoIs], PartitionFilters: [], PushedFilters: [IsNotNull(domain)], ReadSchema: struct<domain:string,timestamp:string>

Query graph snapshot:

EDIT
I have since moved the lookup data into a Cosmos DB store and created a temp view over it (say, lookupdata). I now need to filter out the streaming rows whose key is not in that store. I am considering the following options (a sketch of options 1 and 2 follows option 3's snippet below):
1. Create a temp view on the streaming data and run a left anti join: spark.sql("SELECT * FROM streamingdata s LEFT ANTI JOIN lookupdata l ON s.key = l.key")
2. Same as 1, but with a NOT IN subquery instead of the left anti join: spark.sql("SELECT s.* FROM streamingdata s WHERE s.key NOT IN (SELECT key FROM lookupdata l)")
3. Keep the streaming DataFrame as it is and filter row by row:

df.filter(x => {
  // Per-row lookup: query the lookup view for each streaming row
  val key = x.getAs[String]("key")
  spark.sql("SELECT * FROM lookupdata l WHERE l.key = '" + key + "'").isEmpty
})
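
For comparison, here is a sketch of options 1 and 2 written out end to end. streamingDf and lookupDf are assumed handles to the streaming DataFrame and the Cosmos DB-backed lookup view, and whether each form is supported on a streaming DataFrame depends on the Spark version:

// Option 1: register both sides as temp views and run a LEFT ANTI JOIN.
streamingDf.createOrReplaceTempView("streamingdata")
lookupDf.createOrReplaceTempView("lookupdata")

val option1 = spark.sql(
  """SELECT s.*
    |FROM streamingdata s
    |LEFT ANTI JOIN lookupdata l ON s.key = l.key""".stripMargin)

// Option 2: the same filter expressed as a NOT IN subquery.
val option2 = spark.sql(
  """SELECT s.*
    |FROM streamingdata s
    |WHERE s.key NOT IN (SELECT key FROM lookupdata l)""".stripMargin)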

Which of these is better?

cvxl0en2 1#

This is not a recommended approach for (Structured) Streaming. Say you are a company with 100 million customers: how do you expect the 100M rows of B to be processed on every micro-batch?
From my last assignment: if the reference dataset is clearly large, put it in HBase or some other key-value store such as Cassandra, and do the lookups with mapPartitions, whether the data is volatile or non-volatile. That is harder to build; the designers told me it was no easy task for the data engineers, and indeed it is not that easy. But it is the way to go.
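
A sketch of the mapPartitions pattern described above, assuming a key-value store that supports an exists-by-key lookup. The KvClient trait and its in-memory stub are hypothetical placeholders for a real HBase or Cassandra client:

import org.apache.spark.sql.Dataset

object KeyValueFilter {

  // Hypothetical key-value client; a real job would wrap an HBase or
  // Cassandra connection opened once per partition.
  trait KvClient extends AutoCloseable {
    def exists(key: String): Boolean
  }

  object KvClient {
    // In-memory stub so the sketch compiles; replace with real client construction.
    def connect(): KvClient = new KvClient {
      private val known = Set("example-key") // stand-in for the remote store
      override def exists(key: String): Boolean = known.contains(key)
      override def close(): Unit = ()
    }
  }

  case class Event(key: String, payload: String)

  // Keep only events whose key is NOT present in the key-value store.
  def filterUnknownKeys(events: Dataset[Event]): Dataset[Event] = {
    import events.sparkSession.implicits._
    events.mapPartitions { rows =>
      val client = KvClient.connect()  // one connection per partition
      val kept = rows.filterNot(e => client.exists(e.key)).toList
      client.close()                   // close before returning results
      kept.iterator
    }
  }
}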

oxf4rvwz 2#

Please try:

import org.apache.spark.sql.functions.broadcast

// Broadcast hint on B so the left anti join can avoid a full shuffle;
// this only helps if B is small enough to fit in executor memory.
A.join(broadcast(B), Seq("key"), "left_anti")
