MqttAcceptCallback.java 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package com.zhouhong.mqtt.emqt.back;
  2. import com.zhouhong.mqtt.emqt.client.MqttAcceptClient;
  3. import lombok.extern.log4j.Log4j2;
  4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
  6. import org.eclipse.paho.client.mqttv3.MqttException;
  7. import org.eclipse.paho.client.mqttv3.MqttMessage;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.io.UnsupportedEncodingException;
  11. /**
  12. * description:
  13. * date: 2022/6/16 15:52
  14. *
  15. * @author: zhouhong
  16. */
  17. @Component
  18. @Log4j2
  19. public class MqttAcceptCallback implements MqttCallbackExtended {
  20. @Resource
  21. private MqttAcceptClient mqttAcceptClient;
  22. /**
  23. * 客户端断开后触发
  24. *
  25. * @param throwable
  26. */
  27. @Override
  28. public void connectionLost(Throwable throwable) {
  29. log.info("连接断开,可以做重连");
  30. if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
  31. log.info("emqx重新连接....................................................");
  32. mqttAcceptClient.reconnection();
  33. }
  34. }
  35. /**
  36. * 客户端收到消息触发
  37. *
  38. * @param topic 主题
  39. * @param mqttMessage 消息
  40. */
  41. @Override
  42. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  43. log.info("接收消息主题 : " + topic);
  44. log.info("接收消息Qos : " + mqttMessage.getQos());
  45. log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
  46. // int i = 1/0;
  47. }
  48. /**
  49. * 发布消息成功
  50. *
  51. * @param token token
  52. */
  53. @Override
  54. public void deliveryComplete(IMqttDeliveryToken token) {
  55. String[] topics = token.getTopics();
  56. for (String topic : topics) {
  57. log.info("向主题:" + topic + "发送消息成功!");
  58. }
  59. try {
  60. MqttMessage message = token.getMessage();
  61. byte[] payload = message.getPayload();
  62. String s = new String(payload, "UTF-8");
  63. log.info("消息的内容是:" + s);
  64. } catch (MqttException e) {
  65. e.printStackTrace();
  66. } catch (UnsupportedEncodingException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. /**
  71. * 连接emq服务器后触发
  72. *
  73. * @param b
  74. * @param s
  75. */
  76. @Override
  77. public void connectComplete(boolean b, String s) {
  78. log.info("--------------------ClientId:"
  79. + MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");
  80. // 以/#结尾表示订阅所有以test开头的主题
  81. // 订阅所有机构主题
  82. mqttAcceptClient.subscribe("client:report:1", 0);
  83. }
  84. }