python—rabbitmq如何在内部存储消息,以及如何以原始形式检索回消息?

hof1towb  于 2021-06-08  发布在  Hbase
关注(0)|答案(1)|浏览(245)

我正试图用以下python脚本发布rabbitmq消息--

import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue_durable', durable=True)
message="Hello_Hbase!"
channel.basic_publish(exchange='',routing_key='queue1',body=message)
print(" [x] Sent %r" % message)
connection.close()

接下来,这是订阅服务器脚本。在这个脚本中,我想从 routing_queue='queue1' 想把信息储存在别的地方。。

import findspark
findspark.init("/home/spark/spark-2.2.0")
from pyspark.sql import SparkSession

import time
import pika
import happybase
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

connection = happybase.Connection(host='localhost', port=9090)
table = connection.table('blogpost')
print(connection.tables())

    def callback(ch, method, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count('.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        #return(body)

channel.basic_qos(prefetch_count=1)
body=channel.basic_consume( callback,queue='queue1') 
print(body) ## here it is giving some encrypted msg, how to retrieve in original form

# here in the body,I am getting this - ctag1.587a9ab83301436195fc3f653c2f6db0

table.put('1', {'post:status': body})
print("hbase insertion done")

# channel.start_consuming()

有人能告诉我如何从rabbitmq队列中检索原始形式的msg吗?

0x6upsns

0x6upsns1#

basic_consume 不返回消息的正文。
在回调函数中可以访问的 body 已正确使用的参数。你也应该在那里操作数据库。

相关问题