|
@@ -0,0 +1,360 @@
|
|
|
+package com.ruoyi.web.core.nfid;
|
|
|
+
|
|
|
+import com.github.benmanes.caffeine.cache.Cache;
|
|
|
+import com.github.benmanes.caffeine.cache.Caffeine;
|
|
|
+import com.lmax.disruptor.*;
|
|
|
+import com.lmax.disruptor.dsl.Disruptor;
|
|
|
+import com.lmax.disruptor.dsl.ProducerType;
|
|
|
+import com.ruoyi.app.DTO.HookBindBatchListDTO;
|
|
|
+import com.ruoyi.app.DTO.HookBindDetailDTO;
|
|
|
+import com.ruoyi.app.domain.HookBind;
|
|
|
+import com.ruoyi.app.domain.PorkSideProduce;
|
|
|
+import com.ruoyi.app.service.IHookBindService;
|
|
|
+import com.ruoyi.app.service.IHookRegisterService;
|
|
|
+import com.ruoyi.app.service.INFIDReaderService;
|
|
|
+import com.ruoyi.app.service.IPorkSideProduceService;
|
|
|
+import com.ruoyi.common.utils.DateUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.dao.DataAccessException;
|
|
|
+import org.springframework.data.redis.core.RedisOperations;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.data.redis.core.SessionCallback;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.sql.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+import com.ruoyi.app.domain.NFIDReader;
|
|
|
+
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+
|
|
|
+@Component
|
|
|
+public class MultiReaderNFIDProcessor {
|
|
|
+ // 配置参数
|
|
|
+ private static final int BUFFER_FLUSH_SIZE = 200;
|
|
|
+ private static final long BUFFER_FLUSH_INTERVAL_MS = 3000;
|
|
|
+ private static final long TAG_EXPIRY_MS = 30000;
|
|
|
+ private static final int DISRUPTOR_BUFFER_SIZE = 1024 * 8;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RedisTemplate<String,String> redisTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IHookRegisterService hookRegisterService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private INFIDReaderService NFIDReaderService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IHookBindService hookBindService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IPorkSideProduceService porkSideProduceService;
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void destroy()
|
|
|
+ {
|
|
|
+ disruptor.shutdown();
|
|
|
+ flushAllBuffers();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 数据结构
|
|
|
+ private final ConcurrentHashMap<String, ReaderInfo> readers = new ConcurrentHashMap<>();
|
|
|
+ //private final ConcurrentHashMap<String, Long> localCache;
|
|
|
+ private final Cache<String, Long> localCache = Caffeine.newBuilder()
|
|
|
+ .maximumSize(100_000)
|
|
|
+ .expireAfterWrite(TAG_EXPIRY_MS, TimeUnit.MILLISECONDS)
|
|
|
+ .build();
|
|
|
+ private final String redisCachePrefix = "nfid:reader:"; // 按识别器分组的缓存
|
|
|
+
|
|
|
+ // Disruptor相关
|
|
|
+ private final Disruptor<TagEvent> disruptor;
|
|
|
+ private final RingBuffer<TagEvent> ringBuffer;
|
|
|
+
|
|
|
+ // 批量处理
|
|
|
+ private final List<TagRecord> batchBuffer = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ // 按阶段分组缓冲区
|
|
|
+ private final ConcurrentMap<String, List<TagRecord>> batchBuffers = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+
|
|
|
+ // 数据模型
|
|
|
+ private static class ReaderInfo {
|
|
|
+ String readerId;
|
|
|
+ String readerName;
|
|
|
+ long lastActiveTime;
|
|
|
+ // 其他识别器元数据...
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class TagEvent {
|
|
|
+ String tagId;
|
|
|
+ String readerId;
|
|
|
+ long timestamp;
|
|
|
+
|
|
|
+ void set(String tagId, String readerId, long timestamp) {
|
|
|
+ this.tagId = tagId;
|
|
|
+ this.readerId = readerId;
|
|
|
+ this.timestamp = timestamp;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class TagRecord {
|
|
|
+ String tagId;
|
|
|
+ String readerId;
|
|
|
+ Timestamp detectionTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public MultiReaderNFIDProcessor() {
|
|
|
+
|
|
|
+ // 初始化Disruptor
|
|
|
+ ExecutorService disruptorExecutor = Executors.newSingleThreadExecutor();
|
|
|
+ this.disruptor = new Disruptor<>(
|
|
|
+ TagEvent::new,
|
|
|
+ DISRUPTOR_BUFFER_SIZE,
|
|
|
+ disruptorExecutor,
|
|
|
+ ProducerType.MULTI,
|
|
|
+ new SleepingWaitStrategy()
|
|
|
+ );
|
|
|
+
|
|
|
+ // 设置事件处理器
|
|
|
+ disruptor.handleEventsWith(this::handleEvent);
|
|
|
+ this.ringBuffer = disruptor.start();
|
|
|
+
|
|
|
+ // 启动定时任务
|
|
|
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
|
|
|
+ scheduler.scheduleAtFixedRate(
|
|
|
+ this::flushAllBuffers,
|
|
|
+ BUFFER_FLUSH_INTERVAL_MS,
|
|
|
+ BUFFER_FLUSH_INTERVAL_MS,
|
|
|
+ TimeUnit.MILLISECONDS
|
|
|
+ );
|
|
|
+ scheduler.scheduleAtFixedRate(
|
|
|
+ this::cleanExpiredCache,
|
|
|
+ 1, 1, TimeUnit.MINUTES
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 接收标签数据(带识别器ID)
|
|
|
+ */
|
|
|
+ public void receiveTag(String tagId, String readerId) {
|
|
|
+ long sequence = ringBuffer.next();
|
|
|
+ try {
|
|
|
+ TagEvent event = ringBuffer.get(sequence);
|
|
|
+ event.set(tagId, readerId, System.currentTimeMillis());
|
|
|
+ } finally {
|
|
|
+ ringBuffer.publish(sequence);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新识别器活跃状态
|
|
|
+ ReaderInfo reader = readers.get(readerId);
|
|
|
+ if (reader != null) {
|
|
|
+ reader.lastActiveTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理标签事件
|
|
|
+ */
|
|
|
+ private void handleEvent(TagEvent event, long sequence, boolean endOfBatch) {
|
|
|
+ System.out.println("处理标签事件");
|
|
|
+ String compositeKey = event.readerId + ":" + event.tagId;
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 1. 检查本地缓存
|
|
|
+ Long lastSeen = localCache.getIfPresent(compositeKey);
|
|
|
+ if (lastSeen != null && (now - lastSeen) < TAG_EXPIRY_MS) {
|
|
|
+
|
|
|
+ System.out.println("检查本地缓存");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 检查Redis缓存
|
|
|
+ String redisKey = redisCachePrefix + event.readerId;
|
|
|
+ try {
|
|
|
+ Boolean exists = redisTemplate.opsForHash().hasKey(redisKey, event.tagId);
|
|
|
+ System.out.println(redisKey+"-"+event.tagId);
|
|
|
+ if (exists != null && exists) {
|
|
|
+ System.out.println("保存"+compositeKey);
|
|
|
+ localCache.put(compositeKey, now);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 新标签,加入处理流程
|
|
|
+ localCache.put(compositeKey, now);
|
|
|
+
|
|
|
+ // 准备批量记录
|
|
|
+ System.out.println("准备批量记录");
|
|
|
+
|
|
|
+
|
|
|
+ // 更新Redis缓存
|
|
|
+ updateRedisCache(event.readerId, event.tagId, now);
|
|
|
+
|
|
|
+ String batchName = getReaderType(event.readerId);
|
|
|
+
|
|
|
+ TagRecord record = new TagRecord();
|
|
|
+ record.tagId = event.tagId;
|
|
|
+ record.readerId = event.readerId;
|
|
|
+ record.detectionTime = new Timestamp(event.timestamp);
|
|
|
+ batchBuffer.add(record);
|
|
|
+
|
|
|
+ // 添加到对应表的缓冲区
|
|
|
+ batchBuffers.computeIfAbsent(batchName, k -> new ArrayList<>())
|
|
|
+ .add(record);
|
|
|
+ // 检查批量大小
|
|
|
+ if (batchBuffers.get(batchName).size() >= BUFFER_FLUSH_SIZE) {
|
|
|
+ flushBuffer(batchName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getReaderType(String deviceSerial){
|
|
|
+ return NFIDReaderService.selectNFIDReaderSpotByKey(deviceSerial);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateRedisCache(String readerId, String tagId, long timestamp) {
|
|
|
+ System.out.println("更新Redis缓存");
|
|
|
+ String redisKey = redisCachePrefix + readerId;
|
|
|
+ try {
|
|
|
+ redisTemplate.opsForHash().put(redisKey, tagId, String.valueOf(timestamp));
|
|
|
+
|
|
|
+ redisTemplate.expire(redisKey, TAG_EXPIRY_MS / 1000, TimeUnit.SECONDS);
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //刷新全部
|
|
|
+ private void flushAllBuffers() {
|
|
|
+ Set<String> batchNames;
|
|
|
+ synchronized (batchBuffers) {
|
|
|
+ batchNames = new HashSet<>(batchBuffers.keySet());
|
|
|
+ }
|
|
|
+
|
|
|
+ for (String batchName : batchNames) {
|
|
|
+ flushBuffer(batchName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量写入数据库
|
|
|
+ */
|
|
|
+ private void flushBuffer(String batchName) {
|
|
|
+ List<TagRecord> records;
|
|
|
+ synchronized (batchBuffers) {
|
|
|
+ records = new ArrayList<>(batchBuffers.getOrDefault(batchName, Collections.emptyList()));
|
|
|
+ batchBuffers.remove(batchName);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (records.isEmpty()) return;
|
|
|
+
|
|
|
+ try {
|
|
|
+ if(Objects.equals(batchName, NFIDReader.BIND_SPOT)){
|
|
|
+ //吊钩绑定
|
|
|
+ saveBind(records);
|
|
|
+ }else if(Objects.equals(batchName, NFIDReader.WEIGHT_SPOT)){
|
|
|
+ //白条称重
|
|
|
+ saveWeight(records);
|
|
|
+ }else{
|
|
|
+ throw new Exception("未知点位类型:"+batchName);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //吊钩绑定
|
|
|
+ private void saveBind(List<TagRecord> records) {
|
|
|
+ //生成新对象
|
|
|
+ List<HookBind> hookBinds = records.stream()
|
|
|
+ .map(item -> {
|
|
|
+ HookBind hookBind = new HookBind();
|
|
|
+ hookBind.setHookNo(hookRegisterService.selectHookNoByEpcNo(item.tagId));
|
|
|
+ hookBind.setDeviceSerial(item.readerId);
|
|
|
+ hookBind.setCreateTime(item.detectionTime);
|
|
|
+ hookBind.setIsBind(HookBind.NOT_BIND);
|
|
|
+ return hookBind;
|
|
|
+ })
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ hookBindService.insertHookBindBatch(hookBinds);
|
|
|
+ }
|
|
|
+
|
|
|
+ //白条称重
|
|
|
+ private void saveWeight(List<TagRecord> records) {
|
|
|
+ //生成新对象
|
|
|
+ List<PorkSideProduce> porkSideProduces = records.stream()
|
|
|
+ .map(item -> {
|
|
|
+ PorkSideProduce porkSideProduce = new PorkSideProduce();
|
|
|
+ porkSideProduce.setHookNo(hookRegisterService.selectHookNoByEpcNo(item.tagId));
|
|
|
+ porkSideProduce.setDeviceSerial(item.readerId);
|
|
|
+ porkSideProduce.setProduceTime(item.detectionTime);
|
|
|
+ porkSideProduce.setCreateTime(item.detectionTime);
|
|
|
+ porkSideProduce.setProductName(PorkSideProduce.DEFAULT_NAME);
|
|
|
+
|
|
|
+ //获取吊钩绑定关联信息
|
|
|
+ HookBindDetailDTO hookBindDetail = hookBindService.selectHookBindDetail(porkSideProduce.getHookNo());
|
|
|
+ porkSideProduce.setEntranceBatchId(hookBindDetail.getEntranceBatchId());
|
|
|
+ porkSideProduce.setDistributeBatchId(hookBindDetail.getDistributeBatchId());
|
|
|
+ porkSideProduce.setSlaughterCode(hookBindDetail.getSlaughterCode());
|
|
|
+
|
|
|
+ return porkSideProduce;
|
|
|
+ })
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ porkSideProduceService.insertPorkSideProduceBatch(porkSideProduces);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 清理过期缓存
|
|
|
+ */
|
|
|
+ private void cleanExpiredCache() {
|
|
|
+ System.out.println("清理过期缓存");
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 清理本地缓存
|
|
|
+ localCache.asMap().entrySet().removeIf(entry ->
|
|
|
+ (now - entry.getValue()) >= TAG_EXPIRY_MS
|
|
|
+ );
|
|
|
+
|
|
|
+ // 清理Redis缓存
|
|
|
+ try{
|
|
|
+ // 获取所有识别器缓存键
|
|
|
+ // 获取所有匹配的keys
|
|
|
+ Set<String> readerKeys = redisTemplate.keys(redisCachePrefix + "*");
|
|
|
+
|
|
|
+ if (readerKeys != null) { // keys可能返回null
|
|
|
+ for (String key : readerKeys) {
|
|
|
+ // 获取整个hash
|
|
|
+ Map<Object, Object> tags = redisTemplate.opsForHash().entries(key);
|
|
|
+
|
|
|
+ // 过滤过期的entry
|
|
|
+ tags.entrySet().removeIf(entry -> {
|
|
|
+ long timestamp = Long.parseLong(entry.getValue().toString());
|
|
|
+ return (now - timestamp) >= TAG_EXPIRY_MS;
|
|
|
+ });
|
|
|
+
|
|
|
+ // 执行事务操作
|
|
|
+ redisTemplate.execute(new SessionCallback<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object execute(RedisOperations operations) throws DataAccessException {
|
|
|
+ operations.watch(key);
|
|
|
+ operations.multi();
|
|
|
+
|
|
|
+ if (tags.isEmpty()) {
|
|
|
+ operations.delete(key);
|
|
|
+ } else {
|
|
|
+ operations.opsForHash().putAll(key, tags);
|
|
|
+ operations.expire(key, TAG_EXPIRY_MS / 1000, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+
|
|
|
+ return operations.exec();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|