No module named "pyspark.streaming.kafka", even on an older version of Spark

kninwzqo posted on 2021-05-27 in Spark

In another, similar question the hint was to "install the older Spark 2.4.5".
Edit: the solution linked above was "install Spark 2.4.5, which does have KafkaUtils". But the problem is that I cannot download Spark 2.4.5; it is not available, not even in the archive.
As suggested, I installed an older version of Spark, Spark 2.4.6 (the only older release still available), together with Python 3.7, kafka-python and the pyspark libraries.
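For context, the Python-side dependencies were presumably installed roughly like this (the exact commands are an assumption reconstructed from the description above; note that an unpinned "pip install pyspark" pulls the latest 3.x release, which no longer ships pyspark.streaming.kafka):

# assumed setup, reconstructed from the description above
pip install kafka-python
pip install pyspark   # unpinned: installs the latest 3.x, which has no pyspark.streaming.kafka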
My spark_job.py file needs to use Kafka:

from pyspark.streaming.kafka import KafkaUtils

When running "python spark_job.py":

ModuleNotFoundError: No module named 'pyspark.streaming.kafka'

The error persists!
spark_job.py:

from __future__ import print_function
import sys
import os
import shutil
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils  # this is the problem
import json

outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'

def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
        testDataFrame = spark.createDataFrame(rowRdd)
        testDataFrame.createOrReplaceTempView("treasury_stream")
        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Opps! It seems an Errrorrr with DB working!", e)
    except Exception as e:
        print("--> Opps! Is seems an Error!!!", e)

# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 2)

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except:
        raise ConnectionError("Kafka error: Connection refused: \
                              broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree('outputPath')

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
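For reference, the KafkaUtils.createDirectStream API used above relies on the Kafka 0.8 DStream connector, which is not bundled with pyspark itself; on Spark 2.4.x such a job is typically launched through spark-submit with that connector on the classpath. A minimal launch sketch, assuming a Scala 2.11 / Spark 2.4.6 build (the package coordinates are an assumption; verify them against the Streaming + Kafka 0.8 integration guide for your version):

# minimal launch sketch; <broker_list> and <topic> are the two arguments spark_job.py expects
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6 \
  spark_job.py <broker_list> <topic>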

7fhtutme (answer #1)

I simply downgraded it with pip:

pip install --force-reinstall pyspark==2.4.6

I wasn't using Poetry at all. After the reinstall, the KafkaUtils package was recognized.
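To double-check that the downgrade took effect, a quick sanity check like the following can be used (not part of the original answer):

pip show pyspark   # should now report Version: 2.4.6
python -c "from pyspark.streaming.kafka import KafkaUtils; print('KafkaUtils import OK')"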
