Tom-shushu il y a 3 ans
Parent
commit
41754e310c

+ 0 - 2
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/EmqtApplication.java

@@ -1,11 +1,9 @@
 package com.zhouhong.mqtt.emqt;
 
-import lombok.extern.log4j.Log4j2;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 @SpringBootApplication
-@Log4j2
 public class EmqtApplication {
 
     public static void main(String[] args) {

+ 8 - 10
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/back/MqttAcceptCallback.java

@@ -12,7 +12,7 @@ import javax.annotation.Resource;
 import java.io.UnsupportedEncodingException;
 
 /**
- * description:
+ * description: 接收消息后的回调
  * date: 2022/6/16 15:52
  *
  * @author: zhouhong
@@ -31,9 +31,9 @@ public class MqttAcceptCallback implements MqttCallbackExtended {
      */
     @Override
     public void connectionLost(Throwable throwable) {
-        log.info("连接断开,可以做重连");
+        log.info("接收消息回调:  连接断开,可以做重连");
         if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
-            log.info("emqx重新连接....................................................");
+            log.info("接收消息回调:  emqx重新连接....................................................");
             mqttAcceptClient.reconnection();
         }
     }
@@ -46,10 +46,8 @@ public class MqttAcceptCallback implements MqttCallbackExtended {
      */
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-        log.info("接收消息主题 : " + topic);
-        log.info("接收消息Qos : " + mqttMessage.getQos());
-        log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
-//        int i = 1/0;
+        log.info("接收消息回调:  接收消息主题 : " + topic);
+        log.info("接收消息回调:  接收消息内容 : " + new String(mqttMessage.getPayload()));
     }
 
     /**
@@ -61,13 +59,13 @@ public class MqttAcceptCallback implements MqttCallbackExtended {
     public void deliveryComplete(IMqttDeliveryToken token) {
         String[] topics = token.getTopics();
         for (String topic : topics) {
-            log.info("向主题:" + topic + "发送消息成功!");
+            log.info("接收消息回调:  向主题:" + topic + "发送消息成功!");
         }
         try {
             MqttMessage message = token.getMessage();
             byte[] payload = message.getPayload();
             String s = new String(payload, "UTF-8");
-            log.info("消息的内容是:" + s);
+            log.info("接收消息回调:  消息的内容是:" + s);
         } catch (MqttException e) {
             e.printStackTrace();
         } catch (UnsupportedEncodingException e) {
@@ -87,6 +85,6 @@ public class MqttAcceptCallback implements MqttCallbackExtended {
                 + MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");
         // 以/#结尾表示订阅所有以test开头的主题
         // 订阅所有机构主题
-        mqttAcceptClient.subscribe("client:report:1", 0);
+        mqttAcceptClient.subscribe("topic111", 0);
     }
 }

+ 6 - 8
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/back/MqttSendCallBack.java

@@ -11,7 +11,7 @@ import org.springframework.stereotype.Component;
 import java.io.UnsupportedEncodingException;
 
 /**
- * description:
+ * description: 发生消息成功后 的 回调
  * date: 2022/6/16 15:55
  *
  * @author: zhouhong
@@ -22,12 +22,11 @@ public class MqttSendCallBack implements MqttCallbackExtended {
 
     /**
      * 客户端断开后触发
-     *
      * @param throwable
      */
     @Override
     public void connectionLost(Throwable throwable) {
-        log.info("连接断开,可以做重连");
+        log.info("发送消息回调: 连接断开,可以做重连");
     }
 
     /**
@@ -38,9 +37,8 @@ public class MqttSendCallBack implements MqttCallbackExtended {
      */
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-        log.info("接收消息主题 : " + topic);
-        log.info("接收消息Qos : " + mqttMessage.getQos());
-        log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
+        log.info("发送消息回调:  接收消息主题 : " + topic);
+        log.info("发送消息回调:  接收消息内容 : " + new String(mqttMessage.getPayload()));
     }
 
     /**
@@ -52,13 +50,13 @@ public class MqttSendCallBack implements MqttCallbackExtended {
     public void deliveryComplete(IMqttDeliveryToken token) {
         String[] topics = token.getTopics();
         for (String topic : topics) {
-            log.info("向主题:" + topic + "发送消息成功!");
+            log.info("发送消息回调:  向主题:" + topic + "发送消息成功!");
         }
         try {
             MqttMessage message = token.getMessage();
             byte[] payload = message.getPayload();
             String s = new String(payload, "UTF-8");
-            log.info("消息的内容是:" + s);
+            log.info("发送消息回调:  消息的内容是:" + s);
         } catch (MqttException e) {
             e.printStackTrace();
         } catch (UnsupportedEncodingException e) {

+ 1 - 1
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/client/MqttAcceptClient.java

@@ -12,7 +12,7 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 /**
- * description:
+ * description: 服务器段端连接订阅消息、监控topic
  * date: 2022/6/16 15:52
  *
  * @author: zhouhong

+ 3 - 6
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/client/MqttSendClient.java

@@ -7,11 +7,10 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
 import java.util.UUID;
 
 /**
- * description:
+ * description: 发送消息
  * date: 2022/6/16 16:01
  *
  * @author: zhouhong
@@ -55,18 +54,16 @@ public class MqttSendClient {
      * 主题格式: server:report:$orgCode(参数实际使用机构代码)
      *
      * @param retained    是否保留
-     * @param orgCode     orgId
      * @param pushMessage 消息体
      */
