I want to apply OneHotEncoder to several columns of a streaming DataFrame, but I get the error below. Any suggestions?
Many thanks!
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources
must be executed with writeStream.start();;
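If I understand the error correctly, it is raised whenever a batch-style action is executed on a DataFrame that has a streaming source; such a query may only be started through writeStream. A minimal illustration (streamingDF is a hypothetical DataFrame created with spark.readStream):

// streamingDF.show()       // throws this AnalysisException
// model.fit(streamingDF)   // same reason: fit() must make a full pass over the data
streamingDF.writeStream     // the only supported way to execute the query
  .format("console")
  .start()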
Code:
// Imports used below
import java.util.Properties
import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Bucketizer, OneHotEncoder, StringIndexer, VectorAssembler}
import spark.implicits._

// Read the CSV source as a stream (readStream, not read, so that writeStream works below)
val Stream = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .schema(DFschema) // an explicit schema is required for streaming file sources
  .load("C:/[...]")
// Kafka producer properties (note: these java.util.Properties are never used by the
// Structured Streaming Kafka sink below, which is configured via .option(...) calls)
val properties = new Properties()
//val topic = "mongotest"
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// The Kafka sink requires a column named "value" (and optionally "key")
Stream.selectExpr("CAST(Col AS STRING) AS key",
    "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "predict")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "C:[...]")
  .start()
Subscribe to the topic:
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "predict")
  .load()
// Parse the Kafka messages; this must read from `lines` (the Kafka source above),
// not from `Stream`, and the Kafka `value` column is binary, hence the CAST
val df = lines
  .selectExpr("CAST(value AS STRING)")
val jsons = df.select(from_json($"value", DFschema) as "data").select("data.*")
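To verify that the messages parse back into DFschema, the parsed stream can be inspected with a console sink (a debugging sketch only):

jsons.writeStream
  .format("console")
  .option("truncate", "false")
  .start()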
// ETL […]
Applying Bucketizer() to the fields:
val Msplits = Array(Double.NegativeInfinity, 7, 14, 21, Double.PositiveInfinity)
val bucketizerM = new Bucketizer()
  .setInputCol("MEASURE")
  .setOutputCol("MEASURE_c")
  .setSplits(Msplits)
// bucketizerD and out come from the ETL part elided above
val bucketedData1 = bucketizerD.transform(out)
val bucketedData2 = bucketizerM.transform(bucketedData1) // works
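This step works because Bucketizer is a Transformer: transform() only adds a derived column computed row by row, which Structured Streaming supports. StringIndexer, by contrast, is an Estimator whose fit() must scan the whole dataset to collect the distinct labels, which is what fails below. A sketch of the contrast (hypothetical, assuming jsons carries a MEASURE column):

// Transformer: row-local, safe on a streaming DataFrame
val bucketedStream = bucketizerM.transform(jsons)
// Estimator: requires a full pass over the data, impossible on an unbounded stream
// new StringIndexer().setInputCol("CODE").setOutputCol("CODE_index").fit(jsons) // throws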
Error when using OneHotEncoder():
val indexer = new StringIndexer()
  .setInputCol("CODE")
  .setOutputCol("CODE_index")
val encoder = new OneHotEncoder()
  .setInputCol("CODE_index") // encode the indexed column, not the raw string column
  .setOutputCol("CODE_encoded")
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("A", "B", "CODE_encoded"))
  .setOutputCol("features")
val transformationPipeline = new Pipeline()
  .setStages(Array(indexer, encoder, vectorAssembler))
val fittedPipeline = transformationPipeline.fit(bucketedData2) // doesn't work
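The fit() call is what raises the AnalysisException quoted at the top: StringIndexer has to run a batch action over the complete dataset to build its label index, and Spark forbids such actions on a DataFrame with a streaming source. One workaround is to fit the pipeline on a static batch DataFrame and then apply the fitted model to the stream, since transform() on a fitted PipelineModel is row-wise and streaming-safe. A sketch, assuming a static file of historical data with the same columns (staticPath is a hypothetical location):

// 1. Fit on static (batch) data read with the same schema
val static = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ";")
  .schema(DFschema)
  .load(staticPath) // hypothetical path to historical training data

val pipelineModel = transformationPipeline.fit(static)

// 2. Apply the fitted model to the streaming DataFrame and start the query
pipelineModel.transform(bucketedData2)
  .writeStream
  .format("console")
  .start()

If the stream can contain CODE values never seen in the static data, setHandleInvalid("keep") on the StringIndexer avoids runtime failures on unseen labels.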