ConnectionErrorException when reading device status from IoT Hub

oxf4rvwz  posted on 2021-06-21  in  Storm

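I am building a topology that reads device status from an IoT Hub in Azure, but about a minute after it starts reading device status I get the exception below.
I have set the number of workers to 4.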

java.lang.RuntimeException: com.microsoft.eventhubs.client.EventHubException: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
    at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:156) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at backtype.storm.daemon.executor$fn__5624$fn__5639.invoke(executor.clj:564) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:477) [storm-core-0.10.0.jar:0.10.0]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: com.microsoft.eventhubs.client.EventHubException: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
    at com.microsoft.eventhubs.client.EventHubReceiver.ensureReceiverCreated(EventHubReceiver.java:112) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.EventHubReceiver.<init>(EventHubReceiver.java:65) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.EventHubConsumerGroup.createReceiver(EventHubConsumerGroup.java:48) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.ResilientEventHubReceiver.initialize(ResilientEventHubReceiver.java:63) ~[eventhubs-client-0.9.1.jar:?]
    at org.apache.storm.eventhubs.spout.EventHubReceiverImpl.open(EventHubReceiverImpl.java:74) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.SimplePartitionManager.open(SimplePartitionManager.java:77) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.EventHubSpout.preparePartitions(EventHubSpout.java:134) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:153) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    ... 4 more
Caused by: org.apache.qpid.amqp_1_0.client.ConnectionErrorException: At least one receiver for the endpoint is created with epoch of '12', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. TrackingId:f650897e-9cda-4b12-86ed-6e0b08cd4d43_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:EventHub:trinityhub-operationmonitoring~29490, Timestamp:11/29/2016 7:25:33 AM Reference:0204db86-8f3f-4365-9b8f-bca97e78b3fa, TrackingId:82395b51-0004-4833-ba2e-b16dc308f5db_B7, SystemTracker:iothub-ns-trinityhub-71157-43d51dd035:eventhub:trinityhub-operationmonitoring~29490|$default, Timestamp:11/29/2016 7:25:33 AM TrackingId:0d6a85ffa5a0494cbec2b2bc309776e5_G4, SystemTracker:gateway6, Timestamp:11/29/2016 7:25:34 AM
    at org.apache.qpid.amqp_1_0.client.Receiver.<init>(Receiver.java:223) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:281) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:260) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.client.Session.createReceiver(Session.java:185) ~[qpid-amqp-1-0-client-0.32.jar:0.32]
    at com.microsoft.eventhubs.client.EventHubReceiver.ensureReceiverCreated(EventHubReceiver.java:108) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.EventHubReceiver.<init>(EventHubReceiver.java:65) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.EventHubConsumerGroup.createReceiver(EventHubConsumerGroup.java:48) ~[eventhubs-client-0.9.1.jar:?]
    at com.microsoft.eventhubs.client.ResilientEventHubReceiver.initialize(ResilientEventHubReceiver.java:63) ~[eventhubs-client-0.9.1.jar:?]
    at org.apache.storm.eventhubs.spout.EventHubReceiverImpl.open(EventHubReceiverImpl.java:74) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.SimplePartitionManager.open(SimplePartitionManager.java:77) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.EventHubSpout.preparePartitions(EventHubSpout.java:134) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    at org.apache.storm.eventhubs.spout.EventHubSpout.open(EventHubSpout.java:153) ~[storm-eventhubs-0.10.0.jar:0.10.0]
    ... 4 more
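
The message in the trace states a rule: while a receiver with an epoch (here '12') is attached to the endpoint, a receiver without an epoch is refused until it either reconnects with a higher epoch or all epoch receivers are closed. The following is a minimal toy model of that rule in plain Java, meant only to make the message easier to read. It is not the Event Hubs service code or the client API; the class and method names are hypothetical, and rejecting an equal epoch is an assumption, since the message only mentions a "higher" one.

```java
import java.util.OptionalLong;

/** Toy model of the receiver rule quoted in the error message (hypothetical names). */
final class PartitionEndpoint {
    // Epoch of the receiver that currently owns the endpoint, if there is one.
    private OptionalLong activeEpoch = OptionalLong.empty();

    /** A receiver without an epoch is refused while an epoch receiver is attached. */
    void attachWithoutEpoch() {
        if (activeEpoch.isPresent()) {
            throw new IllegalStateException(
                "epoch receiver " + activeEpoch.getAsLong()
                    + " is attached, non-epoch receiver is not allowed");
        }
    }

    /** Assumption: only a strictly higher epoch may take over from the current owner. */
    void attachWithEpoch(long epoch) {
        if (activeEpoch.isPresent() && epoch <= activeEpoch.getAsLong()) {
            throw new IllegalStateException(
                "epoch " + epoch + " is not higher than " + activeEpoch.getAsLong());
        }
        activeEpoch = OptionalLong.of(epoch);
    }

    /** Closing the epoch receiver frees the endpoint for non-epoch receivers again. */
    void closeEpochReceiver() {
        activeEpoch = OptionalLong.empty();
    }
}
```

In this model, calling `attachWithEpoch(12)` followed by `attachWithoutEpoch()` fails in the same way as the trace. If the model is a fair reading of the message, the thing to check would be whether some other reader with an epoch is attached to the same consumer group and partition.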

Spout: creating a bolt and subscribing it to the spout


    //Set the WASB bolt to read from the parser output
    /*  topologyBuilder.setBolt("wasbbolt", wasbBolt, 10)
      .shuffleGrouping("EventHubsSpout")
      .setNumTasks(spoutConfig.getPartitionCount());*/
    //Parse the data from the JSON format in the Event Hub into tuples
    topologyBuilder.setBolt("healthDataBolt", new HealthDataBolt(), spoutConfig.getPartitionCount())
      .shuffleGrouping("EventHubsSpout")
      .setNumTasks(spoutConfig.getPartitionCount());

Bolt: this bolt parses the status packet and stores it in the Postgres database.

@Override
    public void execute(Tuple tuple) {
        String data = tuple.getString(0);
        JSONObject jsonObject = new JSONObject(data);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //sdf.setTimeZone (TimeZone.getTimeZone ("IST"));
        String currentTime = sdf.format(new Date());

        // Update Postgres only for packets that carry a deviceId.
        if (jsonObject.has("deviceId"))
        {
            String deviceId = (String) jsonObject.get("deviceId");
            String operationName = (String) jsonObject.get("operationName");
            String sql = "update iot.device_configuration set connection_status=?,current_status_time=?,status_packet=? where licence_key=?";

            // try-with-resources closes the statement after every tuple.
            try (PreparedStatement prepareStatement = connectPostgres.prepareStatement(sql))
            {
                prepareStatement.setString(1, operationName);
                prepareStatement.setString(2, currentTime);
                prepareStatement.setString(3, data);
                prepareStatement.setString(4, deviceId);
                prepareStatement.executeUpdate();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        // Store the raw packet in HBase, keyed by the processing time.
        HTable parsedPacket = null;
        try {
            parsedPacket = new HTable(hbaseConfig, "device_health_table");
            Put data1 = new Put(Bytes.toBytes(currentTime));
            data1.add(Bytes.toBytes("details"), Bytes.toBytes("packet"),
                    Bytes.toBytes(data));
            data1.add(Bytes.toBytes("details"), Bytes.toBytes("updated_time"),
                    Bytes.toBytes(currentTime));
            parsedPacket.put(data1);
        } catch (IOException e) {
            // Covers a failed table open as well as a failed put.
            e.printStackTrace();
        } finally {
            // Close the table only if it was opened; the original would hit a
            // NullPointerException on put() when the constructor had failed.
            if (parsedPacket != null) {
                try {
                    parsedPacket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
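
The bolt formats the current time with a `SimpleDateFormat` that has no explicit time zone (the `setTimeZone` call is commented out), so the stored string depends on each worker's default zone. As a side note unrelated to the exception, here is a minimal sketch of formatting with a fixed zone via `java.time`, which is available on the Java 8 runtime shown in the trace. The class name is hypothetical and UTC is an assumption; substitute whichever zone the table is meant to use.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: formats timestamps in a fixed zone (UTC assumed here). */
final class PacketTimestamps {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance suffices.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Returns the instant rendered as "yyyy-MM-dd HH:mm:ss" in the fixed zone. */
    static String format(Instant instant) {
        return FORMAT.format(instant);
    }
}
```

Inside `execute`, `PacketTimestamps.format(Instant.now())` could stand in for the `sdf.format(new Date())` call.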
