wwh 2 лет назад
Родитель
Сommit
3080d53ccc

+ 12 - 3
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/subscribe/ConsumerMQTT.java

@@ -1,6 +1,7 @@
 package com.huimv.env.manage.saas.mqtt.subscribe;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.context.annotation.Configuration;
 
@@ -28,15 +29,15 @@ public class ConsumerMQTT {
             //设置为0,防止ERROR
             options.setExecutorServiceTimeout(0);
             //设置断开后重新连接
-            options.setAutomaticReconnect(true);
+            options.setAutomaticReconnect(false);
             // 设置回调
-            client.setCallback(new PushCallbackTest());
+            client.setCallback(new PushCallback());
 
             client.connect(options);
 
             //订阅消息
 //            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
-            client.subscribe(topic,2);
+            client.subscribe(topic,1);
 
             client.disconnect();
 
@@ -45,6 +46,14 @@ public class ConsumerMQTT {
         }
     }
 
+    public void disConnect(String topic) {
+        try {
+            client.unsubscribe(topic);
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
 //    public static void main(String[] args) {
 //        ConsumerMQTT consumerMQTT = new ConsumerMQTT();
 //        consumerMQTT.start();

+ 17 - 4
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/subscribe/PushCallback.java

@@ -12,14 +12,13 @@ import com.huimv.env.common.service.IBaseThresholdService;
 import com.huimv.env.manage.entity.*;
 import com.huimv.env.manage.saas.dao.entity.EnvDeviceEquipment;
 import com.huimv.env.manage.saas.mqtt.publish.PublishMQTT;
+import com.huimv.env.manage.saas.mqtt.uplod.UploadMqtt;
 import com.huimv.env.manage.saas.mqtt.utils.SpringUtil;
 import com.huimv.env.manage.saas.service.IEnvDeviceEquipmentService;
 import com.huimv.env.manage.service.*;
 import com.mysql.cj.util.StringUtils;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.*;
 import org.springframework.stereotype.Component;
 
 import java.math.BigDecimal;
@@ -30,13 +29,26 @@ import java.util.*;
 @Component
 //接收消息回调
 @Slf4j
-public class PushCallback implements MqttCallback {
+public class PushCallback implements MqttCallbackExtended {
 
+    @Override
+    public void connectComplete(boolean b, String s) {
+        System.out.println("重新订阅!");
 
+    }
     @Override
     public void connectionLost(Throwable cause) {
         // 连接丢失后,一般在这里面进行重连
         System.out.println("连接断开,可以做重连");
+        IEnvDeviceRegisterService deviceRegisterService = SpringUtil.getBean(IEnvDeviceRegisterService.class);
+        List<EnvDeviceRegister> list = deviceRegisterService.list();
+        List<String> chipIds = new ArrayList<>();
+        for (EnvDeviceRegister deviceRegister : list) {
+            String chipId = deviceRegister.getChipId();
+            chipIds.add(chipId);
+        }
+        UploadMqtt.test(chipIds);
+
     }
 
     @Override
@@ -215,4 +227,5 @@ public class PushCallback implements MqttCallback {
     }
 
 
+
 }

+ 69 - 69
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/subscribe/PushCallbackTest.java

@@ -1,69 +1,69 @@
-package com.huimv.env.manage.saas.mqtt.subscribe;
-
-import cn.hutool.core.util.ObjectUtil;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.huimv.env.common.entity.BaseThreshold;
-import com.huimv.env.common.service.IBaseThresholdService;
-import com.huimv.env.manage.entity.*;
-import com.huimv.env.manage.saas.dao.entity.EnvDeviceEquipment;
-import com.huimv.env.manage.saas.mqtt.publish.PublishMQTT;
-import com.huimv.env.manage.saas.mqtt.utils.SpringUtil;
-import com.huimv.env.manage.saas.service.IEnvDeviceEquipmentService;
-import com.huimv.env.manage.service.*;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.stereotype.Component;
-
-import java.math.BigDecimal;
-import java.time.ZonedDateTime;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Component
-//接收消息回调
-@Slf4j
-public class PushCallbackTest implements MqttCallback {
-
-
-    @Override
-    public void connectionLost(Throwable cause) {
-        // 连接丢失后,一般在这里面进行重连
-        System.out.println("连接断开,可以做重连");
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        System.out.println("deliveryComplete---------" + token.isComplete());
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-        // subscribe后得到的消息会执行到这里面
-        System.out.println("接收消息主题 : " + topic);
-        System.out.println("接收消息Qos : " + message.getQos());
-        System.out.println("接收消息内容 : " + new String(message.getPayload()));
-        System.out.println("开始处理当前数据...");
-
-        String s = new String(message.getPayload());
-        IEnvTopicService envTopicService = SpringUtil.getBean(IEnvTopicService.class);
-        Date date = new Date();
-        EnvTopic envTopic = new EnvTopic();
-        envTopic.setTime(date);
-        if ("sync_time".equals(s)) {
-            envTopic.setTopic(0);
-            System.out.println("收到控制响应");
-            System.out.println("数据处理完成!");
-        } else {
-            envTopic.setTopic(1);
-            System.out.println("操作失败");
-        }
-        envTopicService.save(envTopic);
-    }
-}
+//package com.huimv.env.manage.saas.mqtt.subscribe;
+//
+//import cn.hutool.core.util.ObjectUtil;
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONArray;
+//import com.alibaba.fastjson.JSONObject;
+//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+//import com.huimv.env.common.entity.BaseThreshold;
+//import com.huimv.env.common.service.IBaseThresholdService;
+//import com.huimv.env.manage.entity.*;
+//import com.huimv.env.manage.saas.dao.entity.EnvDeviceEquipment;
+//import com.huimv.env.manage.saas.mqtt.publish.PublishMQTT;
+//import com.huimv.env.manage.saas.mqtt.utils.SpringUtil;
+//import com.huimv.env.manage.saas.service.IEnvDeviceEquipmentService;
+//import com.huimv.env.manage.service.*;
+//import lombok.extern.slf4j.Slf4j;
+//import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+//import org.eclipse.paho.client.mqttv3.MqttCallback;
+//import org.eclipse.paho.client.mqttv3.MqttMessage;
+//import org.springframework.stereotype.Component;
+//
+//import java.math.BigDecimal;
+//import java.time.ZonedDateTime;
+//import java.util.Date;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//@Component
+////接收消息回调
+//@Slf4j
+//public class PushCallbackTest implements MqttCallback {
+//
+//
+//    @Override
+//    public void connectionLost(Throwable cause) {
+//        // 连接丢失后,一般在这里面进行重连
+//        System.out.println("连接断开,可以做重连");
+//    }
+//
+//    @Override
+//    public void deliveryComplete(IMqttDeliveryToken token) {
+//        System.out.println("deliveryComplete---------" + token.isComplete());
+//    }
+//
+//    @Override
+//    public void messageArrived(String topic, MqttMessage message) throws Exception {
+//        // subscribe后得到的消息会执行到这里面
+//        System.out.println("接收消息主题 : " + topic);
+//        System.out.println("接收消息Qos : " + message.getQos());
+//        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+//        System.out.println("开始处理当前数据...");
+//
+//        String s = new String(message.getPayload());
+//        IEnvTopicService envTopicService = SpringUtil.getBean(IEnvTopicService.class);
+//        Date date = new Date();
+//        EnvTopic envTopic = new EnvTopic();
+//        envTopic.setTime(date);
+//        if ("sync_time".equals(s)) {
+//            envTopic.setTopic(0);
+//            System.out.println("收到控制响应");
+//            System.out.println("数据处理完成!");
+//        } else {
+//            envTopic.setTopic(1);
+//            System.out.println("操作失败");
+//        }
+//        envTopicService.save(envTopic);
+//    }
+//}

+ 48 - 12
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/uplod/UploadMqtt.java

@@ -18,21 +18,19 @@ import java.util.List;
 @Data
 public class UploadMqtt extends Thread {
     public static final String HOST = "tcp://115.238.57.190:1883";
-    private  String clientid ;
-    private MqttClient client;
-    private MqttConnectOptions options;
+    private String clientid ;
+    private static MqttClient client;
+    private static MqttConnectOptions options;
 
-    private String userName = "admin";    //非必须
-    private String passWord = "admin";  //非必须
+    private static String userName = "admin";    //非必须
+    private static String passWord = "admin";  //非必须
     private List<String> topics ;
 
 
-    @SneakyThrows
+//    @SneakyThrows
     @Override
-    public synchronized void  run() {
+    public  void  run() {
         try {
-
-
             // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
             client = new MqttClient(HOST, "subscribe", new MemoryPersistence());
             // MQTT的连接设置
@@ -44,11 +42,13 @@ public class UploadMqtt extends Thread {
             // 设置连接的密码
             options.setPassword(passWord.toCharArray());
             // 设置超时时间 单位为秒
-            options.setConnectionTimeout(10);
+            options.setConnectionTimeout(60);
             // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(20);
+            options.setKeepAliveInterval(30);
+
             //设置断开后重新连接
-            options.setAutomaticReconnect(true);
+            options.setAutomaticReconnect(false);
+
             // 设置回调
             client.setCallback(new PushCallback());
 
@@ -65,6 +65,42 @@ public class UploadMqtt extends Thread {
             e.printStackTrace();
         }
 
+    }
 
+    public static void test(List<String> topics) {
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, "subscribe", new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(60);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(30);
+
+            //设置断开后重新连接
+            options.setAutomaticReconnect(true);
+
+            // 设置回调
+            client.setCallback(new PushCallback());
+
+            client.connect(options);
+
+            //订阅消息
+//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+            for (String topic : topics) {
+                System.out.println("重新走到了这里开始订阅!");
+                client.subscribe("huimv_up_"+topic, 1);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
 }

+ 1 - 1
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/uplod/UploadMqttStart.java

@@ -27,7 +27,7 @@ public class UploadMqttStart {
         }
         UploadMqtt uploadmqtt = new UploadMqtt();
         uploadmqtt.setTopics(chipIds);
-        uploadmqtt.start();
+        uploadmqtt.run();
     }
 
 }

+ 7 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/service/impl/EnvDeviceRegisterServiceImpl.java

@@ -15,6 +15,7 @@ import com.huimv.env.manage.saas.dao.entity.BasePigpen;
 import com.huimv.env.manage.mapper.EnvDeviceEquipmentMapper;
 import com.huimv.env.manage.saas.dao.entity.EnvDeviceEquipment;
 import com.huimv.env.manage.saas.dao.entity.LampConfig;
+import com.huimv.env.manage.saas.mqtt.subscribe.ConsumerMQTT;
 import com.huimv.env.manage.service.IEnvDeviceRegisterService;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.huimv.env.manage.utils.Result;
@@ -122,6 +123,8 @@ public class EnvDeviceRegisterServiceImpl extends ServiceImpl<EnvDeviceRegisterM
             envDeviceEquipmentMapper.insert(envDeviceEquipment);
             port++;
         }
+        ConsumerMQTT consumerMQTT = new ConsumerMQTT();
+        consumerMQTT.start("tcp://115.238.57.190:1883","newDevice","admin","admin","huimv_up_"+deviceCode);
 
         return new Result(ResultCode.SUCCESS,true);
     }
@@ -138,6 +141,8 @@ public class EnvDeviceRegisterServiceImpl extends ServiceImpl<EnvDeviceRegisterM
     @Transactional
     public Result unbindingBaseDeviceCofig(Map<String, String> map, HttpServletRequest request) {
         String deviceCode = map.get("deviceCode");
+
+        EnvDeviceRegister envDeviceRegister = envDeviceRegisterMapper.selectOne(new QueryWrapper<EnvDeviceRegister>().eq("device_code", deviceCode));
         EnvDeviceRegister deviceRegister = new EnvDeviceRegister();
         deviceRegister.setUnitId("0");
         deviceRegister.setBinding(0);
@@ -149,6 +154,8 @@ public class EnvDeviceRegisterServiceImpl extends ServiceImpl<EnvDeviceRegisterM
         envDeviceEquipment.setUploadStart(0);
         envDeviceEquipment.setBinding(0);
         envDeviceEquipmentMapper.update(envDeviceEquipment,new UpdateWrapper<EnvDeviceEquipment>().eq("device_code",deviceCode));
+        ConsumerMQTT consumerMQTT = new ConsumerMQTT();
+        consumerMQTT.disConnect("huimv_up_"+envDeviceRegister.getChipId());
         return new Result(10000,"解绑成功",true);
     }