使用Python WebSocket客户端访问传入消息

gg58donl  于 2022-11-11  发布在  Python
关注(0)|答案(2)|浏览(208)

我正在尝试通过websocket-client模块接收消息,并能够将接收到的消息用于其他目的(例如,根据传入的消息执行买入/卖出订单)。
以下是我目前所掌握的情况:

import websocket
import time
import json

def on_message(ws, message):
    """Print the ``price`` field of an incoming JSON message.

    Args:
        ws: The WebSocketApp that delivered the message (unused).
        message: Raw text payload, expected to be a JSON object that
            contains a ``"price"`` key.

    A payload that is not valid JSON, is not a JSON object, or has no
    ``"price"`` key prints "Please wait.." and pauses for one second
    instead of raising.
    """
    try:
        current_price = json.loads(message)
        print(current_price["price"])
    # Catch only what this body can raise: invalid JSON (ValueError),
    # a missing key (KeyError) or a non-object payload (TypeError).
    # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
    except (ValueError, KeyError, TypeError):
        print("Please wait..")
        time.sleep(1)

def on_error(ws, error):
    """Report a connection error by printing it to standard output.

    Args:
        ws: The WebSocketApp on which the error occurred (unused).
        error: The error object or message passed to the callback.
    """
    print(error)

def on_close(ws, close_status_code=None, close_msg=None):
    """Announce that the connection has been closed.

    Args:
        ws: The WebSocketApp whose connection closed (unused).
        close_status_code: Close status supplied by the library, if any.
        close_msg: Close reason supplied by the library, if any.

    Current websocket-client releases call ``on_close`` with three
    arguments; the two extra parameters default to ``None`` so the older
    one-argument call form keeps working as well.
    """
    print("### closed ###")

def on_open(ws):
    """Send the BTC-USD ticker subscription request once the socket opens.

    Args:
        ws: The connected WebSocketApp; its ``send`` method receives the
            JSON-encoded subscription request.
    """
    subscription = {
        'type': 'subscribe',
        'product_ids': ['BTC-USD'],
        'channels': ['ticker'],
    }
    payload = json.dumps(subscription)
    ws.send(payload)

if __name__ == "__main__":
    # Keep the library's trace output switched off.
    websocket.enableTrace(False)

    # Collect the callbacks first, then hand them over as keyword arguments.
    callbacks = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws = websocket.WebSocketApp("wss://ws-feed.pro.coinbase.com/", **callbacks)

    # Runs the socket's event loop in this thread.
    ws.run_forever()

运行这段代码后,每当比特币价格通过WebSocket数据流传入时,就会打印出当前价格(current_price)。
接下来我想做的是能够在WebSocket回调函数之外访问变量current_price,但我在这里遇到了困难。写在ws.run_forever()之后的任何代码都不会被执行,因为WebSocket事件循环永远不会结束。
所以我试着用“threading”模块在一个单独的线程上运行WebSocket:

import websocket
  import json
  import threading

  current_price = 0

  def on_message(ws, message):
      """Store the raw incoming message in the module-level ``current_price``.

      Args:
          ws: The WebSocketApp that delivered the message (unused).
          message: The payload as received; it is stored without parsing.
      """
      global current_price
      current_price = message

  def on_error(ws, error):
      """Report a connection error by printing it to standard output.

      Args:
          ws: The WebSocketApp on which the error occurred (unused).
          error: The error object or message passed to the callback.
      """
      print(error)

  def on_close(ws, close_status_code=None, close_msg=None):
      """Announce that the connection has been closed.

      Args:
          ws: The WebSocketApp whose connection closed (unused).
          close_status_code: Close status supplied by the library, if any.
          close_msg: Close reason supplied by the library, if any.

      Current websocket-client releases call ``on_close`` with three
      arguments; the two extra parameters default to ``None`` so the older
      one-argument call form keeps working as well.
      """
      print("### closed ###")

  def on_open(ws):
      """Send the BTC-USD ticker subscription request once the socket opens.

      Args:
          ws: The connected WebSocketApp; its ``send`` method receives the
              JSON-encoded subscription request.
      """
      subscription = {
          'type': 'subscribe',
          'product_ids': ['BTC-USD'],
          'channels': ['ticker'],
      }
      payload = json.dumps(subscription)
      ws.send(payload)

  if __name__ == "__main__":
      # Keep the library's trace output switched off.
      websocket.enableTrace(False)
      ws = websocket.WebSocketApp(
          "wss://ws-feed.pro.coinbase.com/",
          on_open=on_open,
          on_message=on_message,
          on_error=on_error,
          on_close=on_close,
      )

      # Run the socket loop on its own thread so this thread can continue.
      run_socket = ws.run_forever
      ws_thread = threading.Thread(target=run_socket)
      ws_thread.start()

      # Executed right after the thread is started, without waiting for
      # any message to arrive.
      print(current_price)

