I wrote this code for streaming iris classification on PySpark, but I get the error "'RDD' object has no attribute '_jdf'". I tried converting the RDD to a DataFrame, but then it tells me that the "RDD is not iterable". Please help me solve it! Thank you very much.
Here is my code:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel, Pipeline
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *
conf = SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
sc = SparkContext.getOrCreate(conf = conf)
ssc = StreamingContext(sc,1)
lines = ssc.socketTextStream("localhost", 8889)
#Load ML model
sameModel = PipelineModel.load("g:/Demo/DecisionTree_Model1")
#Predict the type of iris from features
result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))
ssc.start()
ssc.awaitTermination()
And the error: 'RDD' object has no attribute '_jdf'
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-18f3db416f1c> in <module>()
1 ssc.start()
----> 2 ssc.awaitTermination()
E:\Spark\spark\python\pyspark\streaming\context.py in awaitTermination(self,
timeout)
204 """
205 if timeout is None:
--> 206 self._jssc.awaitTermination()
207 else:
208 self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in
__call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
E:\Spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in
get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o35.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "E:\Spark\spark\python\pyspark\streaming\util.py", line 65, in call
r = self.func(t, *rdds)
File "E:\Spark\spark\python\pyspark\streaming\dstream.py", line 159, in
<lambda>
func = lambda t, rdd: old_func(rdd)
File "<ipython-input-5-64e27204db5a>", line 1, in <lambda>
result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\pipeline.py", line 262, in _transform
dataset = t.transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
return self._transform(dataset)
File "E:\Spark\spark\python\pyspark\ml\wrapper.py", line 305, in _transform
return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
AttributeError: 'RDD' object has no attribute '_jdf'
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2 Answers
k2fxgqgv1#
The code below shows how to load a pre-trained model, start a Spark stream from a socket source, apply the model's transform to it, and sink the results to the console.
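The answer's original code block was lost from the page; what follows is a minimal sketch of the approach it describes, using Structured Streaming so that the socket source is read directly as a DataFrame. The feature column names and the comma-separated input format are assumptions and must match whatever the saved pipeline was trained on.
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
from pyspark.ml import PipelineModel

spark = SparkSession.builder.master("local[2]").appName("IrisStream").getOrCreate()

# Load the pre-trained pipeline model (path taken from the question)
sameModel = PipelineModel.load("g:/Demo/DecisionTree_Model1")

# Read the socket source as a streaming DataFrame with a single string column 'value'
lines = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 8889) \
    .load()

# Split each comma-separated line into typed feature columns
# (the column names here are assumptions)
parts = split(col("value"), ",")
features = lines.select(
    parts.getItem(0).cast("double").alias("sepal_length"),
    parts.getItem(1).cast("double").alias("sepal_width"),
    parts.getItem(2).cast("double").alias("petal_length"),
    parts.getItem(3).cast("double").alias("petal_width"),
)

# transform accepts this because 'features' is a DataFrame, not an RDD
predictions = sameModel.transform(features)

# Sink the predictions to the console
query = predictions.writeStream.outputMode("append").format("console").start()
query.awaitTermination()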
ubby3x7f2#
The error message you are seeing occurs because the transform method of the PipelineModel class expects a DataFrame, not an RDD. However, the foreachRDD method in Spark Streaming hands you RDDs, not DataFrames. To fix this, you need to convert each RDD to a DataFrame before applying transform. Here is how you could do it:
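A hedged sketch of such a process function, reusing lines and sameModel from the question's code; the feature column names and the comma-separated input format are assumptions:
from pyspark.sql import SparkSession

def process(time, rdd):
    # Skip empty micro-batches
    if rdd.isEmpty():
        return
    # toDF becomes available on RDDs once a SparkSession exists
    spark = SparkSession.builder.getOrCreate()
    # Parse each comma-separated line into a tuple of floats (assumed input format)
    rows = rdd.map(lambda l: tuple(float(x) for x in l.split(",")))
    # Convert the RDD of tuples to a DataFrame; the column names
    # must match what the pipeline was trained on
    df = rows.toDF(["sepal_length", "sepal_width", "petal_length", "petal_width"])
    # transform now receives a DataFrame, so the '_jdf' error goes away
    sameModel.transform(df).show()

lines.foreachRDD(process)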
In the process function, we first check whether the RDD is empty. If it is not, we convert the RDD to a DataFrame, apply the model, and show the result. Note that the toDF method assumes the RDD consists of Row objects or tuples; if your data is not in that format, you may need to transform it before calling toDF. Also, remember to define a schema for the DataFrame if it is not inferred correctly. You can do that by passing a list of StructField objects to the StructType constructor, and then using that schema when calling toDF.
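For illustration, an explicit schema for the four iris feature columns might look like this (the column names are assumptions):
from pyspark.sql.types import StructType, StructField, DoubleType

# A list of StructField objects passed to the StructType constructor
schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
])

# Use the schema when converting, instead of relying on inference
df = rows.toDF(schema)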