使用ESP-IDF和azure-iot-middleware-freertos时,来自ESP32-S3的Azure IoT Hub遥测消息已被IoT Hub接收,但没有被路由到CosmosDB端点

lkaoscv7  于 2022-12-14  发布在  其他
关注(0)|答案(1)|浏览(136)

在ESP32-S3上使用ESP-IDF(非Arduino)和azure-iot-middleware-freertos(具体为sample_azure_iot_pnp.c)时,ESP32-S3设备正在向IoT Hub发送遥测数据,我可以使用Azure IoT Explorer看到到达的数据。
我已经设置了一个路由和一个自定义端点,以路由到NoSQL CosmosDB。消息没有被路由,即使在IoTExplorer的遥测窗格中显示的正文是正确的。
如果我将此正文的内容剪切并粘贴到“测试路由正文”,则路由测试符合我设置的查询条件。
我怀疑这与设置ContentEncoding =“utf-8”,ContentType =“application/json”有关。还要确保消息以utf-8编码。
示例代码使用以下函数设置遥测数据:

/**
 * @brief Format the telemetry JSON payload into a caller-supplied buffer.
 *
 * Expands the myTELEMETRY_MESSAGE format string with myACTION via snprintf().
 *
 * @param[out] pucTelemetryData       Buffer that receives the NUL-terminated payload.
 * @param[in]  ulTelemetryDataSize    Capacity of pucTelemetryData in bytes.
 * @param[out] ulTelemetryDataLength  On success, receives the payload length in bytes
 *                                    (terminating NUL not counted). Left untouched on failure.
 *
 * @return 0 on success; 1 if an argument is invalid, snprintf() reports an
 *         error, or the payload does not fit in the buffer.
 */
uint32_t ulCreateTelemetry( uint8_t * pucTelemetryData,
                            uint32_t ulTelemetryDataSize,
                            uint32_t * ulTelemetryDataLength )
{
    int lWritten;

    /* Reject arguments that would make snprintf() or the length store invalid.
     * A zero-sized buffer can never hold the payload, so it is a failure too. */
    if( ( pucTelemetryData == NULL ) ||
        ( ulTelemetryDataLength == NULL ) ||
        ( ulTelemetryDataSize == 0 ) )
    {
        return 1;
    }

    /* Template originally used by the sample: sampleazureiotMESSAGE, xDeviceCurrentTemperature */
    lWritten = snprintf( ( char * ) pucTelemetryData, ulTelemetryDataSize,
                         myTELEMETRY_MESSAGE, myACTION );

    /* snprintf() returns the length it wanted to write; a value >= the buffer
     * size means the output was truncated. The cast is safe because negative
     * values are rejected first. */
    if( ( lWritten < 0 ) || ( ( uint32_t ) lWritten >= ulTelemetryDataSize ) )
    {
        return 1;
    }

    *ulTelemetryDataLength = ( uint32_t ) lWritten;

    return 0;
}

以下是RTOS任务循环中的调用函数:

/* Main task loop (excerpt): on every iteration send one telemetry message
 * with QoS1, send a reported-properties update if one was produced, run the
 * IoT Hub client process loop, then delay before the next iteration. */
        for( ; ; )
        {
            /* Hook for sending Telemetry: build the payload in ucScratchBuffer
             * and only send when the builder reports success (0) and a
             * non-empty payload. */
            if( ( ulCreateTelemetry( ucScratchBuffer, sizeof( ucScratchBuffer ), &ulScratchBufferLength ) == 0 ) &&
                ( ulScratchBufferLength > 0 ) )
            {
                /* The fourth argument (pxProperties) is NULL, so no message
                 * properties are attached to this telemetry message. The last
                 * argument (packet id output) is NULL as well, so the packet
                 * id is not retrieved. */
                xResult = AzureIoTHubClient_SendTelemetry( &xAzureIoTHubClient,
                                                           ucScratchBuffer, ulScratchBufferLength,
                                                           NULL, eAzureIoTHubMessageQoS1, NULL );
                configASSERT( xResult == eAzureIoTSuccess );
            }

            /* Hook for sending update to reported properties; a returned
             * length of 0 means there is nothing to send this iteration. */
            ulReportedPropertiesUpdateLength = ulCreateReportedPropertiesUpdate( ucReportedPropertiesUpdate, sizeof( ucReportedPropertiesUpdate ) );

            if( ulReportedPropertiesUpdateLength > 0 )
            {
                xResult = AzureIoTHubClient_SendPropertiesReported( &xAzureIoTHubClient, ucReportedPropertiesUpdate, ulReportedPropertiesUpdateLength, NULL );
                configASSERT( xResult == eAzureIoTSuccess );
            }

            /* Run the client process loop for up to
             * sampleazureiotPROCESS_LOOP_TIMEOUT_MS to handle incoming traffic. */
            LogInfo( ( "Attempt to receive publish message from IoT Hub.\r\n" ) );
            xResult = AzureIoTHubClient_ProcessLoop( &xAzureIoTHubClient,
                                                     sampleazureiotPROCESS_LOOP_TIMEOUT_MS );
            configASSERT( xResult == eAzureIoTSuccess );

            /* Leave Connection Idle for some time. */
            LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
            vTaskDelay( sampleazureiotDELAY_BETWEEN_PUBLISHES_TICKS );
        }