它返回0。我该怎么做才能使它工作呢?

xn1cxnb4

xn1cxnb41#

不确定这是不是最合适的答案,但我找到了一种可行的方法。

import queue
.
.
.
.

    def on_message(ws, message):
        # Hand every incoming message to the queue ``q`` so that another
        # thread can pick it up with ``q.get()``.
        q.put(message)
    .
    .
    .

    # Start the thread that runs the WebSocket loop.
    ws_thread.start()

    # Consume messages in this thread: ``q.get()`` supplies the next item
    # placed on the queue by ``on_message``, which is then printed.
    # NOTE(review): ``q`` is presumably a ``queue.Queue`` created in the
    # elided part of the snippet -- confirm.
    while True:
        print(q.get())

中 的 每 一 个

pkwftd7m

pkwftd7m2#

关键是使用functools.partial来包装run_forever线程的回调函数。我构建了一个简单的多线程演示程序:

from websocket import WebSocketApp
import time
from threading import Thread
from queue import Queue
from functools import partial

class websocket_client:
    """Run a WebSocketApp on a background thread and relay its events.

    Every callback puts what it received onto ``responseQ``, so the thread
    that created the client can consume connection status, messages and
    errors with ``responseQ.get()``.
    """

    def __init__(self):
        """Create the response queue, build the socket app and start its thread."""
        # Queue through which the callbacks hand events to the caller.
        self.responseQ = Queue()
        # Bind the queue to each callback with functools.partial.
        self.websocket = WebSocketApp(
            "ws://echo.websocket.events",
            on_open=partial(self.on_open, responseQ=self.responseQ),
            on_message=partial(self.on_message, responseQ=self.responseQ),
            on_error=partial(self.on_error, responseQ=self.responseQ),
            on_close=partial(self.on_close, responseQ=self.responseQ),
        )
        # Start run_forever as a thread, requesting keep-alive pings every
        # 10 seconds with a 5 second timeout. Keyword arguments are used so
        # the values do not depend on run_forever's positional order.
        self.controller = Thread(
            target=self.websocket.run_forever,
            kwargs={"ping_interval": 10, "ping_timeout": 5},
        )
        self.controller.start()

    def on_open(self, websocket, responseQ):
        """Put the status string "Connected" on the queue."""
        responseQ.put("Connected")

    def on_error(self, websocket, error, responseQ):
        """Put the received error object on the queue."""
        responseQ.put(error)

    def on_message(self, websocket, message, responseQ):
        """Put the received message on the queue unchanged."""
        responseQ.put(message)

    def on_close(self, websocket, status, message, responseQ):
        """Put the close status and close message on the queue as a dict."""
        # 'status' must carry the status argument; it previously repeated
        # the message, so the close status was lost.
        responseQ.put({'status': status, 'message': message})

if __name__ == '__main__':
    # Creating the client also starts its background socket thread.
    client = websocket_client()

    # Discard queue items until the "Connected" status shows up.
    while True:
        if client.responseQ.get() == "Connected":
            break
        time.sleep(0.1)

    # The next queued item is shown as the server's greeting.
    greeting = client.responseQ.get()
    print("\nSERVER GREETING:", greeting)

    # Send a timestamp once per second and print the reply taken from the queue.
    while True:
        message_to_send = time.strftime("%b %d %Y %H:%M:%S")
        client.websocket.send(message_to_send)
        message_received = client.responseQ.get()
        report = "Main Thread: SENT --> '%s' RECEIVED --> '%s'" % (message_to_send, message_received)
        print(report)
        time.sleep(1)

相关问题