The Kafka producer example code is:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser as configparser
from pykafka import KafkaClient
import time
import snappy
config = configparser.ConfigParser()
config.read("conf.ini")
app_name = "test_word_counter"
kafka_hosts = config.get(app_name, 'kafka_hosts')
kafka_topic = config.get(app_name, 'kafka_topic')
print("kafka client: %s" % kafka_hosts)
print("kafka topic: %s" % kafka_topic)
kafka_client = KafkaClient(hosts=kafka_hosts) # Create Kafka client
topic = kafka_client.topics[kafka_topic] # This will create the topic if it does not exist
with topic.get_producer() as producer:  # Create a Kafka producer on the given topic
    while True:
        msg = "just a test for snappy compress with kafka and spark"
        msg = snappy.compress(msg)  # snappy-compress the payload before sending
        producer.produce(msg)  # Send the message to Kafka
        print("send data len(%d)" % len(msg))
        print(msg)
        time.sleep(5)
The code is very simple: it uses python-snappy to compress the data and then pushes it into Kafka.
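For reference, the conf.ini that the script reads looks roughly like this (the broker addresses and topic name below are placeholders, not my real values):

[test_word_counter]
kafka_hosts = broker1:9092,broker2:9092
kafka_topic = test_topic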
The PySpark code is:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def word_counter(zk_host, topic):
    sc = SparkContext(appName="PythonStreamingKafkaWordCounter")
    ssc = StreamingContext(sc, 30)  # 30-second batch interval
    kvs = KafkaUtils.createStream(ssc, zk_host, "spark-streaming-consumer", {topic: 2})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
Then I run spark-submit:
spark-submit --jars /usr/local/services/metrics-spark-analyser/external/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar spark_word_counter_consumer.py
I got the following error message:
UnicodeDecodeError: 'utf8' codec can't decode byte 0xcc in position 1: invalid continuation byte
The detailed error trace is as follows:
16/12/18 13:58:30 ERROR Executor: Exception in task 5.0 in stage 7.0 (TID 30)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1792, in combineLocally
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 73, in <lambda>
  File "/usr/local/services/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 36, in utf8_decoder
    return s.decode('utf-8')
  File "/usr/local/services/python/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0xcc in position 1: invalid continuation byte
It seems that Spark Streaming cannot handle the snappy-compressed data from Kafka: judging from the traceback, the default utf8_decoder that KafkaUtils.createStream applies to each message value tries to decode the raw compressed bytes as UTF-8 and fails.
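One direction I am considering is replacing that default value decoder with a snappy-aware one. This is only an untested sketch (snappy_decoder is a name I made up, and it assumes the keyword arguments of the 0-8 createStream connector):

from pyspark.streaming.kafka import KafkaUtils
import snappy

def snappy_decoder(s):
    # Decompress the raw message bytes instead of decoding them as UTF-8
    if s is None:
        return None
    return snappy.decompress(s)

kvs = KafkaUtils.createStream(ssc, zk_host, "spark-streaming-consumer", {topic: 2},
                              valueDecoder=snappy_decoder)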
Should I add any configuration in Spark?
Thanks~
Software details:
Kafka 0.10.1.0
Spark 2.0.2 (the build for Hadoop 2.7)
python-snappy 0.5
P.S.:
I wrote a simple Kafka consumer program that reads the snappy data back from Kafka, and the snappy decompression there succeeds.
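That consumer was roughly along the following lines (a simplified sketch using the same pykafka client as the producer, not the exact script):

from pykafka import KafkaClient
import snappy

client = KafkaClient(hosts=kafka_hosts)
topic = client.topics[kafka_topic]
consumer = topic.get_simple_consumer()  # plain pykafka consumer
for message in consumer:
    if message is not None:
        data = snappy.decompress(message.value)  # decompression succeeds here
        print(data)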