我的目标是从redis接收流数据并进行处理。如何通过spark结构化流连接和处理数据?
wxclj1h51#
要从spark中的redis流中读取数据,我们需要确定如何连接到redis,以及redis流中数据的模式结构。要连接到redis,我们必须使用redis的连接参数创建一个新的sparksession:
import com.redislabs.provider.redis._ import redis.clients.jedis.Jedis object Samj45 { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("redis-example") .master("local[*]") .config("spark.redis.host", "localhost") .config("spark.redis.port", "6379") .getOrCreate() val data_from_redis = spark .readStream .format("redis") .option("stream.keys","data_clicks") .schema(StructType(Array( StructField("asset", StringType), StructField("cost", LongType) ))) .load()
对于写作,你可以用一个foreachwriter。如果这有帮助,请告诉我。
1条答案
按热度按时间wxclj1h51#
要从spark中的redis流中读取数据,我们需要确定如何连接到redis,以及redis流中数据的模式结构。
要连接到redis,我们必须使用redis的连接参数创建一个新的sparksession:
对于写作,你可以用一个foreachwriter。如果这有帮助,请告诉我。