// FBoxSignalRConnection.java (8.1 KB)

  1. package com.huimv.produce.sgd;
  2. import com.github.signalr4j.client.hubs.HubProxy;
  3. import com.google.gson.Gson;
  4. import com.google.gson.GsonBuilder;
  5. import com.google.gson.JsonArray;
  6. import com.google.gson.JsonObject;
  7. import com.huimv.produce.sgd.fbox.Logger;
  8. import com.huimv.produce.sgd.fbox.LoggerFactory;
  9. import com.huimv.produce.sgd.fbox.TokenManager;
  10. import com.huimv.produce.sgd.fbox.signalr.SignalRConnectionBase;
  11. import java.net.Proxy;
  12. import java.text.SimpleDateFormat;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.atomic.LongAdder;
  15. public class FBoxSignalRConnection extends SignalRConnectionBase {
  16. private final Gson gson;
  17. private final Logger logger;
  18. ConcurrentHashMap<Long, LongAdder> dmonIds = new ConcurrentHashMap<>();
  19. private LongAdder dmonMsgCounter = new LongAdder();
  20. private long lastDmonItemCount;
  21. private long lastDmonMsgCount;
  22. private long lastReportTime;
  23. private Proxy proxy;
  24. private LongAdder dmonItemCounter = new LongAdder();
  25. public FBoxSignalRConnection(String hubUrl, String signalrClientId, TokenManager tokenManager, Proxy proxy, LoggerFactory loggerFactory) {
  26. super(hubUrl, signalrClientId, tokenManager, proxy, loggerFactory);
  27. this.logger = loggerFactory.createLogger("FBoxSignalRConnection");
  28. this.proxy = proxy;
  29. gson = new GsonBuilder().create();
  30. // new Thread(() -> {
  31. // //统计条目数线程,可以去掉此线程代码
  32. // for (; ; ) {
  33. // try {
  34. // Thread.sleep(5000);
  35. // } catch (InterruptedException e) {
  36. // e.printStackTrace();
  37. // }
  38. // long currentTime = System.nanoTime();
  39. // long currentMsgCount = this.dmonMsgCounter.longValue();
  40. // long currentItemCount = this.dmonItemCounter.longValue();
  41. // long msgRate = (currentMsgCount - this.lastDmonMsgCount) * 1000000000 / (currentTime - this.lastReportTime);
  42. // long itemRate = (currentItemCount - this.lastDmonItemCount) * 1000000000 / (currentTime - this.lastReportTime);
  43. // this.logger.logInformation(String.format("Dmon id count: %d, item rate: %d, message rate: %d", this.dmonIds.size(), itemRate, msgRate));
  44. // this.lastReportTime = currentTime;
  45. // this.lastDmonMsgCount = currentMsgCount;
  46. // this.lastDmonItemCount = currentItemCount;
  47. // }
  48. // }).start();
  49. }
  50. @Override
  51. public void connected() {
  52. super.connected();
  53. dmonIds.clear();
  54. }
  55. @Override
  56. protected void onHubProxyDestroyed(HubProxy hubProxy){
  57. hubProxy.removeSubscription("dMonUpdateValue");
  58. hubProxy.removeSubscription("alarmTriggered");
  59. hubProxy.removeSubscription("alarmRecovered");
  60. hubProxy.removeSubscription("boxConnStateChanged");
  61. }
  62. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  63. @Override
  64. protected void onHubProxyCreated(HubProxy hubProxy) {
  65. //signalr实时数据推送事件,接收此事件数据前提条件,开启监控点数据推送控制接口(订阅)
  66. hubProxy.subscribe("dMonUpdateValue").addReceivedHandler(jsonElements -> {
  67. Global.threadPool.submit(() -> {
  68. //try{
  69. this.dmonMsgCounter.increment();
  70. //System.out.println("Dmon data received: ");
  71. //收到的推送数据
  72. JsonArray items = jsonElements[1].getAsJsonArray();
  73. for (com.google.gson.JsonElement jsonElement : items) {
  74. JsonObject item = jsonElement.getAsJsonObject();
  75. this.dmonIds.computeIfAbsent(item.get("id").getAsLong(), aLong -> new LongAdder()).increment();
  76. this.dmonItemCounter.increment();
  77. //收到的推送数据
  78. String name = item.get("name").getAsString();
  79. String value = item.get("value").getAsString();
  80. long time = item.get("t").getAsLong();
  81. this.logger.logInformation(String.format(" %s, %s, %d\n",name,value,time));
  82. //监控点正常无status属性
  83. //long statu = item.get("status").getAsLong();
  84. };
  85. //打印监控点的值集合,集合详细信息请看接口文档http://docs.flexem.net/fbox/zh-cn/tutorials/RealtimeDataPush.html
  86. System.out.printf("%s",jsonElements[1].getAsJsonArray());
  87. //打印boxUid
  88. System.out.printf("%s",jsonElements[2].getAsLong());
  89. // }
  90. // catch (Exception e) {
  91. // System.out.printf("%s", e);
  92. // }
  93. });
  94. });
  95. // signalr报警触发事件
  96. // hubProxy.subscribe("alarmTriggered").addReceivedHandler(jsonElements -> {
  97. // Global.threadPool.submit(() -> {
  98. // System.out.println("Alarm triggered: ");
  99. // for (com.google.gson.JsonElement jsonElement : jsonElements) {
  100. // //报警推送消息全部打印。具体参数解释请看接口文档http://docs.flexem.net/fbox/zh-cn/tutorials/AlarmTiggerPush.html
  101. // System.out.println("\t" + jsonElement);
  102. // };
  103. // //打印报警条目的值集合
  104. // System.out.printf("%s",jsonElements[1].getAsJsonArray());
  105. // //打印boxUid
  106. // System.out.printf("%s",jsonElements[2].getAsLong());
  107. // });
  108. // });
  109. // signalr报警还原事件
  110. // hubProxy.subscribe("alarmRecovered").addReceivedHandler(jsonElements -> {
  111. // Global.threadPool.submit(() -> {
  112. // System.out.println("Alarm recovered: ");
  113. // for (com.google.gson.JsonElement jsonElement : jsonElements) {
  114. // //报警推送消息全部打印。具体参数解释请看接口文档http://docs.flexem.net/fbox/zh-cn/tutorials/AlarmReductionPush.html
  115. // System.out.println("\t" + jsonElement);
  116. // };
  117. // //打印报警条目的值集合
  118. // System.out.printf("%s",jsonElements[1].getAsJsonArray());
  119. // //打印boxUid
  120. // System.out.printf("%s",jsonElements[2].getAsLong());
  121. // });
  122. // });
  123. // signalr盒子状态变更事件
  124. // hubProxy.subscribe("boxConnStateChanged").addReceivedHandler(jsonElements -> {
  125. // Global.threadPool.submit(() -> {
  126. // System.out.println("Box state changed.");
  127. // if (jsonElements.length <= 0)
  128. // return;
  129. // BoxStateChanged[] stateChanges = gson.fromJson(jsonElements[0], BoxStateChanged[].class);
  130. // this.logger.logInformation(String.format("receive count: %d", stateChanges.length));
  131. // for (BoxStateChanged stateChange : stateChanges) {
  132. // // stateChange.id 是盒子列表中BoxReg对象下的box.id,可以根据这个过滤要开的盒子。
  133. // // stateChange.state 为1、2是盒子上线事件。实时数据推送需要开点
  134. // if (stateChange.state == 1 || stateChange.state == 2) {
  135. // try {
  136. // // 盒子每次上线后,均需要开启FBox数据推送控制接口(订阅)
  137. // Global.commServer.executePost("box/" + stateChange.id + "/dmon/start", String.class);
  138. // // token有效期为两小时。若token过期,demo会自动刷新token。所以返回401后均需要重试接口
  139. // this.logger.logInformation(String.format("Start dmon points on box ok %s\n",stateChange.id));
  140. // } catch (IOException e) {
  141. // System.out.println(e);
  142. // e.printStackTrace();
  143. // }
  144. // }
  145. // }
  146. //
  147. // });
  148. // });
  149. }
  150. }