瀏覽代碼

mqtt整合

wwh 2 年之前
父節點
當前提交
639c5227fd

+ 24 - 1
huimv-env-platform/huimv-env-manage/pom.xml

@@ -98,7 +98,7 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
-            <version>6.0.6</version>
+            <version>8.0.29</version>
         </dependency>
         <!--Mybatis-Plus生成器依赖-->
         <dependency>
@@ -156,6 +156,29 @@
             <groupId>com.huimv</groupId>
             <artifactId>huimv-env-common</artifactId>
         </dependency>
+
+        <!--mqtt相关依赖-->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-stream</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.2</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 33 - 1
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/controller/EnvDeviceEquipmentController.java

@@ -1,14 +1,20 @@
 package com.huimv.env.manage.saas.controller;
 
 
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.huimv.env.manage.entity.EnvDeviceRegister;
 import com.huimv.env.manage.saas.dao.entity.EnvDeviceEquipment;
+import com.huimv.env.manage.saas.mqtt.publish.PublishMQTT;
 import com.huimv.env.manage.saas.service.IEnvDeviceEquipmentService;
 import com.huimv.env.manage.service.IEnvDeviceRegisterService;
 import com.huimv.env.manage.utils.Result;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -22,6 +28,7 @@ import java.util.Map;
 @RestController
 @RequestMapping("/envDeviceEquipment")
 @CrossOrigin
