I am trying to connect to a Kafka (0.9.0) stream through PySpark for one of my applications, and I am running into the following problem.
Steps taken:
I started Kafka with the following commands:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Using the kafka-python library, I started a Kafka producer. That worked with no issues, and I could consume the messages back through plain Python.
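For reference, here is a rough sketch of that round-trip check (assuming a recent kafka-python that provides the KafkaProducer/KafkaConsumer API; the broker address and the topic name 'test' match my setup):

# Publish one message, then read it back from the beginning of the topic.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'hello kafka')
producer.flush()  # make sure the message actually reaches the broker

consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # give up after 5 s of silence
for message in consumer:
    print(message.value)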
Now, when I try to do the same through PySpark (1.5.2) with the code below:
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 3)
zkQuorum, topic = 'localhost:9092', 'test'
kvs = KafkaUtils.createStream(ssc, zkQuorum,"my_group", {topic: 3})
lines = kvs.map(lambda x: x.value)
counts = (lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a+b)
)
counts.pprint()
ssc.start()
ssc.awaitTermination()
I run the above code with:
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py
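An equivalent way to supply the connector, assuming the machine can reach Maven Central, is to let spark-submit resolve it (the Scala suffix _2.10 has to match the Spark build):

spark-submit --packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.5.2 test.py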
I get the following error:
15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
15/12/17 15:37:20 INFO PythonRunner: Times: total = 157, boot = 156, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 3.0 in stage 4.0 (TID 5). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 5) in 958 ms on localhost (1/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 305, boot = 304, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 2). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 1115 ms on localhost (2/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 457, boot = 456, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 1.0 in stage 4.0 (TID 3). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 1266 ms on localhost (3/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 306, boot = 304, init = 2, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 2.0 in stage 4.0 (TID 4). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 1268 ms on localhost (4/4)
15/12/17 15:37:20 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:393) finished in 1.272 s
15/12/17 15:37:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/12/17 15:37:20 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.297262 s
15/12/17 15:37:21 INFO JobScheduler: Added jobs for time 1450346841000 ms
15/12/17 15:37:21 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
15/12/17 15:37:21 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:393) with 3 output partitions
15/12/17 15:37:21 INFO DAGScheduler: Final stage: ResultStage 6(runJob at PythonRDD.scala:393)
15/12/17 15:37:21 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
15/12/17 15:37:21 INFO DAGScheduler: Missing parents: List()
15/12/17 15:37:21 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43), which has no missing parents
15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(5576) called with curMem=100677, maxMem=556038881
15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KB, free 530.2 MB)
15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(3326) called with curMem=106253, maxMem=556038881
15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.2 KB, free 530.2 MB)
15/12/17 15:37:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:61820 (size: 3.2 KB, free: 530.3 MB)
15/12/17 15:37:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
15/12/17 15:37:21 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43)
15/12/17 15:37:21 INFO TaskSchedulerImpl: Adding task set 6.0 with 3 tasks
15/12/17 15:37:21 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 7, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 8, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO Executor: Running task 0.0 in stage 6.0 (TID 6)
15/12/17 15:37:21 INFO Executor: Running task 2.0 in stage 6.0 (TID 8)
15/12/17 15:37:21 INFO Executor: Running task 1.0 in stage 6.0 (TID 7)
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:21 INFO PythonRunner: Times: total = 158, boot = 154, init = 1, finish = 3
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:22 INFO PythonRunner: Times: total = 298, boot = 294, init = 1, finish = 3
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:22 INFO PythonRunner: Times: total = 448, boot = 444, init = 1, finish = 3
15/12/17 15:37:22 INFO PythonRunner: Times: total = 152, boot = 151, init = 1, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 0.0 in stage 6.0 (TID 6). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 784 ms on localhost (1/3)
15/12/17 15:37:22 INFO PythonRunner: Times: total = 320, boot = 318, init = 2, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 2.0 in stage 6.0 (TID 8). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 8) in 952 ms on localhost (2/3)
15/12/17 15:37:22 INFO PythonRunner: Times: total = 172, boot = 171, init = 1, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 1.0 in stage 6.0 (TID 7). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 7) in 957 ms on localhost (3/3)
15/12/17 15:37:22 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:393) finished in 0.959 s
15/12/17 15:37:22 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
-------------------------------------------
Time: 2015-12-17 15:37:18
-------------------------------------------
15/12/17 15:37:23 INFO JobScheduler: Finished job streaming job 1450346838000 ms.0 from job set of time 1450346838000 ms
15/12/17 15:37:23 INFO JobScheduler: Total delay: 5.780 s for time 1450346838000 ms (execution: 5.725 s)
15/12/17 15:37:23 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/12/17 15:37:23 INFO JobScheduler: Starting job streaming job 1450346841000 ms.0 from job set of time 1450346841000 ms
The empty 'Time' batches keep printing, though.
PySpark and Kafka each work fine on their own. Can someone help me resolve this?
2 Answers
u59ebvdq1#
I think the error is in this line:
zkQuorum, topic = 'localhost:9092', 'test'
The ZooKeeper port should be 2181; 9092 is the Kafka broker port, and createStream connects through ZooKeeper.
Source: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
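As a sketch, the corrected lines would be (assuming ZooKeeper runs on its default port on the same host):

# createStream is the receiver-based API: it coordinates the consumer group
# through ZooKeeper, so it needs the ZooKeeper address, not the broker's.
zkQuorum, topic = 'localhost:2181', 'test'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "my_group", {topic: 3})
lines = kvs.map(lambda x: x[1])  # createStream yields (key, message) pairs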
nuypyhwy2#
You can't process each record without iterating over the RDDs via foreachRDD, because a DStream produces a continuous series of RDDs rather than individual records.
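A rough sketch of that pattern, reusing the counts DStream from the question:

# Each batch interval yields one RDD; iterate inside it to reach the records.
# (print runs on the executors, which is fine in local mode.)
def handle_partition(records):
    for record in records:  # each record is a (word, count) pair here
        print(record)

counts.foreachRDD(lambda rdd: rdd.foreachPartition(handle_partition))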
Regards,
Karthikeyan Rasipalayam Durairaj