spark结构化流不以“格式”(“内存”)显示任何数据

vcirk6k6  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(317)

当我在下面做它是工作良好

company_info_df.select(col("value"))
             .writeStream()
              .outputMode("append")
              .option("truncate", false)
              .format("console")
              .trigger(Trigger.ProcessingTime("4 seconds"))
              .start();

但当我做如下操作时,即“.format”(“memory”),它不会显示任何内容

company_info_df.select(col("value"))
             .writeStream()
              .outputMode("append")
              .option("truncate", false)
              .format("memory")
              .queryName("company_info")
              .option("checkpointLocation", checkpointDir + "\\console")
              .trigger(Trigger.ProcessingTime("4 seconds"))
              .start();

        Dataset<Row> company_inf = sparkSession.sql("select * from company_info");

        company_inf.show();

我做错什么了?正确的方法是什么?

e0bqpujr

e0bqpujr1#

请参阅spark shell中适用于示例数据的以下代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};

val userSchema = new StructType().add("col1", "string").add("col2", "string").add("col3", "string").add("col4", "string").add("col5", "string").add("col6", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("/user/Temp") //reads the stream as source files in a folder.
csvDF.createOrReplaceTempView("abcd");
val dbDf2 = spark.sql("select col2, sum(col6) from abcd group by col2");
dbDf2.writeStream.queryName("abcdquery").outputMode("complete").format("memory").start()

在您的代码中,尝试在写操作期间删除一些选项,看看哪里出错了。

相关问题