Dart Mqtt5库在发布消息时抛出InvalidHeaderException

5us2dqdw  于 2023-01-15  发布在  其他
关注(0)|答案(1)|浏览(191)

我正在尝试构建一个 Flutter UI 应用程序,通过 MQTT 5 协议发布和订阅消息。我使用的库是 mqtt5_client: ^3.3.4。订阅功能运行良好,但在发布消息时,库会抛出 InvalidHeaderException。阅读库的源代码对解决问题没有太大帮助。提前感谢任何提示。
下面给出了完整的代码和异常堆栈跟踪。问题出现在 publish 方法中对 publishMessage 的调用处。

import 'dart:io';
import 'dart:convert';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';

/// A small wrapper around [MqttServerClient] that connects to a broker and
/// publishes / subscribes with QoS "at least once".
///
/// Received messages and publish confirmations are only logged via [print].
class MQTTClient {
  final MqttServerClient _client;

  /// Whether the listener on the client's `updates` stream is attached.
  ///
  /// Guards against stacking one extra listener per [subscribe] call, which
  /// would print every incoming message several times.
  bool _listeningForUpdates = false;

  /// Whether the listener on the client's `published` stream is attached.
  ///
  /// Guards against stacking one extra listener per [publish] call.
  bool _listeningForPublished = false;

  /// Creates a client for the broker at [url]:[port] identified by [clientId].
  ///
  /// Sets a 60 second keep-alive, wires the top-level connection callbacks,
  /// installs the connect message and turns library logging on.
  MQTTClient(String url, String clientId, int port)
      : _client = MqttServerClient(url, clientId) {
    _client.port = port;
    _client.keepAlivePeriod = 60;
    _client.onConnected = onConnected;
    _client.onDisconnected = onDisconnected;
    _client.connectionMessage =
        MqttConnectMessage().withWillQos(MqttQos.atLeastOnce);
    _client.logging(on: true);
  }

  /// Subscribes to [topic] and prints every message received on the client.
  ///
  /// The message listener is attached on the first call only; later calls
  /// just add another topic subscription.
  void subscribe(String topic) {
    _client.onSubscribed = onSubscribed;
    _client.subscribe(topic, MqttQos.atLeastOnce);

    if (_listeningForUpdates) {
      return;
    }
    _listeningForUpdates = true;

    _client.updates.listen((List<MqttReceivedMessage<MqttMessage?>>? msg) {
      // Nothing to report for a missing or empty batch.
      if (msg == null || msg.isEmpty) {
        return;
      }
      final recMess = msg[0].payload as MqttPublishMessage;
      // Treat a missing payload as an empty one instead of crashing.
      final List<int> msgbytes =
          recMess.payload.message?.cast<int>() ?? const <int>[];
      print(
          'Received message: topic is ${msg[0].topic}, payload is ${utf8.decode(msgbytes)} ');
    });
  }

  /// Publishes [message] as the payload of a message on [topic].
  ///
  /// The publish-confirmation listener is attached once, before the message
  /// is sent, so repeated calls do not accumulate listeners.
  void publish(String topic, String message) {
    _listenForPublished();

    final builder = MqttPayloadBuilder();
    builder.addString(message);
    _client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Attaches the publish-confirmation logger if it is not attached yet.
  void _listenForPublished() {
    if (_listeningForPublished) {
      return;
    }
    // NOTE(review): `published` is nullable; presumably it is null until the
    // client has connected - confirm against the library. If it is null we
    // retry on the next publish instead of throwing.
    final published = _client.published;
    if (published == null) {
      return;
    }
    _listeningForPublished = true;
    published.listen((event) {
      print(
          'Published topic: topic is ${event.variableHeader!.topicName}, with Qos ${event.header!.qos}');
    });
  }

  /// Connects to the broker and returns `0` once the client is connected.
  ///
  /// [MqttNoConnectionException] and [SocketException] raised while
  /// connecting are logged. If the client is not connected afterwards it is
  /// disconnected and the whole process is terminated with `exit(-1)`.
  Future<int> connect() async {
    try {
      await _client.connect();
    } on MqttNoConnectionException catch (e) {
      print('connect exception - $e');
    } on SocketException catch (e) {
      print('socket exception - $e');
    }
    if (_client.connectionStatus!.state == MqttConnectionState.connected) {
      print('client connected');
    } else {
      print(
          'client connection failed - disconnecting, status is ${_client.connectionStatus}');
      _client.disconnect();
      exit(-1);
    }

    return 0;
  }
}

/// Logs a confirmed [subscription] and hands the same object back.
MqttSubscription onSubscribed(MqttSubscription subscription) {
  final logLine = 'subscribed $subscription';
  print(logLine);
  return subscription;
}

/// Logs that the client has connected.
void onConnected() => print("client connected");

/// Logs that the client has disconnected.
void onDisconnected() => print('client disconnected');

异常堆栈跟踪为:

Unhandled exception:
mqtt-client::InvalidHeaderException: The supplied header is invalid. Header must be at least 2 bytes long.
#0      MqttHeader.readFrom (package:mqtt5_client/src/messages/mqtt_header.dart:69:7)
#1      new MqttHeader.fromByteBuffer (package:mqtt5_client/src/messages/mqtt_header.dart:19:5)
#2      MqttByteBuffer.isMessageAvailable (package:mqtt5_client/src/utility/mqtt_byte_buffer.dart:66:29)
#3      MqttServerConnection._onData (package:mqtt5_client/src/connectionhandling/server/mqtt_server_connection.dart:60:26)
#4      _RootZone.runUnaryGuarded (dart:async/zone.dart:1586:10)
#5      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#6      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#7      _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:774:19)
#8      _StreamController._add (dart:async/stream_controller.dart:648:7)
#9      _StreamController.add (dart:async/stream_controller.dart:596:5)
#10     _Socket._onData (dart:io-patch/socket_patch.dart:2324:41)
#11     _RootZone.runUnaryGuarded (dart:async/zone.dart:1586:10)
#12     _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#13     _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#14     _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:774:19)
#15     _StreamController._add (dart:async/stream_controller.dart:648:7)
#16     _StreamController.add (dart:async/stream_controller.dart:596:5)
#17     new _RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:1849:33)
#18     _NativeSocket.issueReadEvent.issue (dart:io-patch/socket_patch.dart:1322:14)
#19     _microtaskLoop (dart:async/schedule_microtask.dart:40:21)
#20     _startMicrotaskLoop (dart:async/schedule_microtask.dart:49:5)
#21     _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:122:13)
#22     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:193:5)
f0ofjuux

f0ofjuux1#

该库的作者已经对这个问题作出了回应:这个错误是 MQTT 5 特有的,目前仍在调查中。临时的解决方法是捕获并忽略该异常,因为它发生在消息发布之后。另一个选择是使用 mqtt_client 而不是 mqtt5_client。这两种方法都可行,详见 https://github.com/shamblett/mqtt_client/issues/438#issuecomment-1378346434

相关问题