PySpark issue consuming from Kafka

v64noz0r · posted 2021-06-08 in Kafka
Follow (0) | Answers (2) | Views (385)

I'm trying to connect to a Kafka (0.9.0) stream from PySpark for one of my applications and am running into the issue below.
Steps taken:
Started Kafka with the following commands:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Using the kafka-python library I started a Kafka producer; no problems there, and I can consume the messages back through plain Python.
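For reference, a minimal sketch of that producer/consumer round-trip with kafka-python, assuming the broker on localhost:9092 and a 'test' topic that already exists:

from kafka import KafkaProducer, KafkaConsumer

# write a few test messages to the topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(3):
    producer.send('test', ('message %d' % i).encode('utf-8'))
producer.flush()

# read them back; consumer_timeout_ms stops the loop once the topic is drained
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for msg in consumer:
    print(msg.value)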
Now, when I run the same setup through PySpark (1.5.2) with the code below:

import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 3)

zkQuorum, topic = 'localhost:9092', 'test'
kvs = KafkaUtils.createStream(ssc, zkQuorum,"my_group", {topic: 3})

lines = kvs.map(lambda x: x.value)
counts = (lines.flatMap(lambda line: line.split(" "))
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b))

counts.pprint()

ssc.start()
ssc.awaitTermination()

I run the above code with the following command:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py

I get the following output; note the ZooKeeper client timing out and reconnecting toward the end:

15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
  15/12/17 15:37:20 INFO PythonRunner: Times: total = 157, boot = 156, init = 1, finish = 0
  15/12/17 15:37:20 INFO Executor: Finished task 3.0 in stage 4.0 (TID 5). 1213 bytes result sent to driver
  15/12/17 15:37:20 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 5) in 958 ms on localhost (1/4)
  15/12/17 15:37:20 INFO PythonRunner: Times: total = 305, boot = 304, init = 1, finish = 0
  15/12/17 15:37:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 2). 1213 bytes result sent to driver
  15/12/17 15:37:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 1115 ms on localhost (2/4)
  15/12/17 15:37:20 INFO PythonRunner: Times: total = 457, boot = 456, init = 1, finish = 0
  15/12/17 15:37:20 INFO Executor: Finished task 1.0 in stage 4.0 (TID 3). 1213 bytes result sent to driver
  15/12/17 15:37:20 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 1266 ms on localhost (3/4)
  15/12/17 15:37:20 INFO PythonRunner: Times: total = 306, boot = 304, init = 2, finish = 0
  15/12/17 15:37:20 INFO Executor: Finished task 2.0 in stage 4.0 (TID 4). 1213 bytes result sent to driver
  15/12/17 15:37:20 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 1268 ms on localhost (4/4)
  15/12/17 15:37:20 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:393) finished in 1.272 s
  15/12/17 15:37:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
  15/12/17 15:37:20 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.297262 s
  15/12/17 15:37:21 INFO JobScheduler: Added jobs for time 1450346841000 ms
  15/12/17 15:37:21 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
  15/12/17 15:37:21 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:393) with 3 output partitions
  15/12/17 15:37:21 INFO DAGScheduler: Final stage: ResultStage 6(runJob at PythonRDD.scala:393)
  15/12/17 15:37:21 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
  15/12/17 15:37:21 INFO DAGScheduler: Missing parents: List()
  15/12/17 15:37:21 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43), which has no missing parents
  15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(5576) called with curMem=100677, maxMem=556038881
  15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KB, free 530.2 MB)
  15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(3326) called with curMem=106253, maxMem=556038881
  15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.2 KB, free 530.2 MB)
  15/12/17 15:37:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:61820 (size: 3.2 KB, free: 530.3 MB)
  15/12/17 15:37:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
  15/12/17 15:37:21 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43)
  15/12/17 15:37:21 INFO TaskSchedulerImpl: Adding task set 6.0 with 3 tasks
  15/12/17 15:37:21 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 2024 bytes)
  15/12/17 15:37:21 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 7, localhost, PROCESS_LOCAL, 2024 bytes)
  15/12/17 15:37:21 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 8, localhost, PROCESS_LOCAL, 2024 bytes)
  15/12/17 15:37:21 INFO Executor: Running task 0.0 in stage 6.0 (TID 6)
  15/12/17 15:37:21 INFO Executor: Running task 2.0 in stage 6.0 (TID 8)
  15/12/17 15:37:21 INFO Executor: Running task 1.0 in stage 6.0 (TID 7)
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
  15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
  C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
  15/12/17 15:37:21 INFO PythonRunner: Times: total = 158, boot = 154, init = 1, finish = 3
  C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
  15/12/17 15:37:22 INFO PythonRunner: Times: total = 298, boot = 294, init = 1, finish = 3
  C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
  15/12/17 15:37:22 INFO PythonRunner: Times: total = 448, boot = 444, init = 1, finish = 3
  15/12/17 15:37:22 INFO PythonRunner: Times: total = 152, boot = 151, init = 1, finish = 0
  15/12/17 15:37:22 INFO Executor: Finished task 0.0 in stage 6.0 (TID 6). 1213 bytes result sent to driver
  15/12/17 15:37:22 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 784 ms on localhost (1/3)
  15/12/17 15:37:22 INFO PythonRunner: Times: total = 320, boot = 318, init = 2, finish = 0
  15/12/17 15:37:22 INFO Executor: Finished task 2.0 in stage 6.0 (TID 8). 1213 bytes result sent to driver
  15/12/17 15:37:22 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 8) in 952 ms on localhost (2/3)
  15/12/17 15:37:22 INFO PythonRunner: Times: total = 172, boot = 171, init = 1, finish = 0
  15/12/17 15:37:22 INFO Executor: Finished task 1.0 in stage 6.0 (TID 7). 1213 bytes result sent to driver
  15/12/17 15:37:22 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 7) in 957 ms on localhost (3/3)
  15/12/17 15:37:22 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:393) finished in 0.959 s
  15/12/17 15:37:22 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
  15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
  15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
  -------------------------------------------
  Time: 2015-12-17 15:37:18
  -------------------------------------------

  15/12/17 15:37:23 INFO JobScheduler: Finished job streaming job 1450346838000 ms.0 from job set of time 1450346838000 ms
  15/12/17 15:37:23 INFO JobScheduler: Total delay: 5.780 s for time 1450346838000 ms (execution: 5.725 s)
  15/12/17 15:37:23 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
  15/12/17 15:37:23 INFO JobScheduler: Starting job streaming job 1450346841000 ms.0 from job set of time 1450346841000 ms

The "Time:" batch headers keep appearing, but nothing is ever printed under them.
PySpark and Kafka each work fine on their own. Can anyone help resolve this?


u59ebvdq1#

I think there is an error in this line:

zkQuorum, topic = 'localhost:9092', 'test'

The ZooKeeper port should be 2181:

zkQuorum, topic = 'localhost:2181', 'test'

Source: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
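With that change, a minimal sketch of the receiver setup; note also that in PySpark 1.5 createStream yields plain (key, value) tuples, so the message value is accessed as x[1] rather than x.value:

zkQuorum, topic = 'localhost:2181', 'test'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "my_group", {topic: 3})

# each element is a (key, value) tuple; keep only the message value
lines = kvs.map(lambda x: x[1])
counts = (lines.flatMap(lambda line: line.split(" "))
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b))
counts.pprint()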


nuypyhwy2#

With foreachRDD you can't process each record directly without iterating over the RDD, because the DStream produces a continuous series of RDDs.

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(rdd):
    # each RDD in the DStream holds the (key, value) tuples of one micro-batch
    lines = rdd.map(lambda x: x[1])
    counts = (lines.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    # bring the per-batch word counts back to the driver and print them
    for word, count in counts.collect():
        print(word, count)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
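Assuming the script is saved as, say, direct_kafka_wordcount.py (a hypothetical name), it can be submitted with the broker list and topic as arguments, reusing the assembly jar from the question:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar direct_kafka_wordcount.py localhost:9092 test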

Regards,
Karthikeyan Rasipalayam Durairaj
