Newspaper 1 éve
szülő
commit
68273c0f52

+ 134 - 4
huimv-eartag2-eartag/src/main/java/com/huimv/eartag2/eartag/config/TopicRabbitMQConfig.java

@@ -1,10 +1,7 @@
 package com.huimv.eartag2.eartag.config;
 package com.huimv.eartag2.eartag.config;
 
 
 import com.huimv.eartag2.common.mq.Const;
 import com.huimv.eartag2.common.mq.Const;
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.core.*;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
 
 
@@ -60,4 +57,137 @@ public class TopicRabbitMQConfig {
 //        return BindingBuilder.bind(eartagQueue()).to(eartagExchange()).with(Const.ROUTING_KEY_EARTAG);
 //        return BindingBuilder.bind(eartagQueue()).to(eartagExchange()).with(Const.ROUTING_KEY_EARTAG);
 //    }
 //    }
 
 
+
+    // ============================= 计算小时运动量 ============================= //
+    @Bean
+    public Queue syncCountActHourQueue() {
+        return new Queue(Const.QUEUE_COUNT_ACT_HOUR);
+    }
+    @Bean
+    public  DirectExchange syncCountActHourExchange() {
+        return new DirectExchange(Const.EXCHANGE_COUNT_ACT_HOUR);
+    }
+    @Bean
+    public Binding bindingCountActHourExchangeMessage() {
+        return BindingBuilder.bind(syncCountActHourQueue()).to(syncCountActHourExchange()).with(Const.ROUTING_KEY_COUNT_ACT_HOUR);
+    }
+
+    // ============================= 更新设备注册信息 ============================= //
+    @Bean
+    public Queue syncDeviceRegistQueue() {
+        return new Queue(Const.QUEUE_DEVICE_REGIST);
+    }
+    @Bean
+    public  DirectExchange syncDeviceRegistExchange() {
+        return new DirectExchange(Const.EXCHANGE_DEVICE_REGIST);
+    }
+    @Bean
+    public Binding bindingDeviceRegistExchangeMessage() {
+        return BindingBuilder.bind(syncDeviceRegistQueue()).to(syncDeviceRegistExchange()).with(Const.ROUTING_KEY_DEVICE_REGIST);
+    }
+
+    // ============================= 更新耳标注册信息 ============================= //
+    @Bean
+    public Queue syncDartagRegistQueue() {
+        return new Queue(Const.QUEUE_EARTAG_REGIST);
+    }
+    @Bean
+    public  DirectExchange synceartagRegistExchange() {
+        return new DirectExchange(Const.EXCHANGE_EARTAG_REGIST);
+    }
+    @Bean
+    public Binding bindineartagRegistExchangeMessage() {
+        return BindingBuilder.bind(syncDartagRegistQueue()).to(synceartagRegistExchange()).with(Const.ROUTING_KEY_EARTAG_REGIST);
+    }
+
+
+    // ============================= 更新设备在线统计 ============================= //
+    @Bean
+    public Queue syncdeviceCountQueue() {
+        return new Queue(Const.QUEUE_DEVICE_COUNT);
+    }
+    @Bean
+    public  DirectExchange deviceCountExchange() {
+        return new DirectExchange(Const.EXCHANGE_DEVICE_COUNT);
+    }
+    @Bean
+    public Binding deviceCountExchangeMessage() {
+        return BindingBuilder.bind(syncdeviceCountQueue()).to(deviceCountExchange()).with(Const.ROUTING_KEY_DEVICE_COUNT);
+    }
+
+    // ============================= 更新耳标在线统计 ============================= //
+    @Bean
+    public Queue eartagCountQueue() {
+        return new Queue(Const.QUEUE_EARTAG_COUNT);
+    }
+    @Bean
+    public  DirectExchange eartagCountExchange() {
+        return new DirectExchange(Const.EXCHANGE_EARTAG_COUNT);
+    }
+    @Bean
+    public Binding eartagCountMessage() {
+        return BindingBuilder.bind(eartagCountQueue()).to(eartagCountExchange()).with(Const.ROUTING_KEY_EARTAG_COUNT);
+    }
+
+
+    // ============================= 更新耳标在线状态 ============================= //
+    @Bean
+    public Queue eartagLiveStatusQueue() {
+        return new Queue(Const.QUEUE_EARTAG_LIVE);
+    }
+    @Bean
+    public  DirectExchange eartagLiveStatusExchange() {
+        return new DirectExchange(Const.EXCHANGE_EARTAG_LIVE);
+    }
+    @Bean
+    public Binding eartagLiveStatusMessage() {
+        return BindingBuilder.bind(eartagLiveStatusQueue()).to(eartagLiveStatusExchange()).with(Const.ROUTING_KEY_EARTAG_LIVE);
+    }
+
+    // ============================= 更新耳标设备在线关联统计 ============================= //
+    @Bean
+    public Queue updateEartagDeviceOnlineCountQueue() {
+        return new Queue(Const.QUEUE_EARTAG_DEVICE_ONLINE);
+    }
+    @Bean
+    public  DirectExchange updateEartagDeviceOnlineCountExchange() {
+        return new DirectExchange(Const.EXCHANGE_EARTAG_DEVICE_ONLINE);
+    }
+    @Bean
+    public Binding updateEartagDeviceOnlineCountMessage() {
+        return BindingBuilder.bind(updateEartagDeviceOnlineCountQueue()).to(updateEartagDeviceOnlineCountExchange()).with(Const.ROUTING_KEY_EARTAG_DEVICE_ONLINE);
+    }
+
+    // ============================= 异常统计和异常报警 ============================= //
+    @Bean
+    public Queue abnormalCountAndAlarmQueue() {
+        return new Queue(Const.QUEUE_ABNORMAL);
+    }
+    @Bean
+    public  DirectExchange abnormalCountAndAlarmExchange() {
+        return new DirectExchange(Const.EXCHANGE_ABNORMAL);
+    }
+    @Bean
+    public Binding abnormalCountAndAlarmMessage() {
+        return BindingBuilder.bind(abnormalCountAndAlarmQueue()).to(abnormalCountAndAlarmExchange()).with(Const.ROUTING_KEY_ABNORMAL);
+    }
+
+    // ============================= 判断该设备编号是否存在牧场缓存在线集合当中 ============================= //
+    @Bean
+    public Queue updateEartagPropertyOfAllStatusQueue() {
+        return new Queue(Const.QUEUE_AllSTATUS);
+    }
+    @Bean
+    public  DirectExchange updateEartagPropertyOfAllStatusExchange() {
+        return new DirectExchange(Const.EXCHANGE_AllSTATUS);
+    }
+    @Bean
+    public Binding updateEartagPropertyOfAllStatusMessage() {
+        return BindingBuilder.bind(updateEartagPropertyOfAllStatusQueue()).to(updateEartagPropertyOfAllStatusExchange()).with(Const.ROUTING_KEY_AllSTATUS);
+    }
+
+
+
+
+
 }
 }

