scala—数据转换后将kafka流Dataframe保存到DataRicks中的redis

y4ekin9u  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(241)

在对数据进行聚合之后,我使用pyspark将kafka流定向到redis。最后的输出是流式数据流。
我连接到Kafka流的代码(你可能会发现我的代码是外行的工作,请忽略)

app_schema = StructType([
        StructField("applicationId",StringType(),True),
        StructField("applicationTimeStamp",StringType(),True)
    ])

# group_id = "mygroup"

topic = "com.mobile-v1"
bootstrap_servers = "server-1:9093,server-2:9093,server-3:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user@stream.com" password="xxxxx";',\
    "kafka.ssl.ca.location": "/tmp/cert.crt",\
    "kafka.sasl.mechanism": "PLAIN",\
    "kafka.security.protocol" : "SASL_SSL",\
    "kafka.bootstrap.servers": bootstrap_servers,\
    "failOnDataLoss": "false",\
    "subscribe": topic,\
    "startingOffsets": "latest",\
    "enable.auto.commit": "false",\
    "auto.offset.reset": "false",\
    "enable.partition.eof": "true",\
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_mobile_apps_df = spark.readStream.format("kafka").options(**options).options().load()

kafka_mobile_apps_df = kafka_mobile_apps_df\
    .select(from_json(col("value").cast("string"), app_schema).alias("mob_apps"))

作为代理,这给了我流Dataframe。在此之后,我将数据聚合到count\ df,如图所示

count_df = kafka_mobile_apps_df.withColumn("diff_days", ((col("TimeStamp_")) - (col("TimeStamp")))/(60.0*60.0*24))\
                            .withColumn("within_7d_ind", when(col("diff_days") < 7.0, 1).otherwise(0))\
                            .groupBy("_applicationId")
                            .agg(sum(col("within_7d_ind")).alias(feature+"_7day_velocity"))

现在,我正试图将这个count\u df流写入redis。在我的resreach之后,我发现我可以使用“spark-redis 2.11”来连接spark-redis。
我不知道斯卡拉,我发现一个Sparkredis github exmaple与斯卡拉。有人能帮我用什么方法在pyspark中写这个count\u df到redis中
请在此处找到spark redis github
我已经安装了所需的jar“com。redislabs:spark-redis_2.12:2.5.0英寸。
谢谢。
刚刚发现他们还不支持python,请告诉我还有其他的方法写这个吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题