PySpark Structured Streaming - query not visible in the SnappyData dashboard

hec6srdp · posted 2021-05-16 in Spark

I wrote some sample code that connects to a Kafka broker, reads data from a topic, and writes it into a SnappyData table.

import logging

from pyspark.sql import SparkSession
from pyspark.sql.snappy import SnappySession

def main(snappy):
    logger = logging.getLogger('py4j')
    logger.info("My test info statement")

    sns = snappy.newSession()

    # Read the Kafka topic as a streaming DataFrame
    df = sns \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "10.0.0.4:9092") \
        .option("subscribe", "test_import3") \
        .option("failOnDataLoss", "false") \
        .option("startingOffsets", "latest") \
        .load()

    # Kafka delivers key and value as binary; cast both to strings
    bdf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # Write the stream into the SnappyData table via the snappysink format
    streamingQuery = bdf \
        .writeStream \
        .format("snappysink") \
        .queryName("Devices3") \
        .trigger(processingTime="30 seconds") \
        .option("tablename", "devices2") \
        .option("checkpointLocation", "/tmp") \
        .start()

    streamingQuery.awaitTermination()

if __name__ == "__main__":
    # snappydata.connection points the session at the SnappyData locator
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("test") \
        .config("snappydata.connection", "10.0.0.4:1527") \
        .getOrCreate()
    snc = SnappySession(spark)
    main(snc)

I submitted it with the command:

/opt/snappydata/bin/spark-submit --master spark://10.0.0.4:1527 /path_to/file.py --conf snappydata.connection=10.0.0.4:1527
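
Note that spark-submit treats anything placed after the application file as arguments to the script itself, so the trailing --conf above is handed to file.py rather than to Spark. Keeping the same host and ports, the option only takes effect when it precedes the script:

/opt/snappydata/bin/spark-submit \
  --master spark://10.0.0.4:1527 \
  --conf snappydata.connection=10.0.0.4:1527 \
  /path_to/file.py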

Everything works: data is read from the Kafka topic and written into the SnappyData table. What I don't understand is why this streaming query is not visible in the SnappyData dashboard UI. After submitting the PySpark code from the console, I see that a new Spark master UI is started.
How can I connect from PySpark to SnappyData's internal Spark master?

f45qwnt8 · answer #1

SnappyData supports Python jobs only in smart connector mode, which means they are always launched through a separate Spark cluster that communicates with the SnappyData cluster. That is why your Python job appears on that Spark cluster's UI and not on SnappyData's dashboard.
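
For reference, a smart-connector submission is typically pointed at a standalone Spark master (default port 7077, not the SnappyData locator's 1527 client port), with the locator address passed through the spark.snappydata.connection property. A minimal sketch, in which the Spark master host, Spark install path, and package version are assumptions to adjust for your cluster:

/path_to_spark/bin/spark-submit \
  --master spark://spark-master-host:7077 \
  --conf spark.snappydata.connection=10.0.0.4:1527 \
  --packages SnappyDataInc:snappydata:1.3.0-s_2.11 \
  /path_to/file.py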