遥测数据定义如下:

/* printf-style template for the telemetry JSON body. The single %s receives
 * the "action" value; every other field is a hard-coded literal. */
#define myTELEMETRY_MESSAGE "{\"action\":\"%s\",\"ph_weight_device_mac\":\"XXXXXXXXXXXX\",\"ph_weight_index\":99,\"ph_weight_dev_nr\":0,\"ph_weight_block\":\"TEST\",\"ph_weight_activity\":\"ACTIVITY 2\",\"ph_weight_mode\":105,\"ph_weight_tag\":\"6D2517EBB4\",\"ph_weight_weight\":2.6202,\"ph_weight_date\":\"2022-12-07\",\"ph_weight_time\":\"13:45:12\"}"

myACTION为:

#define myACTION "addPackhouseWeight"

实际发布消息的Azure IoT功能:

/**
 * @brief Publish one telemetry payload through the hub client's MQTT context.
 *
 * The telemetry topic (including any message properties supplied through
 * pxProperties) is written into the client's working buffer and then used as
 * the topic of an MQTT PUBLISH carrying the payload.
 *
 * @param[in]  pxAzureIoTHubClient   Hub client to send with; must not be NULL.
 * @param[in]  pucTelemetryData      Payload bytes.
 * @param[in]  ulTelemetryDataLength Payload length in bytes.
 * @param[in]  pxProperties          Optional message properties; NULL for none.
 * @param[in]  xQOS                  eAzureIoTHubMessageQoS1 selects MQTT QoS1,
 *                                   any other value selects MQTT QoS0.
 * @param[out] pusTelemetryPacketID  Optional; receives the packet id when the
 *                                   message was sent with QoS1.
 *
 * @return eAzureIoTSuccess on success, eAzureIoTErrorInvalidArgument for a
 *         NULL client, the translated core error if the topic cannot be
 *         built, or eAzureIoTErrorPublishFailed if the publish fails.
 */
