PySpark Streaming

vof42yt1 · posted 2023-11-16 in Spark

I wrote this streaming iris-classification code in PySpark, but I get the error "'RDD' object has no attribute '_jdf'". I tried converting the RDD to a DataFrame, but then it tells me "RDD is not iterable". Please help me fix it! Thanks a lot.

Here is my code:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel, Pipeline
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
sc = SparkContext.getOrCreate(conf=conf)
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 8889)

# Load ML model
sameModel = PipelineModel.load("g:/Demo/DecisionTree_Model1")

# Predict the type of iris from features
result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))

ssc.start()
ssc.awaitTermination()


And the error: 'RDD' object has no attribute '_jdf'

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-18f3db416f1c> in <module>()
      1 ssc.start()
----> 2 ssc.awaitTermination()

E:\Spark\spark\python\pyspark\streaming\context.py in awaitTermination(self, timeout)
    204         """
    205         if timeout is None:
--> 206             self._jssc.awaitTermination()
    207         else:
    208             self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

E:\Spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

E:\Spark\spark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o35.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "E:\Spark\spark\python\pyspark\streaming\util.py", line 65, in call
    r = self.func(t, *rdds)
  File "E:\Spark\spark\python\pyspark\streaming\dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "<ipython-input-5-64e27204db5a>", line 1, in <lambda>
    result = lines.foreachRDD(lambda rdd: sameModel.transform(rdd))
  File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
    return self._transform(dataset)
  File "E:\Spark\spark\python\pyspark\ml\pipeline.py", line 262, in _transform
    dataset = t.transform(dataset)
  File "E:\Spark\spark\python\pyspark\ml\base.py", line 173, in transform
    return self._transform(dataset)
  File "E:\Spark\spark\python\pyspark\ml\wrapper.py", line 305, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
AttributeError: 'RDD' object has no attribute '_jdf'

	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


k2fxgqgv · 1#

The code below shows how to load a pre-trained model, start a Spark stream from a socket source, run transform on it, and sink the result to the console. (Note that this uses Structured Streaming, where transform can be applied to the streaming DataFrame directly.)

from random import Random

import pyspark.sql.functions as f
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("transform ml") \
    .getOrCreate()

# Load the pre-trained pipeline model
model = PipelineModel.load("./model")

# Structured Streaming socket source
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Attach an id column (a random literal, fixed when the plan is built)
# plus the raw socket text
random = Random()
words = lines.select(
    f.lit(random.randint(1, 10000)).alias("id"),
    lines.value.alias("text")
)

prediction = model.transform(words)

query = prediction \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
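To actually drive the stream, something has to be writing to that socket. `nc -lk 9999` works, or, as a minimal throwaway sketch, a Python feeder like the one below (the sample lines are hypothetical; for the iris case they would be comma-separated feature values):

from random import Random

import socket
import time

# Minimal test server: listens on the port the streaming query reads from
# and writes one sample line per second once a client connects.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)
conn, _ = server.accept()  # blocks until the Spark socket source connects

samples = ["5.1,3.5,1.4,0.2", "6.7,3.0,5.2,2.3"]  # hypothetical feature lines
try:
    while True:
        for line in samples:
            conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(1)
finally:
    conn.close()
    server.close()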


ubby3x7f · 2#

The error you are seeing occurs because the transform method of the PipelineModel class expects a DataFrame, not an RDD.
However, the foreachRDD method in Spark Streaming hands you RDDs, not DataFrames. To fix this, convert each RDD to a DataFrame before applying transform. Here is how:

from pyspark.sql import SparkSession

# Create a SparkSession from the existing SparkContext
spark = SparkSession(sc)

def process(rdd):
    if not rdd.isEmpty():
        df = rdd.toDF()                   # Convert RDD to DataFrame
        result = sameModel.transform(df)  # Apply the model
        result.show()                     # Display the result

lines.foreachRDD(process)

In the process function, we first check whether the RDD is empty. If it is not, we convert the RDD to a DataFrame, apply the model, and display the result.
Note that toDF assumes the RDD contains Row objects or tuples. If your data is not in that format, you may need to transform it before calling toDF.
Also, remember to define a schema for the DataFrame if it cannot be inferred correctly. You can do this by passing a list of StructField objects to the StructType constructor and then supplying that schema when you create the DataFrame, as in the sketch below.
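For example, if each socket line carried the four comma-separated iris measurements, the parsing-plus-schema step might look like this (the column names are assumptions for illustration; they must match the feature columns the trained pipeline expects):

from pyspark.sql.types import StructType, StructField, DoubleType

# Illustrative schema; the column names are placeholders and must match
# whatever feature columns the pipeline was trained on.
schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
])

def process(rdd):
    if not rdd.isEmpty():
        # Parse each "5.1,3.5,1.4,0.2"-style line into a tuple of floats
        parsed = rdd.map(lambda line: tuple(float(x) for x in line.split(",")))
        df = spark.createDataFrame(parsed, schema)  # explicit schema, no inference
        result = sameModel.transform(df)
        result.show()

lines.foreachRDD(process)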
