Saving a DirectStream RDD to MongoDB

bkhjykvo  posted on 2021-06-08  in Kafka

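I am learning MongoDB, and while working through a simple exercise I ran into a problem storing data from a Kafka consumer in MongoDB with pymongo-spark. The data saved in MongoDB differs from the data produced by the Kafka consumer.
This is the Kafka consumer code: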

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils 
import math
import time
from pyspark import SparkConf
import pymongo_spark

# Important: activate pymongo_spark.

pymongo_spark.activate()

startInformation = {}
oldX = ''
oldY = ''

# Create a local StreamingContext with two worker threads and a batch interval of 3 seconds

sc = SparkContext("local[2]", "OdometryConsumer")
ssc = StreamingContext(sc, 3)

kafkaStream = KafkaUtils.createDirectStream(ssc, ['odometry'], {'metadata.broker.list': 'localhost:9092'})

def getPositionSpeed(line):
    # Read the previous position and timestamp written by the last call
    fr = open('/Users/en1gma/Desktop/info.txt', 'r')
    for l in fr.readlines():
        oldX = float(l.split(' ')[0])
        oldY = float(l.split(' ')[1])
        try:
            oldTs = int(l.split(' ')[2])
        except:
            oldTs = int(time.time())
    fr.close()
    # line is a (key, value) pair; the value holds space-separated "name:value" fields
    fields = line[1].split(" ")
    robotId = fields[0].split(":")[1]
    deltaSpace = float(fields[1].split(":")[1])
    thetaTwist = float(fields[2].split(":")[1])
    ts = int(fields[3].split(":")[1])
    # Advance the position by deltaSpace along the heading thetaTwist
    newX = oldX + deltaSpace*(math.cos(thetaTwist))
    newY = oldY + deltaSpace*(math.sin(thetaTwist))
    try:
        speed = deltaSpace/(ts - oldTs)
    except:
        speed = float(0)

    # Persist the new position and timestamp for the next call
    fw = open('/Users/en1gma/Desktop/info.txt', 'w')
    fw.write(str(newX) + " " + str(newY) + " " + str(ts))
    fw.close()

    # Fill the module-level dict with the record to be stored
    startInformation["robotId"] = str(robotId)
    startInformation["x_coordinate"] = str(newX)
    startInformation["y_coordinate"] = str(newY)
    startInformation["speed"] = str(speed)
    startInformation["deltaSpace"] = str(deltaSpace)
    startInformation["thetaTwist"] = str(thetaTwist)
    startInformation["timeStamp"] = str(ts)

    return startInformation

elaborate = kafkaStream.map(getPositionSpeed)

def sendRecord(rdd):
    try:
        rdd.saveToMongoDB('mongodb://localhost:27017/db.test')
    except:
        pass

elaborate.foreachRDD(sendRecord)

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
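
The per-message arithmetic in getPositionSpeed can be checked outside Spark, which may help when comparing expected values with what ends up in MongoDB. The following is a minimal sketch, not the original code: `update_position` is a hypothetical helper, the sample message and its field names are assumptions (the code above only fixes the order of the fields and the `name:value` form), and the previous state is passed in as arguments instead of being read from info.txt. It also returns a new dict on every call, whereas the code above fills and returns the single module-level startInformation dict; whether that sharing is related to the mismatch is not established here.

```python
import math


def update_position(message, old_x, old_y, old_ts):
    """Return a new record dict for one odometry message.

    `message` is assumed to hold four space-separated "name:value"
    fields in the order: robot id, delta space, theta twist, timestamp.
    """
    fields = message.split(" ")
    robot_id = fields[0].split(":")[1]
    delta_space = float(fields[1].split(":")[1])
    theta_twist = float(fields[2].split(":")[1])
    ts = int(fields[3].split(":")[1])

    # Advance the previous position by delta_space along heading theta_twist
    new_x = old_x + delta_space * math.cos(theta_twist)
    new_y = old_y + delta_space * math.sin(theta_twist)

    # Avoid dividing by zero when two messages share a timestamp
    elapsed = ts - old_ts
    speed = delta_space / elapsed if elapsed != 0 else 0.0

    # A fresh dict per message, with values stored as strings as in the original
    return {
        "robotId": str(robot_id),
        "x_coordinate": str(new_x),
        "y_coordinate": str(new_y),
        "speed": str(speed),
        "deltaSpace": str(delta_space),
        "thetaTwist": str(theta_twist),
        "timeStamp": str(ts),
    }


if __name__ == "__main__":
    # Hypothetical sample message; the field names are illustrative only
    print(update_position("robotId:7 deltaSpace:2.0 thetaTwist:0.0 ts:13", 1.0, 1.0, 11))
```

Because the function has no file or Spark dependency, its output for a known message can be compared directly with the document stored in the db.test collection.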

Do you know why this happens?
Thanks in advance.