+ 140 - 96
huimv-eartag2-eartag/src/main/java/com/huimv/eartag2/eartag/listener/EartagListener.java

@@ -1,5 +1,6 @@
 package com.huimv.eartag2.eartag.listener;
 package com.huimv.eartag2.eartag.listener;
 
 
+import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.huimv.eartag2.common.mq.Const;
 import com.huimv.eartag2.common.mq.Const;
 import com.huimv.eartag2.common.utils.DateUtil;
 import com.huimv.eartag2.common.utils.DateUtil;
@@ -10,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
 //import org.apache.kafka.clients.consumer.ConsumerRecord;
 //import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 //import org.springframework.kafka.annotation.KafkaListener;
 //import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.annotation.Value;
@@ -41,105 +43,11 @@ public class EartagListener {
     private IDeviceService deviceService;
     private IDeviceService deviceService;
     @Autowired
     @Autowired
     private IEartagService eartagService;
     private IEartagService eartagService;
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
 
 
     private static final String TOPIC = "test-topic";
     private static final String TOPIC = "test-topic";
 
 
-
-//    @KafkaListener(topics = TOPIC, groupId = "my-group")
-//    public void listen(ConsumerRecord<String, String> record) {
-//        System.out.println("Received message: " + record.value());
-//    }
-    /**
-     * @Method : processRawdata
-     * @Description :
-     * @Params : [RawMap]
-     * @Return : void
-     * @Author : ZhuoNing
-     * @Date : 2022/3/12
-     * @Time : 17:53
-     */
-//    @RabbitListener(queues = Const.QUEUE_ASK_EARTAG )
-    @RabbitListener(queues = "#{'direct.askEartag.queue' + ${rabbitmq.queues}}")
-    @RabbitHandler
-    public void processRawdata(Map RawMap) throws ParseException, IOException {
-         log.info("开始处理耳标数据------>"+RawMap);
-        String askText = RawMap.get("askText").toString();
-
-        //本模块主要执行保存以下4类数据:(其他注册数据,在线数据,状态数据等都是由其他模块工程执行处理-process2模块工程)
-        //--保存原始流水数据(原始流水表)
-        //--保存设备心跳流水数据(设备心跳流水表)
-        //--保存设备环境流水数据(设备温度流水表)
-        //--保存耳标流水数据(耳标流水表)
-
-        //{处理请求报文}
-        Map askMap = handleAskText(askText);
-        if (askMap == null){
-            return;
-        }
-        String type = askMap.get("type").toString();
-        JSONObject dataJo = (JSONObject) askMap.get("data");
-        if (type.trim().equalsIgnoreCase("eartag")) {
-            //{处理耳标数据}
-            eartagService.handleEartag(dataJo);
-            //调用省平台的接口,将耳标数据发送到省平台
-//            eartagService.sendSowProvincePlatform(dataJo,askText);
-        }
-
-    }
-    public boolean getAskTime(String askTime)   {
-        String newAskTime = askTime.substring(0, 4) + "-" + askTime.substring(4, 6) + "-" + askTime.substring(6, 8) + " " + askTime.substring(8, 10) + ":" + askTime.substring(10, 12) + ":" + askTime.substring(12, 14);
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        try {
-            Date targetDate = sdf.parse(newAskTime);
-            Date currentDate = new Date();
-
-            // 获取时间戳
-            long targetTimestamp = targetDate.getTime();
-            long currentTimestamp = currentDate.getTime();
-
-            // 设置时间差阈值,例如10分钟
-            long maxTimeDifference = 60 * 1000; // 10 minutes in milliseconds
-
-            // 计算时间差
-            long timeDifference = Math.abs(targetTimestamp - currentTimestamp);
-
-            if (timeDifference <= maxTimeDifference) {
-                return true;
-            } else {
-                return false;
-            }
-        } catch (ParseException e) {
-            e.printStackTrace();
-        }
-        return false;
-    }
-
-    public void writeTxt(String conent,String file) throws ParseException, IOException {
-        String todayText = getTodayDateText();
-        File f = new File("/opt/log/log_"+file+"_"+todayText+".txt");
-        if (f.exists()) {
-        } else {
-            f.createNewFile();// 不存在则创建
-        }
-        BufferedWriter output = new BufferedWriter(new FileWriter(f,true));//true,则追加写入text文本
-        output.write(conent);
-        output.write("\r\n");//换行
-        output.flush();
-        output.close();
-    }
-
-    public String getTodayDateText() throws ParseException {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-        return sdf.format(new Date());
-    }
-
-    private boolean checkTestRange(String earmark) {
-        Long c = Long.parseLong(earmark);
-        Long a = 122083123610001L;
-        Long b = 122083123610100L;
-        return (c >= a && c < b) || (c > a && c <= b);
-    }
-
     /**
     /**
      * @Method : handleAskText
      * @Method : handleAskText
      * @Description : 处理请求
      * @Description : 处理请求
@@ -236,4 +144,140 @@ public class EartagListener {
         outMap.put("data", dataJo);
         outMap.put("data", dataJo);
         return outMap;
         return outMap;
     }
     }
+
+    /**
+     * @Method : processRawdata
+     * @Description :
+     * @Params : [RawMap]
+     * @Return : void
+     * @Author : ZhuoNing
+     * @Date : 2022/3/12
+     * @Time : 17:53
+     */
+//    @RabbitListener(queues = Const.QUEUE_ASK_EARTAG )
+    @RabbitListener(queues = "#{'direct.askEartag.queue' + ${rabbitmq.queues}}")
+    @RabbitHandler
+    public void processRawdata(Map RawMap) throws ParseException, IOException {
+         log.info("开始处理原始耳标数据------>"+RawMap);
+        String askText = RawMap.get("askText").toString();
+
+        //{处理请求报文}
+        Map askMap = handleAskText(askText);
+        if (askMap == null){
+            return;
+        }
+        String type = askMap.get("type").toString();
+        JSONObject dataJo = (JSONObject) askMap.get("data");
+        if (type.trim().equalsIgnoreCase("eartag")) {
+            String deviceCode = dataJo.getString("device");
+            String farmId = deviceService.getFarmIdByDeviceCode(deviceCode);
+            System.out.println("####### farmId=" + farmId);
+            if (ObjectUtil.isEmpty(farmId)){
+                log.error("出错:该请求的FarmId为空.请检查该设备编码[" + deviceCode + "]是否存在FarmId.");
+                return;
+            }
+            dataJo.put("farmId",farmId);
+            //计算小时运动量
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_COUNT_ACT_HOUR,Const.ROUTING_KEY_COUNT_ACT_HOUR,dataJo);
+
+            //更新设备注册信息
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_DEVICE_REGIST,Const.ROUTING_KEY_DEVICE_REGIST,dataJo);
+
+            //更新耳标注册信息
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_EARTAG_REGIST,Const.ROUTING_KEY_EARTAG_REGIST,dataJo);
+
+            //更新设备在线统计
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_DEVICE_COUNT,Const.ROUTING_KEY_DEVICE_COUNT,dataJo);
+
+            //更新耳标在线统计
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_EARTAG_COUNT,Const.ROUTING_KEY_EARTAG_COUNT,dataJo);
+
+            //更新耳标在线状态
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_EARTAG_LIVE,Const.ROUTING_KEY_EARTAG_LIVE,dataJo);
+
+            //更新耳标设备在线关联统计
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_EARTAG_DEVICE_ONLINE,Const.ROUTING_KEY_EARTAG_DEVICE_ONLINE,dataJo);
+
+            //异常统计和异常报警
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_ABNORMAL,Const.ROUTING_KEY_ABNORMAL,dataJo);
+
+            //判断该设备编号是否存在牧场缓存在线集合当中
+            rabbitTemplate.convertAndSend(Const.EXCHANGE_AllSTATUS,Const.ROUTING_KEY_AllSTATUS,dataJo);
+
+
+        }
+        eartagService.handleEartag(dataJo);
+    }
+
+
+
+    @RabbitListener(queues = Const.QUEUE_COUNT_ACT_HOUR)
+    @RabbitHandler
+    public void countActHour(JSONObject dataJo) throws ParseException, IOException {
+        log.info("开始计算小时运动量------>"+dataJo);
+
+    }
+
+
+
+
+
+
+
+
+
+
+    public boolean getAskTime(String askTime)   {
+        String newAskTime = askTime.substring(0, 4) + "-" + askTime.substring(4, 6) + "-" + askTime.substring(6, 8) + " " + askTime.substring(8, 10) + ":" + askTime.substring(10, 12) + ":" + askTime.substring(12, 14);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        try {
+            Date targetDate = sdf.parse(newAskTime);
+            Date currentDate = new Date();
+
+            // 获取时间戳
+            long targetTimestamp = targetDate.getTime();
+            long currentTimestamp = currentDate.getTime();
+
+            // 设置时间差阈值,例如10分钟
+            long maxTimeDifference = 60 * 1000; // 10 minutes in milliseconds
+
+            // 计算时间差
+            long timeDifference = Math.abs(targetTimestamp - currentTimestamp);
+
+            if (timeDifference <= maxTimeDifference) {
+                return true;
+            } else {
+                return false;
+            }
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    public void writeTxt(String conent,String file) throws ParseException, IOException {
+        String todayText = getTodayDateText();
+        File f = new File("/opt/log/log_"+file+"_"+todayText+".txt");
+        if (f.exists()) {
+        } else {
+            f.createNewFile();// 不存在则创建
+        }
+        BufferedWriter output = new BufferedWriter(new FileWriter(f,true));//true,则追加写入text文本
+        output.write(conent);
+        output.write("\r\n");//换行
+        output.flush();
+        output.close();
+    }
+
+    public String getTodayDateText() throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        return sdf.format(new Date());
+    }
+
+    private boolean checkTestRange(String earmark) {
+        Long c = Long.parseLong(earmark);
+        Long a = 122083123610001L;
+        Long b = 122083123610100L;
+        return (c >= a && c < b) || (c > a && c <= b);
+    }
 }
 }

