我正在尝试构建一个flutter UI应用程序来通过mqtt5协议发布和订阅消息。我正在使用库mqtt5_client: ^3.3.4
订阅工作起来很有魅力。但在发布库时抛出InvalidHeaderException。源代码对解决问题没有太大帮助。提前感谢您的任何指针。
下面给出了完整的代码和异常堆栈跟踪。问题出在publish调用的publish方法中
import 'dart:io';
import 'dart:convert';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';
class MQTTClient {
late MqttServerClient _client;
MQTTClient(String url, String clientId, int port) {
_client = MqttServerClient(url, clientId);
_client.port = port;
_client.keepAlivePeriod = 60;
_client.onConnected = onConnected;
_client.onDisconnected = onDisconnected;
MqttConnectMessage connectMessage =
MqttConnectMessage().withWillQos(MqttQos.atLeastOnce);
_client.connectionMessage = connectMessage;
_client.logging(on: true);
}
void subscribe(String topic) {
_client.onSubscribed = onSubscribed;
_client.subscribe(topic, MqttQos.atLeastOnce);
_client.updates.listen((List<MqttReceivedMessage<MqttMessage?>>? msg) {
final recMess = msg![0].payload as MqttPublishMessage;
List<int> msgbytes = (recMess.payload.message)!.cast<int>();
print(
'Received message: topic is ${msg[0].topic}, payload is ${utf8.decode(msgbytes)} ');
});
}
void publish(String topic, String message) {
final builder = MqttPayloadBuilder();
builder.addString(message);
_client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
_client.published!.listen((event) {
print(
'Published topic: topic is ${event.variableHeader!.topicName}, with Qos ${event.header!.qos}');
});
}
Future<int> connect() async {
try {
await _client.connect();
} on MqttNoConnectionException catch (e) {
print('connect exception - $e');
} on SocketException catch (e) {
print('socket exception - $e');
}
if (_client.connectionStatus!.state == MqttConnectionState.connected) {
print('client connected');
} else {
print(
'client connection failed - disconnecting, status is ${_client.connectionStatus}');
_client.disconnect();
exit(-1);
}
return 0;
}
}
MqttSubscription onSubscribed(MqttSubscription subscription) {
print('subscribed $subscription');
return subscription;
}
void onConnected() {
print("client connected");
}
void onDisconnected() {
print('client disconnected');
}
异常堆栈跟踪为:
Unhandled exception:
mqtt-client::InvalidHeaderException: The supplied header is invalid. Header must be at least 2 bytes long.
#0 MqttHeader.readFrom (package:mqtt5_client/src/messages/mqtt_header.dart:69:7)
#1 new MqttHeader.fromByteBuffer (package:mqtt5_client/src/messages/mqtt_header.dart:19:5)
#2 MqttByteBuffer.isMessageAvailable (package:mqtt5_client/src/utility/mqtt_byte_buffer.dart:66:29)
#3 MqttServerConnection._onData (package:mqtt5_client/src/connectionhandling/server/mqtt_server_connection.dart:60:26)
#4 _RootZone.runUnaryGuarded (dart:async/zone.dart:1586:10)
#5 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#6 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#7 _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:774:19)
#8 _StreamController._add (dart:async/stream_controller.dart:648:7)
#9 _StreamController.add (dart:async/stream_controller.dart:596:5)
#10 _Socket._onData (dart:io-patch/socket_patch.dart:2324:41)
#11 _RootZone.runUnaryGuarded (dart:async/zone.dart:1586:10)
#12 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#13 _BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:7)
#14 _SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:774:19)
#15 _StreamController._add (dart:async/stream_controller.dart:648:7)
#16 _StreamController.add (dart:async/stream_controller.dart:596:5)
#17 new _RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:1849:33)
#18 _NativeSocket.issueReadEvent.issue (dart:io-patch/socket_patch.dart:1322:14)
#19 _microtaskLoop (dart:async/schedule_microtask.dart:40:21)
#20 _startMicrotaskLoop (dart:async/schedule_microtask.dart:49:5)
#21 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:122:13)
#22 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:193:5)
1条答案
按热度按时间f0ofjuux1#
此库的作者已经解决了这个问题;这个错误是mqtt 5特有的,正在调查中,解决方法是处理|忽略该异常,因为它发生在消息发布之后。另一个选项是使用mqtt_client而不是mqtt5_client。这两种方法都适用于https://github.com/shamblett/mqtt_client/issues/438#issuecomment-1378346434