wwh hace 1 año
padre
commit
78057a7f69
Se han modificado 16 ficheros con 1042 adiciones y 397 borrados
  1. 91 3
      huimv-admin/src/main/java/com/huimv/guowei/admin/controller/EnvDeviceEquipmentController.java
  2. 4 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/entity/EnvDeviceEquipment.java
  3. 36 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/MqttStart.java
  4. 65 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PublishMQTT.java
  5. 71 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PublishMQTTTree.java
  6. 162 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PushCallback.java
  7. 62 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/UploadMqtt.java
  8. 210 51
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/back/PushCallback.java
  9. 1 0
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/moveScale/SendData.java
  10. 79 79
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/pulish/PublishMQTT.java
  11. 71 71
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/pulish/PublishMQTTCommand.java
  12. 103 102
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTT.java
  13. 61 61
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTTCommand.java
  14. 21 28
      huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTTStart.java
  15. 1 1
      huimv-admin/src/main/java/com/huimv/guowei/admin/service/impl/BaseDuckInfoServiceImpl.java
  16. 4 1
      huimv-admin/src/main/java/com/huimv/guowei/admin/service/impl/EnvRegularCallFeedingServiceImpl.java

+ 91 - 3
huimv-admin/src/main/java/com/huimv/guowei/admin/controller/EnvDeviceEquipmentController.java

@@ -1,10 +1,22 @@
 package com.huimv.guowei.admin.controller;
 
 
-import org.springframework.web.bind.annotation.CrossOrigin;
-import org.springframework.web.bind.annotation.RequestMapping;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.huimv.guowei.admin.common.utils.Result;
+import com.huimv.guowei.admin.entity.EnvDevice;
+import com.huimv.guowei.admin.entity.EnvDeviceEquipment;
+import com.huimv.guowei.admin.mqtt.EnvControll.PublishMQTTTree;
+import com.huimv.guowei.admin.service.IEnvDeviceEquipmentService;
+import com.huimv.guowei.admin.service.IEnvDeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
 
-import org.springframework.web.bind.annotation.RestController;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 /**
  * <p>
@@ -17,6 +29,82 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping("/env-device-equipment")
 @CrossOrigin
+@Slf4j
 public class EnvDeviceEquipmentController {
+    @Autowired
+    private IEnvDeviceEquipmentService equipmentService;
+    @Autowired
+    private IEnvDeviceService deviceService;
 
+    @PostMapping("/update")
+    public Result update(@RequestBody EnvDeviceEquipment envDeviceEquipment) {
+        try {
+            Date date = new Date();
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            System.out.println("----这个是开始请求的时间:"+sdf.format(date));
+            equipmentService.updateById(envDeviceEquipment);
+            EnvDeviceEquipment byId = equipmentService.getById(envDeviceEquipment.getId());
+            String deviceCode = byId.getDeviceCode();
+            System.out.println("设备" + deviceCode);
+            QueryWrapper<EnvDevice> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("id", deviceCode);
+            EnvDevice one = deviceService.getOne(queryWrapper);
+            String chipId = one.getDeviceCode();
+            log.info("-------获取设备编码:" + chipId);
+            log.info("-----开始配置消息----");
+            QueryWrapper<EnvDeviceEquipment> queryWrapper1 = new QueryWrapper<>();
+            queryWrapper1.eq("device_code", deviceCode).orderByDesc("equipment_port");
+            List<EnvDeviceEquipment> list = equipmentService.list(queryWrapper1);
+            StringBuilder deviceStatus = new StringBuilder();
+            for (EnvDeviceEquipment envDeviceEquipment1 : list) {
+                deviceStatus.append(envDeviceEquipment1.getUploadStart());
+            }
+            String s = deviceStatus.toString();
+            int i = Integer.parseInt(s, 2);
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("dev_ctrl", i);
+            log.info("消息配置完成开始发送:" + jsonObject.toJSONString());
+            PublishMQTTTree publishMQTTTree = new PublishMQTTTree();
+            Boolean start = publishMQTTTree.start("tcp://115.238.57.190:1883", "GW" + chipId, "admin", "admin", "huimv_down_" + chipId, jsonObject.toJSONString());
+
+
+            if (start) {
+                Thread.sleep(3000);//线程等待3秒
+                EnvDeviceEquipment byId1 = equipmentService.getById(envDeviceEquipment.getId());
+                Date updateTime = byId1.getUpdateTime();
+                System.out.println("-----这个是修改后的时间:"+updateTime);
+                int i1 = updateTime.compareTo(date);
+                if (i1 > 0) {
+                    QueryWrapper<EnvDeviceEquipment> queryWrapper2 = new QueryWrapper<>();
+                    queryWrapper2.eq("device_code", deviceCode).orderByDesc("equipment_port");
+                    List<EnvDeviceEquipment> list1 = equipmentService.list(queryWrapper2);
+                    for (EnvDeviceEquipment equipment : list1) {
+                        equipmentService.updateById(equipment);
+                    }
+                    return new Result(10000, "修改成功", true);
+                } else {
+                    if (envDeviceEquipment.getUploadStart() == 1) {
+                        envDeviceEquipment.setUploadStart(0);
+                    } else {
+                        envDeviceEquipment.setUploadStart(1);
+                    }
+                    equipmentService.updateById(envDeviceEquipment);
+                    return new Result(10001, "设备掉线,请等待设备重连", false);
+                }
+            } else {
+                if (envDeviceEquipment.getUploadStart() == 1) {
+                    envDeviceEquipment.setUploadStart(0);
+                } else {
+                    envDeviceEquipment.setUploadStart(1);
+                }
+                equipmentService.updateById(envDeviceEquipment);
+                return new Result(10001, "网络连接超时,请等待5秒后重试", false);
+            }
+
+        } catch (Exception e) {
+            System.out.println("22222222222" + e);
+            return new Result(10001, "修改失败", false);
+        }
+
+    }
 }

+ 4 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/entity/EnvDeviceEquipment.java

@@ -4,6 +4,8 @@ import com.baomidou.mybatisplus.annotation.TableName;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import java.io.Serializable;
+import java.util.Date;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
@@ -65,5 +67,7 @@ public class EnvDeviceEquipment implements Serializable {
 
     private Integer equipmentType;
 
+    /*响应时间*/
+    private Date updateTime;
 
 }

