我正试图用以下python脚本发布rabbitmq消息--
import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue_durable', durable=True)
message="Hello_Hbase!"
channel.basic_publish(exchange='',routing_key='queue1',body=message)
print(" [x] Sent %r" % message)
connection.close()
接下来,这是订阅服务器脚本。在这个脚本中,我想从 routing_queue='queue1'
想把信息储存在别的地方。。
import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import time
import pika
import happybase
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
connection = happybase.Connection(host='localhost', port=9090)
table = connection.table('blogpost')
print(connection.tables())
def callback(ch, method, body):
print(" [x] Received %r" % body)
time.sleep(body.count('.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
#return(body)
channel.basic_qos(prefetch_count=1)
body=channel.basic_consume( callback,queue='queue1')
print(body) ## here it is giving some encrypted msg, how to retrieve in original form
# here in the body,I am getting this - ctag1.587a9ab83301436195fc3f653c2f6db0
table.put('1', {'post:status': body})
print("hbase insertion done")
# channel.start_consuming()
有人能告诉我如何从rabbitmq队列中检索原始形式的msg吗?
1条答案
按热度按时间0x6upsns1#
basic_consume
不返回消息的正文。在回调函数中可以访问的
body
已正确使用的参数。你也应该在那里操作数据库。