In another, similar question, the suggestion was to "install the old Spark 2.4.5".
EDIT: The solution linked above is to "install Spark 2.4.5, which does have KafkaUtils". The problem is that I cannot download Spark 2.4.5: it is no longer available, not even in the archive.
Following that advice, I installed an older Spark version, Spark 2.4.6 (the only older release still available), along with Python 3.7, kafka-python, and the pyspark libraries.
My spark_job.py file needs Kafka:
from pyspark.streaming.kafka import KafkaUtils
When I run python spark_job.py, I get:
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
The error is still there!
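A quick sanity check of what the interpreter actually sees (pyspark.streaming.kafka only ships with the 2.4.x line; Spark 3.x removed the Kafka 0.8 DStream API):

import importlib.util
import pyspark

print(pyspark.__version__)                                  # must be 2.4.x
print(importlib.util.find_spec("pyspark.streaming.kafka"))  # None means the module is absent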
spark_job.py:
from __future__ import print_function
import sys
import os
import shutil
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils # this is the problem
import json
outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'
def getSparkSessionInstance(sparkConf):
    # Create the SparkSession once and reuse it across micro-batches
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
        testDataFrame = spark.createDataFrame(rowRdd)
        testDataFrame.createOrReplaceTempView("treasury_stream")
        sql_query = get_sql_query()  # assumed to be defined elsewhere in the project
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Oops! It seems an error with the DB!", e)
    except Exception as e:
        print("--> Oops! It seems an error!", e)

# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 2)

    broker_list, topic = sys.argv[1:]
    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except Exception:
        raise ConnectionError("Kafka error: Connection refused: "
                              "broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        # the script takes a broker list, not ZooKeeper (see createDirectStream above)
        print("Usage: spark_job.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree(outputPath)  # was rmtree('outputPath'), which deleted the wrong path

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
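Side note: even with the right pyspark version, KafkaUtils.createDirectStream needs the Kafka 0-8 integration jar on the classpath at submit time, so a job like this is normally launched via spark-submit rather than plain python. A sketch, with the package coordinates assuming Spark 2.4.6 built against Scala 2.11 and a hypothetical local broker and topic:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6 spark_job.py localhost:9092 transaction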
1 Answer
I just downgraded it with pip:
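Something along these lines (the 2.4.6 pin is an assumption, based on the Spark 2.4.6 install mentioned in the question):

pip uninstall pyspark
pip install pyspark==2.4.6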
I wasn't using Poetry or anything like that. After the reinstall, the KafkaUtils package was recognized.