+ 36 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/MqttStart.java

@@ -0,0 +1,36 @@
+package com.huimv.guowei.admin.mqtt.EnvControll;
+
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.huimv.guowei.admin.entity.EnvDevice;
+import com.huimv.guowei.admin.service.IEnvDeviceService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class MqttStart {
+
+    @Autowired
+    private IEnvDeviceService deviceService;
+
+    @PostConstruct
+    public  void  test() {
+        QueryWrapper<EnvDevice> queryWrapper = new QueryWrapper<>();
+        queryWrapper.eq("device_type", 1).eq("farm_id", 21).eq("device_status",1);
+        List<EnvDevice> list = deviceService.list(queryWrapper);
+
+        List<String> chipIds = new ArrayList<>();
+        for (EnvDevice device : list) {
+            String chipId = device.getDeviceCode();
+            chipIds.add(chipId);
+        }
+        UploadMqtt uploadmqtt = new UploadMqtt();
+        uploadmqtt.setTopics(chipIds);
+        uploadmqtt.run();
+    }
+
+}

+ 65 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PublishMQTT.java

@@ -0,0 +1,65 @@
+package com.huimv.guowei.admin.mqtt.EnvControll;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import java.sql.*;
+
+@Configuration
+@Component
+public class PublishMQTT {
+
+//    public static final String HOST = "tcp://115.238.57.190:1883";
+//    private static final String clientid = "publish";
+    private MqttClient client;
+    private MqttConnectOptions options;
+//
+//    private String userName = "admin";    //非必须
+//    private String passWord = "admin";  //非必须
+
+    public void start(String HOST,String clientid,String userName,String passWord,String topic,String message) {
+//        MqttClient client;
+//        MqttConnectOptions options;
+
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            //设置断开后重新连接
+            options.setAutomaticReconnect(true);
+
+            client.connect(options);
+
+//            pubMessage(client,"list",topic);
+            MqttMessage mess = new MqttMessage();
+            mess.setQos(1);
+            mess.setRetained(true);
+            mess.setPayload(message.getBytes());
+            try {
+                client.publish(topic, mess);
+                client.disconnect();
+            } catch (Exception e) {
+//                LOGGER.error(e.getLocalizedMessage());
+                System.out.println("1111111111"+e);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 71 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PublishMQTTTree.java

@@ -0,0 +1,71 @@
+package com.huimv.guowei.admin.mqtt.EnvControll;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+@Configuration
+@Component
+public class PublishMQTTTree {
+
+    //    public static final String HOST = "tcp://115.238.57.190:1883";
+//    private static final String clientid = "publish";
+    private MqttClient client;
+    private MqttConnectOptions options;
+//
+//    private String userName = "admin";    //非必须
+//    private String passWord = "admin";  //非必须
+
+    public Boolean start(String HOST, String clientid, String userName, String passWord, String topic, String message) {
+//        MqttClient client;
+//        MqttConnectOptions options;
+
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            //设置断开后重新连接
+            options.setAutomaticReconnect(true);
+
+            client.connect(options);
+
+//            pubMessage(client,"list",topic);
+            MqttMessage mess = new MqttMessage();
+            mess.setQos(1);
+            mess.setRetained(true);
+            mess.setPayload(message.getBytes());
+
+            return topublish(client, topic, mess);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    private Boolean topublish(MqttClient client, String topic, MqttMessage mess) {
+        try {
+            client.publish(topic, mess);
+            client.disconnect();
+            return true;
+        } catch (Exception e) {
+//                LOGGER.error(e.getLocalizedMessage());
+            System.out.println(e.getLocalizedMessage());
+            return false;
+        }
+    }
+}

+ 162 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/PushCallback.java

@@ -0,0 +1,162 @@
+package com.huimv.guowei.admin.mqtt.EnvControll;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.huimv.guowei.admin.entity.EnvWarningInfo;
+import com.huimv.guowei.admin.service.IEnvWarningInfoService;
+import com.huimv.guowei.admin.utils.SpringUtil;
+import com.huimv.guowei.admin.entity.EnvDevice;
+import com.huimv.guowei.admin.entity.EnvDeviceEquipment;
+import com.huimv.guowei.admin.service.IEnvDeviceEquipmentService;
+import com.huimv.guowei.admin.service.IEnvDeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.time.ZonedDateTime;
+import java.util.*;
+
+@Component
+//接收消息回调
+@Slf4j
+public class PushCallback implements MqttCallbackExtended {
+
+    @Override
+    public void connectComplete(boolean b, String s) {
+        System.out.println("重新订阅!");
+
+    }
+    @Override
+    public void connectionLost(Throwable cause) {
+        // 连接丢失后,一般在这里面进行重连
+        System.out.println("连接断开,可以做重连");
+
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        System.out.println("deliveryComplete---------" + token.isComplete());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        // subscribe后得到的消息会执行到这里面
+        System.out.println("接收消息主题 : " + topic);
+        System.out.println("接收消息Qos : " + message.getQos());
+        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+        System.out.println("开始处理当前数据...");
+
+        String[] split = topic.split("_");
+        String chip = split[2];
+        System.out.println(chip);
+        String messages = new String(message.getPayload());
+        if ("sync_time".equals(messages)) {
+            log.info("-----开始配置同步时间----");
+            JSONObject jsonObject1 = new JSONObject();
+            ZonedDateTime ZonedDateTime = java.time.ZonedDateTime.now();
+            Map<String, Object> map = new HashMap<>();
+            map.put("year", ZonedDateTime.getYear());
+            map.put("mon", ZonedDateTime.getMonthValue());
+            map.put("date", ZonedDateTime.getDayOfMonth());
+            map.put("hour", ZonedDateTime.getHour());
+            map.put("min", ZonedDateTime.getMinute());
+            map.put("sec", ZonedDateTime.getSecond());
+            jsonObject1.put("sync_time", map);
+            System.out.println(jsonObject1);
+            try {
+                PublishMQTT publishMQTT = new PublishMQTT();
+                publishMQTT.start("tcp://115.238.57.190:1883", "GUOWEIpublish", "admin", "admin", "huimv_down_" + chip, jsonObject1.toJSONString());
+            } catch (Exception e) {
+                System.out.println(e);
+            }
+        }else if ("alarm_cfg".equals(messages)) {
+            log.info("高低温报警不做处理!!");
+        } else if ("get_dev_ctrl".equals(messages)) {
+            JSONObject jsonObject = new JSONObject();
+            log.info("-----开始配置设备控制消息----");
+            QueryWrapper<EnvDevice> queryWrapper1 = new QueryWrapper<>();
+            queryWrapper1.eq("device_code", chip);
+            IEnvDeviceService envDeviceService = SpringUtil.getBean(IEnvDeviceService.class);
+            EnvDevice one = envDeviceService.getOne(queryWrapper1);
+
+            IEnvDeviceEquipmentService envDeviceEquipmentService = SpringUtil.getBean(IEnvDeviceEquipmentService.class);
+            QueryWrapper<EnvDeviceEquipment> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("device_code", one.getId()).orderByDesc("equipment_port");
+            List<EnvDeviceEquipment> list = envDeviceEquipmentService.list(queryWrapper);
+            String deviceStatus = "";
+            for (EnvDeviceEquipment envDeviceEquipment1 : list) {
+                deviceStatus = deviceStatus + envDeviceEquipment1.getUploadStart();
+            }
+            String s = deviceStatus;
+            int i = Integer.parseInt(s, 2);
+            jsonObject.put("dev_ctrl", i);
+            log.info("设备配置完成开始发送:" + i);
+            PublishMQTT publishMQTT = new PublishMQTT();
+            publishMQTT.start("tcp://115.238.57.190:1883", "GUOWEIpublish", "admin", "admin", "huimv_down_" + chip, jsonObject.toJSONString());
+            log.info("---设备控制发送完成---");
+        }  else if ("dev_ctrl_suc".equals(messages)) {
+            QueryWrapper<EnvDevice> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("device_code", chip);
+            IEnvDeviceService envDeviceService = SpringUtil.getBean(IEnvDeviceService.class);
+            EnvDevice one = envDeviceService.getOne(queryWrapper);
+
+            IEnvDeviceEquipmentService envDeviceEquipmentService = SpringUtil.getBean(IEnvDeviceEquipmentService.class);
+            QueryWrapper<EnvDeviceEquipment> queryWrapper1 = new QueryWrapper<>();
+            queryWrapper1.eq("device_code", one.getId()).orderByDesc("equipment_port");
+            List<EnvDeviceEquipment> list = envDeviceEquipmentService.list(queryWrapper1);
+            for (EnvDeviceEquipment equipment : list) {
+                equipment.setUpdateTime(new Date());
+                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                System.out.println("----这个是应答时间:"+sdf.format(new Date()));
+                envDeviceEquipmentService.updateById(equipment);
+                System.out.println("-----收到控制响应后修改标识成功!!");
+            }
+
+            System.out.println("收到控制响应");
+        } else {
+            JSONObject jsonObject = JSON.parseObject(messages);
+            System.out.println("这里是1111:" + jsonObject);
+             if (ObjectUtil.isNotEmpty(jsonObject.get("alarm_event"))) {
+                log.info("----报警信息开始保存---");
+                JSONObject jsonObject1 = (JSONObject) jsonObject.get("alarm_event");
+                try {
+                    IEnvWarningInfoService envWarningInfoService = SpringUtil.getBean(IEnvWarningInfoService.class);
+                    QueryWrapper<EnvDevice> queryWrapper = new QueryWrapper<>();
+                    queryWrapper.eq("device_code", chip);
+                    IEnvDeviceService envDeviceService = SpringUtil.getBean(IEnvDeviceService.class);
+                    EnvDevice one = envDeviceService.getOne(queryWrapper);
+
+                    EnvWarningInfo warningInfo = new EnvWarningInfo();
+                    warningInfo.setWarningType(4);
+                    warningInfo.setBuildLocation(one.getBuildLocation());
+                    warningInfo.setUnitId(one.getUnitId());
+                    warningInfo.setDate(new Date());
+                    warningInfo.setFarmId(one.getFarmId());
+                    Integer type = (Integer) jsonObject1.get("sta");
+                    if (type == 1) {
+                        warningInfo.setWarningContent(one.getBuildLocation()+"的设备断电报警!");
+                    } else {
+                        warningInfo.setWarningContent(one.getBuildLocation()+"的设备恢复供电!");
+                    }
+                    envWarningInfoService.save(warningInfo);
+                } catch (Exception e) {
+                    System.out.println("报警信息保存异常!!!");
+                }
+
+                log.info("----报警信息保存成功!----");
+            }
+        }
+
+        System.out.println("数据处理完成!");
+    }
+
+
+
+}

+ 62 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/EnvControll/UploadMqtt.java

@@ -0,0 +1,62 @@
+package com.huimv.guowei.admin.mqtt.EnvControll;
+
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+
+@Configuration
+@Data
+public class UploadMqtt extends Thread {
+    public static final String HOST = "tcp://115.238.57.190:1883";
+    private String clientid="GUOWEIsubscribe" ;
+    private static MqttClient client;
+    private static MqttConnectOptions options;
+
+    private static String userName = "admin";    //非必须
+    private static String passWord = "admin";  //非必须
+    private List<String> topics ;
+
+    @Override
+    public  void  run() {
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            //设置断开后重新连接
+            options.setAutomaticReconnect(false);
+            // 设置回调
+            client.setCallback(new PushCallback());
+
+            client.connect(options);
+
+            //订阅消息
+//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+            for (String topic : topics) {
+                System.out.println("走到了这里开始订阅!");
+                client.subscribe("huimv_up_"+topic, 1);
+            }
+//            client.subscribe(topic,1);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+}

+ 210 - 51
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/back/PushCallback.java

@@ -1,51 +1,210 @@
-package com.huimv.guowei.admin.mqtt.back;
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-@Component
-public
-//接收消息回调
-class PushCallback implements MqttCallback {
-
-
-    @Override
-    public void connectionLost(Throwable cause) {
-
-        // 连接丢失后,一般在这里面进行重连
-        System.out.println("连接断开,可以做重连");
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        System.out.println("deliveryComplete---------" + token.isComplete());
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-        // subscribe后得到的消息会执行到这里面
-        System.out.println("接收消息主题 : " + topic);
-        System.out.println("接收消息Qos : " + message.getQos());
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        Date date = new Date();
-        System.out.println("接收消息的时间:" + sdf.format(date));
-        byte[] payload = message.getPayload();
-        StringBuilder sb = new StringBuilder();
-        for (byte b : payload) {
-            sb.append(String.format("%02x", b));
-        }
-        String s = sb.toString();
-        System.out.println("接收消息内容 : " + s);
-        System.out.println("开始处理当前数据...");
-        System.out.println("数据处理完成!");
-    }
-
-}
-
+//package com.huimv.guowei.admin.mqtt.back;
+//
+//import cn.hutool.core.util.ObjectUtil;
+//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+//import com.huimv.guowei.admin.entity.*;
+//import com.huimv.guowei.admin.service.*;
+//import com.huimv.guowei.admin.utils.SpringUtil;
+//import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+//import org.eclipse.paho.client.mqttv3.MqttCallback;
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttMessage;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import java.math.BigDecimal;
+//import java.text.SimpleDateFormat;
+//import java.util.ArrayList;
+//import java.util.Date;
+//import java.util.List;
+//
+//@Component
+//public
+////接收消息回调
+//class PushCallback implements MqttCallback {
+//
+//
+//    @Override
+//    public void connectionLost(Throwable cause) {
+//
+//        // 连接丢失后,一般在这里面进行重连
+//        System.out.println("连接断开,可以做重连");
+//    }
+//
+//    @Override
+//    public void deliveryComplete(IMqttDeliveryToken token) {
+//        System.out.println("deliveryComplete---------" + token.isComplete());
+//    }
+//
+//    @Override
+//    public void messageArrived(String topic, MqttMessage message) throws Exception {
+//        // subscribe后得到的消息会执行到这里面
+//        System.out.println("接收消息主题 : " + topic);
+//        System.out.println("接收消息Qos : " + message.getQos());
+//        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+//        Date date = new Date();
+//        System.out.println("接收消息的时间:" + sdf.format(date));
+//        byte[] payload = message.getPayload();
+//        StringBuilder sb = new StringBuilder();
+//        for (byte b : payload) {
+//            sb.append(String.format("%02x", b));
+//        }
+//        String s = sb.toString();
+//        System.out.println("接收消息内容 : " + s);
+//        System.out.println("开始处理当前数据...");
+//
+//        List list = getListFromContent(s, 2);
+//        String id = list.get(5).toString().concat(list.get(4).toString()).concat(list.get(3).toString()).concat(list.get(2).toString());
+//        System.out.println(id);
+//        int id1 = Integer.parseInt(id, 16);
+//        System.out.println("设备id:" + id1);//id
+//        String s1 = list.get(8).toString().concat(list.get(7).toString());//电量
+//        int anInt = Integer.parseInt(s1, 16);
+//        Float f = (float) anInt / 10;
+//        System.out.println("电池电量:" + f);
+//
+//        IRawDataService rawDataService = SpringUtil.getBean(IRawDataService.class);
+//        QueryWrapper<RawData> queryWrapper = new QueryWrapper<>();
+//        queryWrapper.eq("device_code", id1).orderByDesc("create_time").last(" limit 1");
+//        RawData one = rawDataService.getOne(queryWrapper);//拿到上一次传输时的最后一条数据
+//        System.out.println("第一个查询没问题!!");
+//
+//        IEnvDeviceService envDeviceService = SpringUtil.getBean(IEnvDeviceService.class);
+//        QueryWrapper<EnvDevice> queryWrapper1 = new QueryWrapper<>();
+//        queryWrapper1.eq("device_code", id1);
+//        EnvDevice device = envDeviceService.getOne(queryWrapper1);
+//        System.out.println("第二个查询没问题!!");
+//
+//        IBaseDuckInfoService infoService = SpringUtil.getBean(IBaseDuckInfoService.class);
+//        QueryWrapper<BaseDuckInfo> queryWrapper2 = new QueryWrapper<>();
+//        queryWrapper2.eq("unit_id", device.getUnitId());
+//        BaseDuckInfo info = infoService.getOne(queryWrapper2);
+//        System.out.println("第三个查询没问题!!");
+//
+//        IEnvRegularCallFeedingService feedingService = SpringUtil.getBean(IEnvRegularCallFeedingService.class);
+//        IEnvRegularCallEggService eggService = SpringUtil.getBean(IEnvRegularCallEggService.class);
+//        System.out.println("设置两个service没问题!!");
+//
+//        BigDecimal a1;
+//        if (ObjectUtil.isEmpty(one)) {
+//            a1 = BigDecimal.valueOf(0);
+//        } else {
+//            String oneData = one.getData();
+//            a1 = new BigDecimal(oneData);
+//        }
+//        try {
+//            for (int i = 10; i < list.size() - 1; i = i + 2) {
+//                String data = list.get(i + 1).toString().concat(list.get(i).toString());
+//                int parseInt = Integer.parseInt(data, 16);
+//                Float d = (float) parseInt / 100;
+//                float weight = ((float) Math.round(d * 10)) / 10;//下一次的数据
+//                System.out.println("重量:" + weight);
+//
+//                BigDecimal a2 = new BigDecimal(Float.toString(weight));
+//                System.out.println("---重量转换是正常的:"+a2);
+//                System.out.println("---上一次的重量:"+a1);
+//
+//                if (a2.compareTo(a1) !=1) { //a1>=a2
+//                    System.out.println("进入了保存采食环节!!");
+//                    BigDecimal subtract = a1.subtract(a2);
+//                    System.out.println("1");
+//                    EnvRegularCallFeeding feeding = new EnvRegularCallFeeding();
+//                    feeding.setUnitName(device.getUnitName());
+//                    System.out.println("2");
+//                    feeding.setUnitId(device.getUnitId());
+//                    System.out.println("3");
+//                    feeding.setDuckNum(info.getDuckNum());
+//                    System.out.println("4");
+//                    feeding.setCallName(device.getDeviceName());
+//                    System.out.println("5");
+//                    feeding.setCallCode(device.getDeviceCode());
+//                    System.out.println("6");
+//                    feeding.setDuckWeight(subtract.doubleValue());
+//                    System.out.println("7");
+//                    feeding.setCallDate(new Date());
+//                    System.out.println("8");
+//                    feeding.setFarmId(device.getFarmId());
+//                    System.out.println("9");
+//                    feeding.setBattery(f.toString());
+//                    System.out.println("10");
+//                    feeding.setDuckId(info.getId());
+//                    System.out.println("值设置完成!!");
+//                    feedingService.save(feeding);
+//                } else {
+//                    BigDecimal subtract = a2.subtract(a1);
+//                    BigDecimal a = new BigDecimal("50");
+//                    BigDecimal b = new BigDecimal("100");
+//
+//                    System.out.println("进入了彩蛋环节!!!");
+//                    if (subtract.compareTo(a) != -1 && subtract.compareTo(b) != 1) {
+//                        EnvRegularCallEgg egg = new EnvRegularCallEgg();
+//                        egg.setCallName(device.getDeviceName());
+//                        egg.setCallCode(device.getDeviceCode());
+//                        egg.setCallDate(new Date());
+//                        egg.setDuckNum(String.valueOf(id1));
+//                        egg.setDuckId(info.getId());
+//                        egg.setBattery(f.toString());
+//                        egg.setEggNum(1);
+//                        egg.setDuckWeight(subtract.doubleValue());
+//                        egg.setFarmId(device.getFarmId());
+//                        egg.setUnitName(device.getUnitName());
+//                        egg.setUnitId(device.getUnitId());
+//                        eggService.save(egg);
+//                    }
+//                }
+//                RawData rawData = new RawData();
+//                rawData.setDeviceCode(String.valueOf(id1));
+//                rawData.setData(String.valueOf(weight));
+//                rawData.setCreateTime(new Date());
+//                rawDataService.save(rawData);
+//                a1 = a2;
+//
+//            }
+//        } catch (Exception e) {
+//            System.out.println("-------保存出现了异常!!!" );
+//        }
+//
+//
+//
+//        System.out.println("数据处理完成!");
+//    }
+//
+//    public static List getListFromContent(String content, int count) {
+//        List list = new ArrayList();
+//        // 获取String的总长度
+//        int contentLength = content.length();
+//        if (contentLength < count) {
+//            list.add(content);
+//        } else {
+//            int begin = 0;
+//            // 获取需要切割多少段
+//            int cutCount = contentLength / count;
+//            int cutCounts = contentLength % count;
+//            // 获取切割段的长度
+//            if (cutCounts != 0) {
+//                cutCount++;
+//            }
+//            for (int i = 1; i <= cutCount; i++) {
+//                String temp;
+//                // 不是最后一段
+//                if (i != cutCount) {
+//                    temp = content.substring(begin, count * i);
+//                } else {
+//                    temp = content.substring(begin, contentLength);
+//                }
+//                begin = count * i;
+//                list.add(temp);
+//            }
+//        }
+//        return list;
+//    }
+//
+//    public static void main(String[] args) {
+//        BigDecimal a = BigDecimal.valueOf(0);
+//        BigDecimal b = new BigDecimal(0.0);
+//        BigDecimal subtract = b.subtract(a);
+//        System.out.println(subtract);
+//
+//    }
+//}
+//

+ 1 - 0
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/moveScale/SendData.java

@@ -63,6 +63,7 @@ public class SendData  {
         System.out.println(mess);
         try {
             client.publish(topic, mess);
+            client.disconnect();
         } catch (Exception e) {
             //LOGGER.error(e.getLocalizedMessage());
         }

+ 79 - 79
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/pulish/PublishMQTT.java

@@ -1,79 +1,79 @@
-package com.huimv.guowei.admin.mqtt.pulish;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-@Configuration
-@Component
-public class PublishMQTT {
-//接收数据时如果需要响应时调用
-
-    public static final String HOST = "tcp://192.168.1.68:1883";
-    private static final String clientid = "FabuCeShi";
-    private MqttClient client;
-    private MqttConnectOptions options;
-    private String userName = "admin";    //非必须
-    private String passWord = "admin";  //非必须
-
-    public void start() {
-        try {
-            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
-            client = new MqttClient(HOST, clientid, new MemoryPersistence());
-            // MQTT的连接设置
-            options = new MqttConnectOptions();
-            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-            options.setCleanSession(true);
-            // 设置连接的用户名
-            options.setUserName(userName);
-            // 设置连接的密码
-            options.setPassword(passWord.toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(10);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(20);
-            //设置断开后重新连接
-            options.setAutomaticReconnect(true);
-            // 设置回调
-
-            client.connect(options);
-
-            pubMessage("","");
-            System.out.println("发送完成!");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 消息发送
-     * @param message
-     * @param topic
-     */
-    public  void pubMessage(String message,String topic){
-        MqttMessage mess = new MqttMessage();
-        mess.setQos(1);
-        mess.setRetained(true);
-        mess.setPayload(message.getBytes());
-        try {
-            client.publish(topic, mess);
-        } catch (Exception e) {
-            //LOGGER.error(e.getLocalizedMessage());
-        }
-    }
-
-
-    public static void main(String[] args) {
-        PublishMQTT client1 = new PublishMQTT();
-        client1.start();
-//        client1.Connect();
-    }
-}
+//package com.huimv.guowei.admin.mqtt.pulish;
+//
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.eclipse.paho.client.mqttv3.MqttMessage;
+//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.stereotype.Component;
+//
+//import java.sql.Connection;
+//import java.sql.DriverManager;
+//import java.sql.ResultSet;
+//import java.sql.Statement;
+//
+//@Configuration
+//@Component
+//public class PublishMQTT {
+////接收数据时如果需要响应时调用
+//
+//    public static final String HOST = "tcp://192.168.1.68:1883";
+//    private static final String clientid = "FabuCeShi";
+//    private MqttClient client;
+//    private MqttConnectOptions options;
+//    private String userName = "admin";    //非必须
+//    private String passWord = "admin";  //非必须
+//
+//    public void start() {
+//        try {
+//            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+//            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+//            // MQTT的连接设置
+//            options = new MqttConnectOptions();
+//            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+//            options.setCleanSession(true);
+//            // 设置连接的用户名
+//            options.setUserName(userName);
+//            // 设置连接的密码
+//            options.setPassword(passWord.toCharArray());
+//            // 设置超时时间 单位为秒
+//            options.setConnectionTimeout(10);
+//            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+//            options.setKeepAliveInterval(20);
+//            //设置断开后重新连接
+//            options.setAutomaticReconnect(true);
+//            // 设置回调
+//
+//            client.connect(options);
+//
+//            pubMessage("","");
+//            System.out.println("发送完成!");
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    /**
+//     * 消息发送
+//     * @param message
+//     * @param topic
+//     */
+//    public  void pubMessage(String message,String topic){
+//        MqttMessage mess = new MqttMessage();
+//        mess.setQos(1);
+//        mess.setRetained(true);
+//        mess.setPayload(message.getBytes());
+//        try {
+//            client.publish(topic, mess);
+//        } catch (Exception e) {
+//            //LOGGER.error(e.getLocalizedMessage());
+//        }
+//    }
+//
+//
+//    public static void main(String[] args) {
+//        PublishMQTT client1 = new PublishMQTT();
+//        client1.start();
+////        client1.Connect();
+//    }
+//}

+ 71 - 71
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/pulish/PublishMQTTCommand.java

@@ -1,72 +1,72 @@
-package com.huimv.guowei.admin.mqtt.pulish;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
-
-import java.sql.*;
-
-@Configuration
-@Component
-public class PublishMQTTCommand {
-
-//    public static final String HOST = "tcp://115.238.57.190:1883";
-//    private static final String clientid = "publish";
-//    private MqttClient client;
-//    private MqttConnectOptions options;
+//package com.huimv.guowei.admin.mqtt.pulish;
 //
-//    private String userName = "admin";    //非必须
-//    private String passWord = "admin";  //非必须
-
-    public Boolean start(String HOST,String clientid,String userName,String passWord,String topic,String message) {
-        MqttClient client;
-        MqttConnectOptions options;
-
-        try {
-            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
-            client = new MqttClient(HOST, clientid, new MemoryPersistence());
-            // MQTT的连接设置
-            options = new MqttConnectOptions();
-            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-            options.setCleanSession(true);
-            // 设置连接的用户名
-            options.setUserName(userName);
-            // 设置连接的密码
-            options.setPassword(passWord.toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(10);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(20);
-            //设置断开后重新连接
-            options.setAutomaticReconnect(true);
-
-            client.connect(options);
-
-//            pubMessage(client,"list",topic);
-            MqttMessage mess = new MqttMessage();
-            mess.setQos(1);
-            mess.setRetained(true);
-            mess.setPayload(message.getBytes());
-            return topublish(client,topic,mess);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-
-    private Boolean topublish(MqttClient client, String topic, MqttMessage mess) {
-        try {
-            client.publish(topic, mess);
-            return true;
-        } catch (Exception e) {
-//                LOGGER.error(e.getLocalizedMessage());
-            System.out.println("发起指令出现错误!!"+e);
-            return false;
-        }
-    }
-
-}
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.eclipse.paho.client.mqttv3.MqttMessage;
+//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.stereotype.Component;
+//
+//import java.sql.*;
+//
+//@Configuration
+//@Component
+//public class PublishMQTTCommand {
+//
+////    public static final String HOST = "tcp://115.238.57.190:1883";
+////    private static final String clientid = "publish";
+////    private MqttClient client;
+////    private MqttConnectOptions options;
+////
+////    private String userName = "admin";    //非必须
+////    private String passWord = "admin";  //非必须
+//
+//    public Boolean start(String HOST,String clientid,String userName,String passWord,String topic,String message) {
+//        MqttClient client;
+//        MqttConnectOptions options;
+//
+//        try {
+//            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+//            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+//            // MQTT的连接设置
+//            options = new MqttConnectOptions();
+//            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+//            options.setCleanSession(true);
+//            // 设置连接的用户名
+//            options.setUserName(userName);
+//            // 设置连接的密码
+//            options.setPassword(passWord.toCharArray());
+//            // 设置超时时间 单位为秒
+//            options.setConnectionTimeout(10);
+//            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+//            options.setKeepAliveInterval(20);
+//            //设置断开后重新连接
+//            options.setAutomaticReconnect(true);
+//
+//            client.connect(options);
+//
+////            pubMessage(client,"list",topic);
+//            MqttMessage mess = new MqttMessage();
+//            mess.setQos(1);
+//            mess.setRetained(true);
+//            mess.setPayload(message.getBytes());
+//            return topublish(client,topic,mess);
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            return false;
+//        }
+//    }
+//
+//    private Boolean topublish(MqttClient client, String topic, MqttMessage mess) {
+//        try {
+//            client.publish(topic, mess);
+//            return true;
+//        } catch (Exception e) {
+////                LOGGER.error(e.getLocalizedMessage());
+//            System.out.println("发起指令出现错误!!"+e);
+//            return false;
+//        }
+//    }
+//
+//}

+ 103 - 102
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTT.java

@@ -1,102 +1,103 @@
-package com.huimv.guowei.admin.mqtt.subscribe;
-
-
-import com.huimv.guowei.admin.mqtt.back.PushCallback;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-import java.util.List;
-
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class ConsumerMQTT extends Thread {
-    public static final String HOST = "tcp://115.238.57.190:1883";
-    private String clientid ;
-    private static MqttClient client;
-    private static MqttConnectOptions options;
-
-    private static String userName = "admin";    //非必须
-    private static String passWord = "admin";  //非必须
-    private List<String> topics ;
-
-
-//    @SneakyThrows
-    @Override
-    public  void  run() {
-        try {
-            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
-            client = new MqttClient(HOST, "subscribe", new MemoryPersistence());
-            // MQTT的连接设置
-            options = new MqttConnectOptions();
-            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-            options.setCleanSession(true);
-            // 设置连接的用户名
-            options.setUserName(userName);
-            // 设置连接的密码
-            options.setPassword(passWord.toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(60);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(30);
-
-            //设置断开后重新连接
-            options.setAutomaticReconnect(false);
-
-            // 设置回调
-            client.setCallback(new PushCallback());
-
-            client.connect(options);
-
-            //订阅消息
-//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
-            for (String topic : topics) {
-                System.out.println("走到了这里开始订阅!");
-                client.subscribe("huimv_up_"+topic, 1);
-            }
-//            client.subscribe(topic,1);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    public static void test(List<String> topics) {
-        try {
-            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
-            client = new MqttClient(HOST, "subscribe", new MemoryPersistence());
-            // MQTT的连接设置
-            options = new MqttConnectOptions();
-            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-            options.setCleanSession(true);
-            // 设置连接的用户名
-            options.setUserName(userName);
-            // 设置连接的密码
-            options.setPassword(passWord.toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(60);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(30);
-
-            //设置断开后重新连接
-            options.setAutomaticReconnect(true);
-
-            // 设置回调
-            client.setCallback(new PushCallback());
-
-            client.connect(options);
-
-            //订阅消息
-//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
-            for (String topic : topics) {
-                System.out.println("重新走到了这里开始订阅!");
-                client.subscribe("huimv_up_"+topic, 1);
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}
+//package com.huimv.guowei.admin.mqtt.subscribe;
+//
+//
+//import com.huimv.guowei.admin.mqtt.back.PushCallback;
+//import lombok.Data;
+//import lombok.EqualsAndHashCode;
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+//
+//import java.util.List;
+//
+//@EqualsAndHashCode(callSuper = true)
+//@Data
+//public class ConsumerMQTT extends Thread {
+//    public static final String HOST = "tcp://192.168.1.68:1883";
+//    private String clientid ;
+//    private static MqttClient client;
+//    private static MqttConnectOptions options;
+//
+//    private static String userName = "admin";    //非必须
+//    private static String passWord = "admin";  //非必须
+//    /*private List<String> topics ;*/
+//
+//
+////    @SneakyThrows
+//    @Override
+//    public  void  run() {
+//        try {
+//            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+//            client = new MqttClient(HOST, "222769716", new MemoryPersistence());
+//            // MQTT的连接设置
+//            options = new MqttConnectOptions();
+//            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+//            options.setCleanSession(true);
+//            // 设置连接的用户名
+//            options.setUserName(userName);
+//            // 设置连接的密码
+//            options.setPassword(passWord.toCharArray());
+//            // 设置超时时间 单位为秒
+//            options.setConnectionTimeout(60);
+//            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+//            options.setKeepAliveInterval(30);
+//
+//            //设置断开后重新连接
+//            options.setAutomaticReconnect(false);
+//
+//            // 设置回调
+//            client.setCallback(new PushCallback());
+//
+//            client.connect(options);
+//
+//            //订阅消息
+//            client.subscribe("/LG210/D4AD204BAEE4/up", 1);
+////            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+//          /*  for (String topic : topics) {
+//                System.out.println("走到了这里开始订阅!");
+//                client.subscribe("/LG210/D4AD204BAEE4/up", 1);
+//            }*/
+////            client.subscribe(topic,1);
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//
+//    }
+//
+//    public static void test(List<String> topics) {
+//        try {
+//            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+//            client = new MqttClient(HOST, "subscribe", new MemoryPersistence());
+//            // MQTT的连接设置
+//            options = new MqttConnectOptions();
+//            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+//            options.setCleanSession(true);
+//            // 设置连接的用户名
+//            options.setUserName(userName);
+//            // 设置连接的密码
+//            options.setPassword(passWord.toCharArray());
+//            // 设置超时时间 单位为秒
+//            options.setConnectionTimeout(60);
+//            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+//            options.setKeepAliveInterval(30);
+//
+//            //设置断开后重新连接
+//            options.setAutomaticReconnect(true);
+//
+//            // 设置回调
+//            client.setCallback(new PushCallback());
+//
+//            client.connect(options);
+//
+//            //订阅消息
+////            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+//            for (String topic : topics) {
+//                System.out.println("重新走到了这里开始订阅!");
+//                client.subscribe("huimv_up_"+topic, 1);
+//            }
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//}

+ 61 - 61
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTTCommand.java

@@ -1,62 +1,62 @@
-package com.huimv.guowei.admin.mqtt.subscribe;
-import com.huimv.guowei.admin.mqtt.back.PushCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ConsumerMQTTCommand {
-
-    MqttConnectOptions options;
-    MqttClient client;
-    public void start(String HOST,String clientid,String userName,String passWord,String topic) {
-        try {
-            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
-            client = new MqttClient(HOST, clientid, new MemoryPersistence());
-            // MQTT的连接设置
-            options = new MqttConnectOptions();
-            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
-            options.setCleanSession(true);
-            // 设置连接的用户名
-            options.setUserName(userName);
-            // 设置连接的密码
-            options.setPassword(passWord.toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(10);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(20);
-            //设置为0,防止ERROR
-            options.setExecutorServiceTimeout(0);
-            //设置断开后重新连接
-            options.setAutomaticReconnect(false);
-            // 设置回调
-            client.setCallback(new PushCallback());
-
-            client.connect(options);
-
-            //订阅消息
-//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
-            client.subscribe(topic,1);
-
-            client.disconnect();
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void disConnect(String topic) {
-        try {
-            client.unsubscribe(topic);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-    }
-
-//    public static void main(String[] args) {
-//        ConsumerMQTT consumerMQTT = new ConsumerMQTT();
-//        consumerMQTT.start();
+//package com.huimv.guowei.admin.mqtt.subscribe;
+//import com.huimv.guowei.admin.mqtt.back.PushCallback;
+//import org.eclipse.paho.client.mqttv3.MqttClient;
+//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+//import org.eclipse.paho.client.mqttv3.MqttException;
+//import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+//import org.springframework.context.annotation.Configuration;
+//
+//@Configuration
+//public class ConsumerMQTTCommand {
+//
+//    MqttConnectOptions options;
+//    MqttClient client;
+//    public void start(String HOST,String clientid,String userName,String passWord,String topic) {
+//        try {
+//            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+//            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+//            // MQTT的连接设置
+//            options = new MqttConnectOptions();
+//            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+//            options.setCleanSession(true);
+//            // 设置连接的用户名
+//            options.setUserName(userName);
+//            // 设置连接的密码
+//            options.setPassword(passWord.toCharArray());
+//            // 设置超时时间 单位为秒
+//            options.setConnectionTimeout(10);
+//            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+//            options.setKeepAliveInterval(20);
+//            //设置为0,防止ERROR
+//            options.setExecutorServiceTimeout(0);
+//            //设置断开后重新连接
+//            options.setAutomaticReconnect(false);
+//            // 设置回调
+//            client.setCallback(new PushCallback());
+//
+//            client.connect(options);
+//
+//            //订阅消息
+////            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+//            client.subscribe(topic,1);
+//
+//            client.disconnect();
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
 //    }
-}
+//
+//    public void disConnect(String topic) {
+//        try {
+//            client.unsubscribe(topic);
+//        } catch (MqttException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+////    public static void main(String[] args) {
+////        ConsumerMQTT consumerMQTT = new ConsumerMQTT();
+////        consumerMQTT.start();
+////    }
+//}

+ 21 - 28
huimv-admin/src/main/java/com/huimv/guowei/admin/mqtt/subscribe/ConsumerMQTTStart.java

@@ -1,30 +1,23 @@
-package com.huimv.guowei.admin.mqtt.subscribe;
-
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.util.ArrayList;
-import java.util.List;
-
-@Component
-public class ConsumerMQTTStart {
-
+//package com.huimv.guowei.admin.mqtt.subscribe;
+//
+//import com.huimv.guowei.admin.service.IEnvDeviceService;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.PostConstruct;
+//import java.util.ArrayList;
+//import java.util.List;
+//
+//@Component
+//public class ConsumerMQTTStart {
+//
+//
 //    @Autowired
-//    private IEnvDeviceRegisterService deviceRegisterService;
-
-    @PostConstruct
-    public  void  test() {
-//        List<EnvDeviceRegister> list = deviceRegisterService.list();
-//        System.out.println("设备的数量:"+list.size());
+//    private IEnvDeviceService deviceService;
+//    @PostConstruct
+//    public  void  test() {
+//    /*    ConsumerMQTT consumerMQTT = new ConsumerMQTT();
+//        consumerMQTT.run();*/
+//    }
 //
-//        List<String> chipIds = new ArrayList<>();
-//        for (EnvDeviceRegister deviceRegister : list) {
-//            String chipId = deviceRegister.getChipId();
-//            chipIds.add(chipId);
-//        }
-//        UploadMqtt uploadmqtt = new UploadMqtt();
-//        uploadmqtt.setTopics(chipIds);
-//        uploadmqtt.run();
-    }
-
-}
+//}

+ 1 - 1
huimv-admin/src/main/java/com/huimv/guowei/admin/service/impl/BaseDuckInfoServiceImpl.java

@@ -118,7 +118,7 @@ public class BaseDuckInfoServiceImpl extends ServiceImpl<BaseDuckInfoMapper, Bas
             queryWrapper1.eq("unit_id", paramsMap.get("unitId"));
             List<BaseDuckInfo> baseDuckInfos = duckInfoMapper.selectList(queryWrapper1);
             for (BaseDuckInfo duckInfo : baseDuckInfos) {
-                if (duckInfo.getIsCage() == 1) {
+                if (duckInfo.getIsCage() == 0) {
                     return new Result(10001, "鸭笼已被使用!", false);
                 }
             }

+ 4 - 1
huimv-admin/src/main/java/com/huimv/guowei/admin/service/impl/EnvRegularCallFeedingServiceImpl.java

@@ -117,7 +117,10 @@ public class EnvRegularCallFeedingServiceImpl extends ServiceImpl<EnvRegularCall
         QueryWrapper<EnvRegularCallFeeding> queryWrapper = new QueryWrapper<>();
         Calendar calendar = Calendar.getInstance();
         calendar.set(Calendar.DATE, calendar.get(Calendar.DATE) - 7);
-        queryWrapper.eq("farm_id", farmId).like(StringUtils.isNotBlank(duckId),"duck_id", duckId).ge("call_date", calendar.getTime());
+        queryWrapper.eq("farm_id", farmId).eq("event_type",0)
+                .like(StringUtils.isNotBlank(duckId),"duck_id", duckId)
+                .select("SUM(duck_weight) duckWeight,duck_num duckNum,call_date callDate")
+                .ge("call_date", calendar.getTime());
         return new Result(ResultCode.SUCCESS, feedingMapper.selectList(queryWrapper));
     }