+ 5 - 2
huimv-eartag2-eartag/src/main/java/com/huimv/eartag2/eartag/service/impl/EartagServiceImpl.java

@@ -122,7 +122,7 @@ public class EartagServiceImpl implements IEartagService {
         System.out.println("####### farmId=" + farmId);
         System.out.println("####### farmId=" + farmId);
         if (farmId != null) {
         if (farmId != null) {
             //{计算小时运动量}
             //{计算小时运动量}
-            countHourAct(dataJo, nowTimestamp, todayDate, farmId);
+            countHourAct(dataJo);
 
 
             //{保存耳标流水(所有耳标数据,可能重复上传)}
             //{保存耳标流水(所有耳标数据,可能重复上传)}
             saveEartagFlow(dataJo, nowTimestamp, todayDate, farmId);
             saveEartagFlow(dataJo, nowTimestamp, todayDate, farmId);
@@ -606,7 +606,10 @@ public class EartagServiceImpl implements IEartagService {
     }
     }
 
 
     // 计算小时运动量
     // 计算小时运动量
-    private void countHourAct(JSONObject eartagJo, Timestamp nowTimestamp, java.sql.Date todayDate, String farmId) {
+    private void countHourAct(JSONObject eartagJo) {
+        java.sql.Date todayDate = new java.sql.Date(new java.util.Date().getTime());
+        Timestamp nowTimestamp = new Timestamp(new java.util.Date().getTime());
+        String farmId = eartagJo.getString("farmId");
         DateUtil du = new DateUtil();
         DateUtil du = new DateUtil();
         int nowHour = du.getNowHour();
         int nowHour = du.getNowHour();
         // 小时
         // 小时

+ 6 - 4
huimv-eartag2-eartag/src/main/resources/application.properties

@@ -6,9 +6,11 @@
 spring.profiles.active=prod
 spring.profiles.active=prod
 #spring.profiles.active=test2
 #spring.profiles.active=test2
 
 
-device.register.prefix=device_register_
-redis.expire.eartag_online_set=25
+#device.register.prefix=device_register_
+
+
+#redis.expire.eartag_online_set=25
 
 
 #
 #
-alarm.maxTemp=40
-alarm.minTemp=30
+#alarm.maxTemp=40
+#alarm.minTemp=30