AzureIoTResult_t AzureIoTHubClient_SendTelemetry( AzureIoTHubClient_t * pxAzureIoTHubClient,
                                                  const uint8_t * pucTelemetryData,
                                                  uint32_t ulTelemetryDataLength,
                                                  AzureIoTMessageProperties_t * pxProperties,
                                                  AzureIoTHubMessageQoS_t xQOS,
                                                  uint16_t * pusTelemetryPacketID )
{
    AzureIoTMQTTPublishInfo_t xPublish = { 0 };
    AzureIoTMQTTResult_t xPublishStatus;
    az_result xTopicStatus;
    size_t xTopicLength;
    uint16_t usPacketId = 0;
    const int xIsQoS1 = ( xQOS == eAzureIoTHubMessageQoS1 );

    /* Guard: nothing can be done without a client. */
    if( pxAzureIoTHubClient == NULL )
    {
        AZLogError( ( "AzureIoTHubClient_SendTelemetry failed: invalid argument" ) );
        return eAzureIoTErrorInvalidArgument;
    }

    /* Build the publish topic inside the client's working buffer. */
    xTopicStatus = az_iot_hub_client_telemetry_get_publish_topic( &pxAzureIoTHubClient->_internal.xAzureIoTHubClientCore,
                                                                  ( pxProperties != NULL ) ? &pxProperties->_internal.xProperties : NULL,
                                                                  ( char * ) pxAzureIoTHubClient->_internal.pucWorkingBuffer,
                                                                  pxAzureIoTHubClient->_internal.ulWorkingBufferLength,
                                                                  &xTopicLength );

    if( az_result_failed( xTopicStatus ) )
    {
        AZLogError( ( "Failed to get telemetry topic: core error=0x%08lx", xTopicStatus ) );
        return AzureIoT_TranslateCoreError( xTopicStatus );
    }

    /* Describe the PUBLISH: QoS, topic and payload. */
    if( xIsQoS1 )
    {
        xPublish.xQOS = eAzureIoTMQTTQoS1;
    }
    else
    {
        xPublish.xQOS = eAzureIoTMQTTQoS0;
    }

    xPublish.pcTopicName = pxAzureIoTHubClient->_internal.pucWorkingBuffer;
    xPublish.usTopicNameLength = ( uint16_t ) xTopicLength;
    xPublish.pvPayload = ( const void * ) pucTelemetryData;
    xPublish.xPayloadLength = ulTelemetryDataLength;

    /* A packet id is only requested for QoS1; QoS0 keeps the initial 0. */
    if( xIsQoS1 )
    {
        usPacketId = AzureIoTMQTT_GetPacketId( &( pxAzureIoTHubClient->_internal.xMQTTContext ) );
    }

    /* Send the PUBLISH packet. */
    xPublishStatus = AzureIoTMQTT_Publish( &( pxAzureIoTHubClient->_internal.xMQTTContext ),
                                           &xPublish, usPacketId );

    if( xPublishStatus != eAzureIoTMQTTSuccess )
    {
        AZLogError( ( "Failed to publish telemetry: MQTT error=0x%08x", xPublishStatus ) );
        return eAzureIoTErrorPublishFailed;
    }

    /* Hand the packet id back only for QoS1 and only if the caller asked. */
    if( xIsQoS1 && ( pusTelemetryPacketID != NULL ) )
    {
        *pusTelemetryPacketID = usPacketId;
    }

    AZLogInfo( ( "Successfully sent telemetry message" ) );

    return eAzureIoTSuccess;
}

数据显示在Azure IoT Explorer的遥测窗格中:

Fri Dec 09 2022 12:35:24 GMT+0200 (South Africa Standard Time):
{
  "body": {
    "action": "addPackhouseWeight",
    "ph_weight_device_mac": "XXXXXXXXXXXX",
    "ph_weight_index": 99,
    "ph_weight_dev_nr": 0,
    "ph_weight_block": "TEST",
    "ph_weight_activity": "ACTIVITY 2",
    "ph_weight_mode": 105,
    "ph_weight_tag": "AD2512EBB4",
    "ph_weight_weight": 2.6202,
    "ph_weight_date": "2022-12-07",
    "ph_weight_time": "13:45:12"
  },
  "enqueuedTime": "Fri Dec 09 2022 12:35:24 GMT+0200 (South Africa Standard Time)",
  "systemProperties": {
    "iothub-connection-device-id": "XXXXXXXXXXXX",
    "iothub-connection-auth-method": "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}",
    "iothub-connection-auth-generation-id": "638052575116976851",
    "iothub-enqueuedtime": 1670582124804,
    "iothub-message-source": "Telemetry",
    "dt-dataschema": "dtmi:com:loadassist:Packhouse;2"
  }
}

所以,消息确实到达了,但它可能是以Base64编码的,而Azure IoT Explorer只是在处理后把它正确地显示了出来。
但是我怀疑路由不起作用,是因为示例代码没有设置内容编码和内容类型。Arduino有一些示例会设置这些属性并对消息进行编码,但在这个SDK中我找不到类似的做法。
我看不出示例代码的其他地方是否做了这件事。
我不知道如何通过这个SDK及其使用的MQTT WebSocket来实现这一点。
我还想设置一个属性值,这样我就可以基于该属性进行查询和路由,而不必把“action”:“addPackhouseWeight”与重量数据一起存储到CosmosDB中。
好吧,我希望我已经足够清楚了。我尝试了与微软的支持票,但它被他们关闭,没有回答。Thx

**添加的注解:**我在sample_azure_iot.c中发现了以下内容,但sample_azure_iot_pnp.c中不包含这些内容:

