我有一个Java应用程序和一个NodeJS应用程序,它们都使用单个Azure服务总线消息队列。
我和我的客户一起见证了一些奇怪的效果,如下所示。
JAVA MESSAGE PRODUCER(根据Azure JMS教程使用QPID库):
TextMessage message = sendSession.createTextMessage();
message.setText("Test AMQP message from JMS");
long randomMessageID = randomGenerator.nextLong() >>>1;
message.setJMSMessageID("ID:" + randomMessageID);
sender.send(message);
System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
输出:发送了JMSMessageID = ID的消息:2414932965987073843
节点消息使用者:
serviceBus.receiveQueueMessage(queue, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {
if(message !==null)console.log(util.inspect(message, {showHidden: false, depth: null}));
});
输出:
{ body: '@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001aTest AMQP message from JMS',
brokerProperties:
{ DeliveryCount: 1,
EnqueuedSequenceNumber: 5000004,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:28:21 GMT',
MessageId: '2414932965987073843',
PartitionKey: '89',
SequenceNumber: 59672695067659070,
State: 'Active',
TimeToLive: 1209600,
To: 'moequeue' },
contentType: 'application/xml; charset=utf-8' }
如果将其与通过serviceBus.sendQueueMessage()插入队列的消息进行比较,则属性如下所示:
{ body: 'test message',
brokerProperties:
{ DeliveryCount: 1,
EnqueuedSequenceNumber: 0,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:44:03 GMT',
MessageId: 'bc0a3d4f-15ba-434f-9fb0-1a3789885f8c',
PartitionKey: '734',
SequenceNumber: 37436171906517256,
State: 'Active',
TimeToLive: 1209600 },
contentType: 'text/plain',
customProperties:
{ message_number: 0,
sent_date: Wed Nov 04 2015 21:44:03 GMT+0000 (UTC) } }
因此,内容类型在开始时是不同的-为什么?-那么第一个消息有效负载的主体中奇怪的垃圾来自哪里:@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/ \u001a这是序列化的结果吗?如何缓解这种情况?
在这里也可以找到代码:http://pastebin.com/T9RTFRBk
4条答案
按热度按时间dffbzjpn1#
我们遇到了完全相同的问题,虽然在一个更复杂的例子使用 Camel 为基础的生产者。由于我们的环境的变化,我们开始遇到这些问题。
这里的问题是REST服务在编码对节点客户机的HTTP响应时如何解释JMS消息。
我们发现JmsTextmessage由于某种原因(不完全清楚)被假定为“application/xml”类型,并且内容将按此转发,因此您在示例中得到了OUTPUT。
如果使用JmsByteMessage,则内容被解释为“application/octet-stream”,并且不会在传输中损坏。
因此,不妨尝试以下方法:
我们使用它来传输JSON编码的数据以供Node.js客户端解释。
f5emj3cl2#
Azure服务总线支持两种不同的协议:AMQP和HTTP。使用qpid库的Java/JMS正在为ServiceBus使用AMQP协议。但是,ServiceBus REST API Package 在NodeJS中,而不是HTTP协议中。
有关服务总线中AMQP支持的详细信息,请参阅https://azure.microsoft.com/en-us/documentation/articles/service-bus-amqp-overview/。
对于服务总线的REST API,请参考https://msdn.microsoft.com/en-us/library/azure/hh780717.aspx。
AMQP是一种二进制应用层协议,旨在高效地支持各种消息应用程序和通信模式。-from WikiPedia
但是HTTP是一个文本协议。
报文格式如下,请参考aritifact www.example.com的
Message Format
部分http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format,AMQP规范可参考http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html。因此,在Java中发送的消息或在NodeJS中发送的消息被序列化为不同的结果。
AMQP正文内容中格式化的内容
\uXXXX
为Unicode字符。Unicode字符
\u0006
是确认控制字符,请参阅https://en.wikipedia.org/wiki/Acknowledge_character了解。其中Unicode字符
\u001a
为替换控制字符,请参考https://en.wikipedia.org/wiki/Substitute_character。它们限制消息头中元数据的开始和结束。
zrfyljdw3#
我也遇到了同样的问题,邮件正文的前缀是“@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/ \u0001”。
我使用Nodejs(带有azure-iot-device-mqtt和azure-iot-device包)将消息发送到IoT中心。我使用流分析作业从IoT中心接收消息并将其发布到队列。我使用Nodejs(带有amqp 10包)从队列接收事件。
问题不是由我发送或接收消息的方式引起的,而是,问题出在流分析兼容级别!兼容级别1.0(至少在我部署时是默认的)使用DataContractSerializer,它将消息序列化为XML流!(已修复)兼容级别为1.1时的此问题。因此,您可能只需要将流分析作业的兼容级别(配置-〉兼容级别)更改为1.1。
参见:https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-compatibility-level#major-changes-in-the-latest-compatibility-level-11:
isr3a4wc4#
JMS发送的消息实际上是. NET二进制XML格式(MC-NBFX),如here所指定的。我认为这是当您尝试使用DataContractSerializer与XmlDictionaryWrite.CreateBinaryWriter(尽管我对. NET一无所知)序列化对象时得到的结果。
引用上面的消息正文以供快速参考,并将Unicode字符替换为十六进制等效字符(例如
\u0006
替换为\x06
),将\b
(退格转义)替换为ASCII代码0x08(BS):string
。<string>
元素的XML命名空间,实际上它也被编码为可变长度字符串,长度为字节3
(0x33或十进制51)和51个URL字符。Test AMQP message from JMS
.string
元素的结束,即XML文本中的</string>
。因此,等效的XML文本如下所示:
<string xmlns="http://schemas.microsoft.com/2003/10/Serialization/">Test AMQP message from JMS</string>
有一些库可以为您解码,例如libnbfx(C++)或msbin(Python)。