-    public void publish(boolean retained, String orgCode, String pushMessage) {
+    public void publish(boolean retained, String topic, String pushMessage) {
         MqttMessage message = new MqttMessage();
         message.setQos(mqttProperties.getQos());
         message.setRetained(retained);
         message.setPayload(pushMessage.getBytes());
-        MqttDeliveryToken token;
         MqttClient mqttClient = connect();
         try {
-            mqttClient.publish("" + orgCode, message);
+            mqttClient.publish(topic, message);
         } catch (MqttException e) {
             e.printStackTrace();
         } finally {

+ 0 - 28
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/config/MqttCondition.java

@@ -1,28 +0,0 @@
-package com.zhouhong.mqtt.emqt.config;
-
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
-import org.springframework.context.annotation.Condition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.env.Environment;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * description:
- * date: 2022/6/16 15:56
- *
- * @author: zhouhong
- */
-public class MqttCondition implements Condition {
-
-    @Override
-    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
-        //1、能获取到ioc使用的beanfactory
-        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
-        //2、获取类加载器
-        ClassLoader classLoader = context.getClassLoader();
-        //3、获取当前环境信息
-        Environment environment = context.getEnvironment();
-        String isOpen = environment.getProperty("mqtt.isOpen");
-        return Boolean.parseBoolean(isOpen);
-    }
-}

+ 0 - 2
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/config/MqttConfig.java

@@ -2,7 +2,6 @@ package com.zhouhong.mqtt.emqt.config;
 
 import com.zhouhong.mqtt.emqt.client.MqttAcceptClient;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 
 import javax.annotation.Resource;
@@ -18,7 +17,6 @@ public class MqttConfig {
     @Resource
     private MqttAcceptClient mqttAcceptClient;
 
-    @Conditional(MqttCondition.class)
     @Bean
     public MqttAcceptClient getMqttPushClient() {
         mqttAcceptClient.connect();

+ 0 - 5
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/config/MqttProperties.java

@@ -62,11 +62,6 @@ public class MqttProperties {
     private Boolean reconnect;
 
     /**
-     * 启动的时候是否关闭mqtt
-     */
-    private Boolean isOpen;
-
-    /**
      * 连接方式
      */
     private Integer qos;

+ 2 - 4
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/controller/SendController.java

@@ -7,7 +7,7 @@ import org.springframework.web.bind.annotation.*;
 import javax.annotation.Resource;
 
 /**
- * description:
+ * description: 发消息控制类
  * date: 2022/6/16 15:58
  *
  * @author: zhouhong
@@ -20,8 +20,6 @@ public class SendController {
 
     @PostMapping("/mqtt/sendmessage")
     public void sendMessage(@RequestBody SendParam sendParam) {
-        System.out.println("message:"+sendParam.getMessageContent());
-        mqttSendClient.publish(false,"client:report:1",sendParam.getMessageContent());
+        mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent());
     }
-
 }

+ 0 - 113
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/message/MqttSend.java

@@ -1,113 +0,0 @@
-package com.zhouhong.mqtt.emqt.message;
-
-import com.zhouhong.mqtt.emqt.back.MqttSendCallBack;
-import com.zhouhong.mqtt.emqt.config.MqttProperties;
-import lombok.extern.log4j.Log4j2;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.UUID;
-
-/**
- * description:
- * date: 2022/6/16 15:54
- *
- * @author: zhouhong
- */
-@Component
-@Log4j2
-public class MqttSend {
-
-    @Resource
-    private MqttSendCallBack mqttSendCallBack;
-
-    @Resource
-    private MqttProperties mqttProperties;
-
-    public MqttClient connect() {
-        return getMqttClient(mqttProperties, mqttSendCallBack);
-    }
-
-    public static MqttClient getMqttClient(MqttProperties mqttProperties, MqttSendCallBack mqttSendCallBack) {
-        MqttClient client = null;
-        try {
-            String uuid = UUID.randomUUID().toString().replaceAll("-","");
-            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setUserName(mqttProperties.getUsername());
-            options.setPassword(mqttProperties.getPassword().toCharArray());
-            options.setConnectionTimeout(mqttProperties.getTimeout());
-            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
-            options.setCleanSession(true);
-            options.setAutomaticReconnect(false);
-            try {
-                // 设置回调
-                client.setCallback(mqttSendCallBack);
-                client.connect(options);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return client;
-    }
-
-    /**
-     * 发布消息
-     * 主题格式: server:report:$orgCode(参数实际使用机构代码)
-     *
-     * @param retained    是否保留
-     * @param orgCode     orgId
-     * @param pushMessage 消息体
-     */
-    public void publish(boolean retained, String orgCode, String pushMessage) {
-        MqttMessage message = new MqttMessage();
-        message.setQos(mqttProperties.getQos());
-        message.setRetained(retained);
-        message.setPayload(pushMessage.getBytes());
-        MqttDeliveryToken token;
-        MqttClient mqttClient = connect();
-        try {
-            mqttClient.publish("server:report:" + orgCode, message);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        } finally {
-            disconnect(mqttClient);
-            close(mqttClient);
-        }
-    }
-
-    /**
-     * 关闭连接
-     *
-     * @param mqttClient
-     */
-    public static void disconnect(MqttClient mqttClient) {
-        try {
-            if (mqttClient != null) {
-                mqttClient.disconnect();
-            }
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 释放资源
-     *
-     * @param mqttClient
-     */
-    public static void close(MqttClient mqttClient) {
-        try {
-            if (mqttClient != null) {
-                mqttClient.close();
-            }
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-    }
-}

+ 4 - 2
mqtt-emqt/src/main/java/com/zhouhong/mqtt/emqt/model/param/SendParam.java

@@ -9,10 +9,12 @@ import lombok.Data;
  */
 @Data
 public class SendParam {
-
     /***
      * 消息内容
      */
     private String messageContent;
-
+    /**
+     * 客户端发消息的主题主题
+     */
+    private String topic;
 }

+ 0 - 0
mqtt-emqt/src/main/resources/application.properties


+ 6 - 5
mqtt-emqt/src/main/resources/application.yml

@@ -1,14 +1,15 @@
 server:
   port: 8080
+
 mqtt:
-  hostUrl: tcp://xxx.xx.xx.xxx:1883
+  hostUrl: tcp://mqtt.zhouhong.icu:1883
   username: admin
   password: public
-  clientId: equipment_main
+  ## 服务端 clientId (发送端自己定义)
+  clientId: service_client_id
   cleanSession: true
   reconnect: true
   timeout: 100
   keepAlive: 100
-  defaultTopic: client:report:1
-  isOpen: true
-  qos: 1
+  defaultTopic: topic111
+  qos: 0