假设我有一个流Dataframea和一个大的静态Dataframeb。假设通常a的大小小于10000条记录。然而,b是一个更大的Dataframe,其大小在数百万范围内。
假设a和b都有一个“key”列。我想过滤a中的行,其中a.key在b中不存在。实现这一目标的最佳方法是什么。
现在,我已经试过了 A.join(B, Seq("key"), "left_anti")
. 然而,这一表现并不尽如人意。有什么办法可以加快进程吗
实际计划:
== Physical Plan ==
SortMergeJoin [domainName#461], [domain#147], LeftAnti
:- *(5) Sort [domainName#461 ASC NULLS FIRST], false, 0
: +- StreamingDeduplicate [domainName#461], state info [ checkpoint = hdfs://MTPrime-CO4-fed/MTPrime-CO4-0/projects/BingAdsAdQuality/Test/WhoIs/WhoIsStream/checkPoint/state, runId = 9d09398b-efda-41cb-ab77-1b5550cd5da9, opId = 0, ver = 63, numPartitions = 400], 0
: +- Exchange hashpartitioning(domainName#461, 400)
: +- Union
: :- *(2) Project [value#460 AS domainName#461]
: : +- *(2) Filter isnotnull(value#460)
: : +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#460]
: : +- MapPartitions <function1>, obj#459: java.lang.String
: : +- MapPartitions <function1>, obj#436: MTInterfaces.Fraud.RiskEntity
: : +- DeserializeToObject newInstance(class scala.Tuple3), obj#435: scala.Tuple3
: : +- Exchange RoundRobinPartitioning(600)
: : +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#142, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#143, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._3, true, false) AS _3#144]
: : +- *(1) MapElements <function1>, obj#141: scala.Tuple3
: : +- *(1) MapElements <function1>, obj#132: scala.Tuple3
: : +- *(1) DeserializeToObject createexternalrow(Body#60.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, EventTime#37, true, false), Timestamp#48L, Offset#27L, Partition#72.toString, PartitionKey#84.toString, Publisher#96.toString, SequenceNumber#108L, StructField(Body,StringType,true), StructField(EventTime,TimestampType,true), StructField(Timestamp,LongType,true), StructField(Offset,LongType,true), StructField(Partition,StringType,true), StructField(PartitionKey,StringType,true), StructField(Publisher,StringType,true), StructField(SequenceNumber,LongType,true)), obj#131: org.apache.spark.sql.Row
: : +- *(1) Project [cast(body#608 as string) AS Body#60, enqueuedTime#612 AS EventTime#37, cast(enqueuedTime#612 as bigint) AS Timestamp#48L, cast(offset#610 as bigint) AS Offset#27L, partition#609 AS Partition#72, partitionKey#614 AS PartitionKey#84, publisher#613 AS Publisher#96, sequenceNumber#611L AS SequenceNumber#108L]
: : +- Scan ExistingRDD[body#608,partition#609,offset#610,sequenceNumber#611L,enqueuedTime#612,publisher#613,partitionKey#614,properties#615,systemProperties#616]
: +- *(4) Project [value#453 AS domainName#455]
: +- *(4) Filter isnotnull(value#453)
: +- *(4) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#453]
: +- *(4) MapElements <function1>, obj#452: java.lang.String
: +- MapPartitions <function1>, obj#436: MTInterfaces.Fraud.RiskEntity
: +- DeserializeToObject newInstance(class scala.Tuple3), obj#435: scala.Tuple3
: +- ReusedExchange [_1#142, _2#143, _3#144], Exchange RoundRobinPartitioning(600)
+- *(8) Project [domain#147]
+- *(8) Filter (isnotnull(rank#284) && (rank#284 = 1))
+- Window [row_number() windowspecdefinition(domain#147, timestamp#151 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#284], [domain#147], [timestamp#151 DESC NULLS LAST]
+- *(7) Sort [domain#147 ASC NULLS FIRST, timestamp#151 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(domain#147, 400)
+- *(6) Project [domain#147, timestamp#151]
+- *(6) Filter isnotnull(domain#147)
+- *(6) FileScan csv [domain#147,timestamp#151] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://MTPrime-CO4-fed/MTPrime-CO4-0/projects/BingAdsAdQuality/Test/WhoIs], PartitionFilters: [], PushedFilters: [IsNotNull(domain)], ReadSchema: struct<domain:string,timestamp:string>
查询图快照:
编辑
现在我已经将查找数据移动到cosmosdb存储,并在其上创建了一个tempview(比如lookupdatea)。现在,我需要过滤那些不在商店里的。我正在探索以下选项:
1在流数据上创建tempview并进行查询 spark.sql(SELECT * FROM streamingdata s LEFT ANTI JOIN lookupdata l ON s.key = l.key")
与1相同,但执行内部子查询而不是左反联接。即 spark.sql("SELECT s.* FROM streamingdata s WHERE s.key NOT IN (SELECT key FROM lookupdata l)")
保持流式数据流的原样并执行过滤操作:
df.filter(x => { val key = x.getAs[String])("key")
spark.sql("SELECT * FROM lookupdata l WHERE l.key = '"+key+"'").isEmpty
})
哪一个更好?
2条答案
按热度按时间cvxl0en21#
对于(结构化)流式传输,不建议使用这种方法。假设你是一家拥有1亿客户的中国公司。你怎么看b排100米的比赛?
从我的上一个任务:如果引用数据的大数据集很明显,那么使用hbase,或者其他一些键值存储,比如cassandra,如果是volitile或non-volatile,那么使用mappartitions。但这更困难。设计师告诉我,数据工程师的任务可不容易。事实上,这并不是那么容易。但要走的路。
oxf4rvwz2#
请试一试