如何将spark结构化流连接到redis?

js81xvg6  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(428)

我的目标是从redis接收流数据并进行处理。如何通过spark结构化流连接和处理数据?

wxclj1h5

wxclj1h51#

要从spark中的redis流中读取数据,我们需要确定如何连接到redis,以及redis流中数据的模式结构。
要连接到redis,我们必须使用redis的连接参数创建一个新的sparksession:

import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis

object Samj45 {
    def main(args: Array[String]): Unit = {
         val spark = SparkSession
                     .builder()
                     .appName("redis-example")
                     .master("local[*]")
                     .config("spark.redis.host", "localhost")
                     .config("spark.redis.port", "6379")
                     .getOrCreate()

         val data_from_redis = spark
                     .readStream
                     .format("redis")
                     .option("stream.keys","data_clicks")
                     .schema(StructType(Array(
                           StructField("asset", StringType),
                           StructField("cost", LongType)
                      )))
                      .load()

对于写作,你可以用一个foreachwriter。如果这有帮助,请告诉我。

相关问题