Using Python multiprocessing with Spark and Kafka

xqk2d5yq, posted on 2021-06-08 in Kafka

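I wrote a small script with two functions: one generates random logs and feeds them to a Kafka producer, and the other consumes the Kafka topic to create a DStream in Spark Streaming.
I want the two functions to run concurrently using Python multiprocessing. Unfortunately, when I run the script I get an error related to KafkaUtils.createStream.
Here is what my terminal shows: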

~/Desktop/spark_test/kafka_sparkStream/python spark-submit --jars spark-streaming-kafka-assembly_2.10-1.3.0.jar randomLogGenerator.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/13 11:50:49 INFO SparkContext: Running Spark version 1.3.0
15/04/13 11:50:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/13 11:50:49 INFO SecurityManager: Changing view acls to: Mandok
15/04/13 11:50:49 INFO SecurityManager: Changing modify acls to: Mandok
15/04/13 11:50:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mandok); users with modify permissions: Set(Mandok)
15/04/13 11:50:50 INFO Slf4jLogger: Slf4jLogger started
15/04/13 11:50:50 INFO Remoting: Starting remoting
15/04/13 11:50:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.235.249.202:52867]
15/04/13 11:50:50 INFO Utils: Successfully started service 'sparkDriver' on port 52867.
15/04/13 11:50:50 INFO SparkEnv: Registering MapOutputTracker
15/04/13 11:50:50 INFO SparkEnv: Registering BlockManagerMaster
15/04/13 11:50:50 INFO DiskBlockManager: Created local directory at /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-212d4f02-8166-4dec-bec3-f3f618ab03bf/blockmgr-7c57842e-99ae-47ac-b408-8587b573f8f5
15/04/13 11:50:50 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/04/13 11:50:50 INFO HttpFileServer: HTTP File server directory is /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-4b2527ac-a253-455f-94f2-eef96397426f/httpd-f83cd6dc-0a9d-4bf4-94ca-cad9ab611191
15/04/13 11:50:50 INFO HttpServer: Starting HTTP Server
15/04/13 11:50:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 11:50:50 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52868
15/04/13 11:50:50 INFO Utils: Successfully started service 'HTTP file server' on port 52868.
15/04/13 11:50:50 INFO SparkEnv: Registering OutputCommitCoordinator
15/04/13 11:50:50 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 11:50:50 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/13 11:50:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/13 11:50:50 INFO SparkUI: Started SparkUI at http://10.235.249.202:4040
15/04/13 11:50:51 INFO SparkContext: Added JAR file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/spark-streaming-kafka-assembly_2.10-1.3.0.jar at http://10.235.249.202:52868/jars/spark-streaming-kafka-assembly_2.10-1.3.0.jar with timestamp 1428918651275
15/04/13 11:50:51 INFO Utils: Copying /Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py to /var/folders/n9/3b4rd9wx0v957x03v6h41xnw0000gn/T/spark-46278e87-f4df-42b2-a666-449ab3c52b24/userFiles-e05fb779-60ee-4cd8-aa6b-1e7bcf51b14e/randomLogGenerator.py
15/04/13 11:50:51 INFO SparkContext: Added file file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py at file:/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py with timestamp 1428918651404
15/04/13 11:50:51 INFO Executor: Starting executor ID <driver> on host localhost
15/04/13 11:50:51 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.235.249.202:52867/user/HeartbeatReceiver
15/04/13 11:50:51 INFO NettyBlockTransferService: Server created on 52869
15/04/13 11:50:51 INFO BlockManagerMaster: Trying to register BlockManager
15/04/13 11:50:51 INFO BlockManagerMasterActor: Registering block manager localhost:52869 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 52869)
15/04/13 11:50:51 INFO BlockManagerMaster: Registered BlockManager
HEY I AM HERE
HEY I AM HERE
Process Process-2:
Traceback (most recent call last):
  File "/Users/Mandok/anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Users/Mandok/anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args,**self._kwargs)
  File "/Users/Mandok/Desktop/spark_test/kafka_sparkStream/python/randomLogGenerator.py", line 56, in sparkStream
    messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
  File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/pyspark/streaming/kafka.py", line 69, in createStream
    jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
  File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/usr/local/Cellar/apache-spark/1.3.0/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 314, in get_return_value
    return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
KeyError: u'o'
Sent 1000 messages
Sent 2000 messages
Sent 3000 messages
Sent 4000 messages
Sent 5000 messages

My two functions:

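# Imports used by the functions below. Publishers, Advertisers, Geos, producer,
# ssc, zkQuorum, groupId and topics are defined elsewhere in the script (not shown).
import random
from datetime import datetime
from multiprocessing import Process

from pyspark.streaming.kafka import KafkaUtils

def randomLog():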
    i = 0
    while True:
        timestamp = str(datetime.now())
        publisher = random.choice(Publishers)
        advertiser = random.choice(Advertisers)
        website = "website_" + str(random.randint(0, 10000)) + ".com"
        cookie = "cookie_" + str(random.randint(0, 10000))
        geo = random.choice(Geos)
        bid = random.random()

        log = '{0}, {1}, {2}, {3}, {4}, {5}, {6}'.format(
            timestamp, str(publisher), str(advertiser), website, cookie, geo, str(bid))
        producer.send_messages("adnetwork_topic", log)

        i += 1
        if i % 1000 == 0:
            print "Sent %d messages" % i

def sparkStream():
    messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    lines = messages.map(lambda x: x[1])

    counts = lines.flatMap(lambda line: line.split(", ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    process_randomLog = Process(target=randomLog)
    process_sparkStream = Process(target=sparkStream)
    process_randomLog.start()
    process_sparkStream.start()
    process_sparkStream.join()
    process_randomLog.join()

Thanks for your help!

vc9ivgsu1#

I solved the problem by creating the streaming context inside the sparkStream function.
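A minimal sketch of what that change might look like, not the exact code from my script. The assumption here is that the original script built its SparkContext and StreamingContext at module level in the parent process, so the child started by multiprocessing ended up sharing the parent's Py4J connection, which is one plausible cause of the KeyError. The names spark_stream and make_stream_process, the app name, the batch interval, the ZooKeeper address and the group id are all illustrative. pyspark.streaming.kafka is the module shown in the traceback (Spark 1.3); it is not available in Spark 3.x.

```python
from multiprocessing import Process


def spark_stream(zk_quorum, group_id, topics, batch_seconds=2):
    """Target for the child process: every Spark object is created here."""
    # Imported inside the function so the parent process never opens
    # a Py4J gateway that the child could inherit.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaWordCount")  # app name is an assumption
    ssc = StreamingContext(sc, batch_seconds)    # batch interval is an assumption

    # topics maps each topic name to the number of partitions to consume.
    messages = KafkaUtils.createStream(ssc, zk_quorum, group_id, topics)
    lines = messages.map(lambda kv: kv[1])
    counts = (lines.flatMap(lambda line: line.split(", "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()


def make_stream_process(zk_quorum="localhost:2181",
                        group_id="spark-streaming-consumer",
                        topics=None):
    """Build (but do not start) the process that runs the streaming job."""
    # Default values are placeholders; only the topic name comes from the question.
    topics = topics or {"adnetwork_topic": 1}
    return Process(target=spark_stream, args=(zk_quorum, group_id, topics))
```

In the `if __name__ == '__main__':` block, the stream process would then be created with make_stream_process() and started alongside the randomLog process as before, with no Spark context created in the parent.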