+@Slf4j
 public class EnvDeviceEquipmentController {
 
     @Autowired
@@ -40,12 +47,37 @@ public class EnvDeviceEquipmentController {
     public Result update(@RequestBody EnvDeviceEquipment envDeviceEquipment){
         try {
             envDeviceEquipmentService.updateById(envDeviceEquipment);
+            String deviceCode = envDeviceEquipment.getDeviceCode();
+            System.out.println("设备"+deviceCode);
+            QueryWrapper<EnvDeviceRegister> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("device_code",deviceCode);
+            EnvDeviceRegister one = deviceRegisterService.getOne(queryWrapper);
+            String chipId = one.getChipId();
+            log.info("-------获取设备编码:"+chipId);
+            PublishMQTT publishMQTT = new PublishMQTT();
+            String connect = publishMQTT.Connect(chipId);
+            log.info("-----连接成功----");
+
+            log.info("-----开始配置消息----");
+            QueryWrapper<EnvDeviceEquipment> queryWrapper1 = new QueryWrapper<>();
+            queryWrapper1.eq("device_code",deviceCode);
+            List<EnvDeviceEquipment> list = envDeviceEquipmentService.list(queryWrapper1);
+
+            StringBuffer deviceStatus = new StringBuffer("0000000000000000");
+            for (EnvDeviceEquipment envDeviceEquipment1 : list) {
+                Integer equipmentPort = envDeviceEquipment1.getEquipmentPort();
+                Integer openStart = envDeviceEquipment1.getUploadStart();
+                deviceStatus.replace(equipmentPort -1,equipmentPort,openStart+"");
+            }
+            String s = deviceStatus.toString();
+            log.info("消息配置完成开始发送:"+s);
+
+            publishMQTT.start("tcp://192.168.1.68:1883",chipId,"admin","admin","huimv_down_"+chipId,s);
             return new Result(10000,"修改成功",true);
         }catch (Exception e){
             return new Result(10001,"修改失败",false);
         }
 
-
     }
 
 

+ 109 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/publish/PublishMQTT.java

@@ -0,0 +1,109 @@
+package com.huimv.env.manage.saas.mqtt.publish;
+
+import cn.hutool.extra.spring.SpringUtil;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import java.sql.*;
+import java.util.List;
+import java.util.Map;
+
+@Configuration
+@Component
+public class PublishMQTT {
+
+//    public static final String HOST = "tcp://192.168.1.68:1883";
+//    private static final String clientid = "publish";
+//    private MqttClient client;
+//    private MqttConnectOptions options;
+//
+//    private String userName = "admin";    //非必须
+//    private String passWord = "admin";  //非必须
+
+    public void start(String HOST,String clientid,String userName,String passWord,String topic,String message) {
+        MqttClient client;
+        MqttConnectOptions options;
+
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            //设置断开后重新连接
+            options.setAutomaticReconnect(true);
+
+            client.connect(options);
+
+//            pubMessage(client,"list",topic);
+            MqttMessage mess = new MqttMessage();
+            mess.setQos(1);
+            mess.setRetained(true);
+            mess.setPayload(message.getBytes());
+            try {
+                client.publish(topic, mess);
+            } catch (Exception e) {
+                //LOGGER.error(e.getLocalizedMessage());
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 消息发送
+     * @param message
+     * @param topic
+     */
+    public void pubMessage(MqttClient client,String message,String topic){
+        MqttMessage mess = new MqttMessage();
+        mess.setQos(1);
+        mess.setRetained(true);
+        mess.setPayload(message.getBytes());
+        try {
+            client.publish(topic, mess);
+        } catch (Exception e) {
+            //LOGGER.error(e.getLocalizedMessage());
+        }
+    }
+
+    /*
+     * 连接数据库
+     * */
+    public String Connect(String chipId) throws SQLException, ClassNotFoundException {
+
+
+            //1.加载驱动
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            //2.链接数据库
+            String url = "jdbc:mysql://122.112.224.199:3306/huimv-env-platform-qingshan";
+            Connection conn = DriverManager.getConnection(url, "qingshan", "qingshan@2022");
+            System.out.println("开始连接"+conn);
+            //3.获取statement对象
+            Statement statement = conn.createStatement();
+            String sql = "select chip_id from env_device_register where chip_id=".concat(chipId);
+            //4.获取结果集
+            ResultSet resultSet = statement.executeQuery(sql);
+            resultSet.next();//这个一定不能少,指针取值
+            String deviceCode = resultSet.getString("chip_id");
+            System.out.println(deviceCode);
+
+        return deviceCode;
+    }
+
+}

+ 50 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/subscribe/ConsumerMQTT.java

@@ -0,0 +1,50 @@
+package com.huimv.env.manage.saas.mqtt.subscribe;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ConsumerMQTT {
+    public static final String HOST = "tcp://192.168.1.68:1883";
+    private static final String clientid = "subscribe";
+    private MqttClient client;
+    private MqttConnectOptions options;
+
+    private String userName = "admin";    //非必须
+    private String passWord = "admin";  //非必须
+
+    public void start() {
+        try {
+            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
+            client = new MqttClient(HOST, clientid, new MemoryPersistence());
+            // MQTT的连接设置
+            options = new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(true);
+            // 设置连接的用户名
+            options.setUserName(userName);
+            // 设置连接的密码
+            options.setPassword(passWord.toCharArray());
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(10);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(20);
+            //设置断开后重新连接
+            options.setAutomaticReconnect(true);
+            // 设置回调
+            client.setCallback(new PushCallback());
+
+            client.connect(options);
+
+            //订阅消息
+//            int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次
+            client.subscribe("test",1);
+
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 45 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/subscribe/PushCallback.java

@@ -0,0 +1,45 @@
+package com.huimv.env.manage.saas.mqtt.subscribe;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+@Component
+//接收消息回调
+class PushCallback implements MqttCallback {
+
+
+    @Override
+    public void connectionLost(Throwable cause) {
+
+        // 连接丢失后,一般在这里面进行重连
+        System.out.println("连接断开,可以做重连");
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        System.out.println("deliveryComplete---------" + token.isComplete());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        // subscribe后得到的消息会执行到这里面
+        System.out.println("接收消息主题 : " + topic);
+        System.out.println("接收消息Qos : " + message.getQos());
+        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+        System.out.println("开始处理当前数据...");
+
+        String messages = new String(message.getPayload());
+        String[] split = messages.split(",");
+//        PeopleServiceImpl peopleService = SpringUtil.getBean(PeopleServiceImpl.class);//创建impl实例
+//        People people = new People();
+//        people.setName(split[0]);
+//        people.setAge(Integer.parseInt(split[1]));
+//        people.setSex(split[2]);
+//        peopleService.save(people);
+//        peopleService.addPeople(split);
+        System.out.println("数据处理完成!");
+    }
+
+}

+ 79 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/utils/Result.java

@@ -0,0 +1,79 @@
+package com.huimv.env.manage.saas.mqtt.utils;
+
+import java.io.Serializable;
+
+public class Result implements Serializable {
+
+    private boolean success;
+    private Integer code;
+    private String message;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        this.data = data;
+    }
+
+    private Object data;
+
+    //不需要返回数据时使用
+    public Result(ResultCode code) {
+        this.success = code.success;
+        this.code = code.code;
+        this.message = code.message;
+    }
+
+    public Result(ResultCode code, Object data) {
+        this.success = code.success;
+        this.code = code.code;
+        this.message = code.message;
+        this.data = data;
+    }
+
+    public Result(Integer code, String message, boolean success) {
+        this.code = code;
+        this.message = message;
+        this.success = success;
+    }
+
+    /*
+     * 调用ResultCode类封装常用的返回数据
+     */
+    public static Result SUCCESS(){
+        return new Result(ResultCode.SUCCESS);
+    }
+
+    public static Result ERROR(){
+        return new Result(ResultCode.SERVER_ERROR);
+    }
+
+    public static Result FAIL(){
+        return new Result(ResultCode.FAIL);
+    }
+}

+ 41 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/utils/ResultCode.java

@@ -0,0 +1,41 @@
+package com.huimv.env.manage.saas.mqtt.utils;
+
+public enum ResultCode {
+    SUCCESS(true,10000,"操作成功!"),
+    //---系统错误返回码-----
+    FAIL(false,10001,"操作失败"),
+    UNAUTHENTICATED(false,10002,"您还未登录"),
+    UNAUTHORISE(false,10003,"权限不足"),
+    SERVER_ERROR(false,99999,"抱歉,系统繁忙,请稍后重试!"),
+
+    //---用户操作返回码  2xxxx----
+    MOBILEORPASSWORDERROR(false,20001,"用户名或密码错误");
+    //---企业操作返回码  3xxxx----
+    //---权限操作返回码----
+    //---其他操作返回码----
+
+    //操作是否成功
+    boolean success;
+    //操作代码
+    int code;
+    //提示信息
+    String message;
+
+    ResultCode(boolean success, int code, String message){
+        this.success = success;
+        this.code = code;
+        this.message = message;
+    }
+
+    public boolean success() {
+        return success;
+    }
+
+    public int code() {
+        return code;
+    }
+
+    public String message() {
+        return message;
+    }
+}

+ 71 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/utils/ResultUtil.java

@@ -0,0 +1,71 @@
+package com.huimv.env.manage.saas.mqtt.utils;
+
+import cn.hutool.core.util.ObjectUtil;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @Project : huimv-breed
+ * @Package : IntelliJ IDEA
+ * @Description : TODO
+ * @Version : 1.0
+ * @Author : ZhuoNing
+ * @Create : 2022/9/27
+ **/
+@Component
+public class ResultUtil {
+
+    public static Result result(int rows){
+        if(rows == 0){
+            return new Result(ResultCode.FAIL,"处理失败.");
+        }else{
+            return new Result(ResultCode.SUCCESS,"处理成功.");
+        }
+    }
+
+    public static Result list(List o){
+        if(ObjectUtil.isNotEmpty(o) ){
+            return new Result(ResultCode.SUCCESS,o);
+        }else{
+            return new Result(ResultCode.FAIL,new ArrayList<>());
+        }
+    }
+    public static Result getResult(Integer rows){
+        if(rows > 0){
+            return new Result(ResultCode.SUCCESS);
+        }else{
+            return new Result(ResultCode.FAIL);
+        }
+    }
+
+    public static Result addResult(Integer rows){
+        if(rows > 0){
+            return new Result(10000, "添加成功",true);
+        }else{
+            return new Result(10001, "添加失败",false);
+        }
+    }
+    public static Result updateResult(Integer rows){
+        if(rows > 0){
+            return new Result(10000, "修改成功",true);
+        }else{
+            return new Result(10001, "修改失败",false);
+        }
+    }
+    public static Result deleteResult(Integer rows){
+        if(rows > 0){
+            return new Result(10000, "删除成功",true);
+        }else{
+            return new Result(10001, "删除失败",false);
+        }
+    }
+
+    public static Result exist(){
+       return new Result(10001, "该数据已存在",false);
+    }
+//    public static Result isNull(){
+//        return new Result(ResultCode.DATA_NULL);
+//    }
+}

+ 36 - 0
huimv-env-platform/huimv-env-manage/src/main/java/com/huimv/env/manage/saas/mqtt/utils/SpringUtil.java

@@ -0,0 +1,36 @@
+package com.huimv.env.manage.saas.mqtt.utils;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class SpringUtil implements ApplicationContextAware {
+    private static ApplicationContext applicationContext = null;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        if(SpringUtil.applicationContext == null){
+            SpringUtil.applicationContext  = applicationContext;
+        }
+    }
+
+    public static ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+    public static Object getBean(String name){
+        return getApplicationContext().getBean(name);
+    }
+
+    public static <T> T getBean(Class<T> clazz){
+        return getApplicationContext().getBean(clazz);
+    }
+
+    public static <T> T getBean(String name,Class<T> clazz){
+        return getApplicationContext().getBean(name, clazz);
+    }
+
+
+}

+ 4 - 0
huimv-env-platform/huimv-env-manage/src/main/resources/application-dev.yml

@@ -1,5 +1,9 @@
 server:
   port: 8096
+  ssl:
+    key-store: classpath:8717089_qingshan.ifarmcloud.com.jks
+    key-store-type: jks
+    key-store-password: QYCEc3uL
 spring:
   application:
     name: huimv-env-manage