spark结构化流动态查找

6qftjkof  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(536)

我是新来的。我们目前正在建设一条管道:
阅读Kafka主题中的事件
借助redis lookup丰富这些数据
将事件写入新的Kafka主题
所以,我的问题是,当我想使用spark redis库时,它的性能非常好,但在我的流媒体工作中数据保持静态。
尽管数据在redis中被刷新,但它并没有反映到我的Dataframe中。spark首先读取数据,然后从不更新数据。另外,我首先从redis数据中读取有关1mio key val字符串的总数据。
我能做什么样的方法,我想用redis作为内存中的动态查找。查找表几乎在1小时内改变。
谢谢。
使用的库:spark-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar
以下是代码部分:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 

val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))

import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*") 
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )

val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")

import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()
jq6vz3qz

jq6vz3qz1#

一种选择是不使用spark redis,而是直接在redis中查找。这可以通过 df.mapPartitions 功能。你可以在这里找到一些Spark流的例子https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/. 结构化流的想法是相似的。注意正确处理redis连接。

rdlzhqv9

rdlzhqv92#

另一种解决方案是执行流静态连接(spark docs):
不要将redis rdd收集到驱动程序,而是使用redisDataframe(spark redis docs)作为一个静态Dataframe与您的流连接,因此它将类似于:

val redisStaticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(redisStaticDf, ...)

由于spark micro batch execution engine评估每个触发器上的查询执行,redisDataframe将获取每个触发器上的数据,为您提供最新数据(如果您将缓存Dataframe,则不会)

相关问题