/* Create a bag of properties for the telemetry, backed by ucPropertyBuffer.
 * NOTE(review): the third argument (0) is presumably the number of bytes
 * already written to the buffer - confirm against the API documentation. */
        xResult = AzureIoTMessage_PropertiesInit( &xPropertyBag, ucPropertyBuffer, 0, sizeof( ucPropertyBuffer ) );
        configASSERT( xResult == eAzureIoTSuccess );

        /* Append "content-encoding" = "utf-8". sizeof( literal ) - 1 passes the
         * string length without the terminating NUL.
         * NOTE(review): the post reports that messages sent with these property
         * names were still not routed. */
        xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "content-encoding", sizeof( "content-encoding" ) - 1,
                                                    ( uint8_t * ) "utf-8", sizeof( "utf-8" ) - 1 );
        configASSERT( xResult == eAzureIoTSuccess );

/* Append "content-type" = "application/json"; the '/' is written
 * percent-encoded as %2F because the properties API does not encode values. */
xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "content-type", sizeof( "content-type" ) - 1,
                                                            ( uint8_t * ) "application%2Fjson", sizeof( "application%2Fjson" ) - 1 );
                configASSERT( xResult == eAzureIoTSuccess );

或如下所示:

/* Create a bag of properties for the telemetry, backed by ucPropertyBuffer.
 * NOTE(review): the third argument (0) is presumably the number of bytes
 * already written to the buffer - confirm against the API documentation. */
        xResult = AzureIoTMessage_PropertiesInit( &xPropertyBag, ucPropertyBuffer, 0, sizeof( ucPropertyBuffer ) );
        configASSERT( xResult == eAzureIoTSuccess );

        /* Append "contentEncoding" = "utf-8". sizeof( literal ) - 1 passes the
         * string length without the terminating NUL.
         * NOTE(review): the post reports that messages sent with these property
         * names were still not routed. */
        xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "contentEncoding", sizeof( "contentEncoding" ) - 1,
                                                    ( uint8_t * ) "utf-8", sizeof( "utf-8" ) - 1 );
        configASSERT( xResult == eAzureIoTSuccess );

/* Append "contentType" = "application/json"; the '/' is written
 * percent-encoded as %2F because the properties API does not encode values. */
xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "contentType", sizeof( "contentType" ) - 1,
                                                            ( uint8_t * ) "application%2Fjson", sizeof( "application%2Fjson" ) - 1 );
                configASSERT( xResult == eAzureIoTSuccess );

尝试了content-type和contentType等,但还是无法路由...

o7jaxewo

o7jaxewo1#

最后我让它工作:

/* Append "$.ce" = "utf-8" to the property bag. Per the post, using the "$.ce"
 * and "$.ct" names is what finally made routing work.
 * NOTE(review): "$.ce" is presumably the content-encoding system property -
 * confirm against the IoT Hub MQTT documentation. */
xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "$.ce", sizeof( "$.ce" ) - 1,
                                                    ( uint8_t * ) "utf-8", sizeof( "utf-8" ) - 1 );
        configASSERT( xResult == eAzureIoTSuccess );

/* Append "$.ct" = "application/json" (presumably the content-type system
 * property); the '/' is written percent-encoded as %2F because the properties
 * API does not encode values. */
xResult = AzureIoTMessage_PropertiesAppend( &xPropertyBag, ( uint8_t * ) "$.ct", sizeof( "$.ct" ) - 1,
                                                            ( uint8_t * ) "application%2Fjson", sizeof( "application%2Fjson" ) - 1 );
                configASSERT( xResult == eAzureIoTSuccess );

我在这里找到了解决方案:https://azure.microsoft.com/sv-se/blog/iot-hub-message-routing-now-with-routing-on-message-body/

因此,添加“$.ce”和“$.ct”这两个属性之后,路由就能正常工作了。这个问题实实在在地花了我好几天的时间。希望它能帮助其他人。
我还学到了:

/**
         *   @note The properties init API will not encode properties. In order to support
         *       the following characters, they must be percent-encoded (RFC3986) as follows:
         *         - `/` : `%2F`
         *         - `%` : `%25`
         *         - `#` : `%23`
         *         - `&` : `%26`
         *       Only these characters would have to be encoded. If you would like to avoid the need to
         *       encode the names/values, avoid using these characters in names and values.
         */

相关问题