使用Python错误从Azure IoT Hub向Digital Twins接收远程作业数据

j2datikz  于 2023-08-07  发布在  Python
关注(0)|答案(1)|浏览(97)

我想使用Python而不是C#(我没有C#经验)将遥测数据从IOT中心发送到Azure数字孪生,但我无法获得它。
我已经遵循了Microsoft(https://learn.microsoft.com/en-us/azure/digital-twins/how-to-ingest-iot-hub-data)的完整示例,但没有成功。
这是我的模拟装置代码。我检查了一下,它成功地将消息发送到IoT Hub:

from azure.iot.device import IoTHubDeviceClient
import json
import random
import time

CONNECTION_STRING = "HostName=nmslhub.azure-devices.net;DeviceId=factory110503;SharedAccessKey=okok4Y2Ri3dhsW5h5JSZlijnE2SHyxPA187AOBIMVHA="

def simulate_telemetry(device_client):
    while True:
        temperature = random.randint(20, 30)
        humidity = random.randint(60, 80)

        telemetry = {
            "temperature": temperature,
            "humidity": humidity
        }

        message = json.dumps(telemetry)
        device_client.send_message(message)

        print(f"Sent message: {message}")
        time.sleep(1)

if __name__ == "__main__":
    device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
    device_client.connect()

    simulate_telemetry(device_client)

字符串
这是使用python和事件网格触发器的函数app的代码(我遵循教程链接中的C#代码):

import os 
import logging
import json
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient
from azure.core.exceptions import ResourceNotFoundError
import azure.functions as func

adt_instance_url = os.getenv("DIGITAL_TWINS_URL")
if adt_instance_url is None:
    logging.error("Application setting 'DIGITAL_TWINS_URL' not set")

async def main(event: func.EventGridEvent):
    try:
        # Authenticate with Digital Twins
        credential = DefaultAzureCredential()
        client = DigitalTwinsClient(adt_instance_url, credential)
        logging.info("ADT service client connection created.")

        if event is not None and event.data is not None:
            logging.info(event.data)

            # Find device ID and temperature
            device_message = event.get_json()
            device_id = device_message["systemProperties"]["iothub-connection-device-id"]
            temperature = device_message["body"]["temperature"]
            humidity = device_message["body"]["humidity"]
            
            logging.info(f"Device: {device_id} Temperature is: {temperature} Humidity is : {humidity}")

            # Update twin with device temperature
            twin_id = device_id
            patch = [
                {
                    "op": "replace",
                    "path": "/Temperature",
                    "value": temperature
                },
               {
                    "op": "replace",
                    "path": "/Humidity",
                    "value": humidity
                }
            ]
            await client.update_digital_twin(twin_id, patch)
    except ResourceNotFoundError:
        logging.error(f"Digital twin with ID {twin_id} not found.")
    except Exception as ex:
        logging.error(f"Error in ingest function: {ex}")


我还创建了事件订阅来连接IoT中心和Function App。我遵循教程中的所有步骤。
当我在VS代码(Ubuntu 20.04)上本地运行函数应用程序时,它成功执行,但在中间有这个错误:
第一个月
我只使用这2个文件的整个项目的代码。
我在Azure Digital Twins资源管理器上运行查询,但未看到双胞胎按预期更新。
我应该添加一些其他的东西(代码)或者我可以改变什么?我认为问题可能是缺少数字双胞胎作为输出或物联网中心作为输入。

kkih6yb8

kkih6yb81#

我已经用Python代码测试了遥测摄取,这里是我测试过的一个示例。
1 IoT Hub数据模拟示例

import os
import random
import time
from datetime import date, datetime
import json
from azure.iot.device import IoTHubDeviceClient, Message

CONNECTION_STRING = "<Device Connection String>"
TEMPERATURE = 35.0
HUMIDITY = 60
MSG_TXT = '{{"Temperature": {Temperature},"Humidity": {Humidity}}}'

def run_telemetry_sample(client):
    print("IoT Hub device sending periodic messages")

    client.connect()

    while True:
        temperature = TEMPERATURE + (random.random() * 15)
        humidity = HUMIDITY + (random.random() * 20)                     
        msg_txt_formatted = MSG_TXT.format(
        Temperature=temperature, Humidity=humidity)
    
        message = Message(msg_txt_formatted, content_encoding="utf-8", content_type="application/json")
        if temperature > 30:
            message.custom_properties["temperatureAlert"] = "true"
        else:
            message.custom_properties["temperatureAlert"] = "false"
    
        print("Sending message: {}".format(message))
        client.send_message(message)
        print("Message successfully sent")
        time.sleep(30)

def main():
    print("IoT Hub Quickstart #1 - Simulated device")
    print("Press Ctrl-C to exit")

    client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)

    try:
        run_telemetry_sample(client)
    except KeyboardInterrupt:
        print("IoTHubClient sample stopped by user")
    finally:
        print("Shutting down IoTHubClient")
        client.shutdown()

if __name__ == '__main__':
    main()

字符串
请确保在上述代码中提供正确的IoT Hub设备连接字符串
2 Azure事件网格触发函数

import json
import logging
import azure.functions as func
import os
from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

adt_instance_url = "<SDT instance URL>"
credential = DefaultAzureCredential()
client = DigitalTwinsClient(adt_instance_url, credential)

async def main(event: func.EventGridEvent):
    
    if not adt_instance_url:
        raise ValueError("Application setting \"ADT_SERVICE_URL\" not set")

    try:
        # Authenticate with Digital Twins

        logging.info("ADT service client connection created.")
        result = json.dumps({
            'id': event.id,
            'data': event.get_json(),
            'topic': event.topic,
            'subject': event.subject,
            'event_type': event.event_type,
        })
        data = json.loads(result)

        if result and data:                

            # <Find_device_ID_and_temperature>
            device_message = data["data"]
            logging.info('Python EventGrid trigger processed an event: %s', device_message)
            device_id = device_message["systemProperties"]["iothub-connection-device-id"]
            temperature = device_message["body"]["Temperature"]
            # </Find_device_ID_and_temperature>

            logging.info(f"Device:{device_id} Temperature is:{temperature}")

            # <Update_twin_with_device_temperature>
            update_twin_data = [{"op": "replace", "path": "/Temperature", "value": temperature}]
            await client.update_digital_twin(device_id, update_twin_data)
            # </Update_twin_with_device_temperature>
    except Exception as ex:
        logging.info("Error in ingest function: %s", str(ex))


请为连接字符串提供正确的ADT示例URL。请注意,我在代码中直接提供了URL。但是,建议使用您尝试遵循的方法,并通过Environment变量将URL传递给代码。
我已将代码发布到Azure函数应用程序,并为函数应用程序分配了“Azure Digital Twins Data Owner”角色访问权限。我还配置了功能应用程序,以从IoT Hub接收遥测事件数据。请参阅分配访问角色和将功能连接到IoT Hub部分了解执行此操作所需的步骤。
一旦我配置好了一切,我就可以看到Azure云函数接收到的事件,并可以更新Azure数字孪生模型。请参考下图以供参考。


的数据



还请确保Azure Digital Twin具有加载到温度属性的初始值,因为我们正在使用replace方法修补程序更新Twin。

相关问题