"""Fetch ear-tag sensor readings over HTTP and classify them into behaviour states.

The module has three parts:

* ``Fetcher`` logs in to the ear-tag service and downloads raw readings.
* ``Result``  is one classified sample (state id, label, timestamp).
* ``Action``  classifies readings, smooths the state sequence and reports the
  per-day share of time attributed to each state.
"""

import time

try:
    import requests
except ImportError:  # Result/Action are pure stdlib and stay usable without it
    requests = None


class Fetcher:
    """Small client for the ear-tag HTTP API.

    Args:
        farmId: Farm identifier sent with both the login and the data request.
        account: Account name used to log in.
        password: Password used to log in.
        timeout: Timeout in seconds passed to every HTTP request.

    Raises:
        ImportError: If the ``requests`` package is not installed.
    """

    _LOGIN_URL = "http://119.3.44.183:8016/admin/my/loginMultilevel"
    _DATA_URL = "http://119.3.44.183:8016/manage2/eartagData/getEnvByTime"

    def __init__(
            self,
            farmId: "str" = "330110004",
            *,
            # NOTE(review): credentials are embedded in source as defaults to keep
            # existing callers working; prefer passing them in from configuration.
            account: "str" = "superadmin",
            password: "str" = "hm123456",
            timeout: "float" = 30.0
    ):
        if requests is None:
            raise ImportError("Fetcher requires the 'requests' package")
        self._farm: "str" = farmId
        self._account: "str" = account
        self._password: "str" = password
        self._timeout: "float" = timeout
        self._sess: "requests.Session" = requests.session()

    def _login(self) -> "None":
        """Log in and store the returned token in the session headers.

        Raises:
            AssertionError: If the server does not answer with HTTP 200.
        """
        payload = {
            "accountName": self._account,
            "password": self._password,
            "farmId": self._farm,
        }
        resp = self._sess.post(url=self._LOGIN_URL, json=payload, timeout=self._timeout)
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if resp.status_code != 200:
            raise AssertionError(f"login failed: {resp.status_code}")
        # Plain assignment (not ``setdefault``) so a repeated login replaces the
        # previous token instead of silently keeping it.
        self._sess.headers["Accesstoken"] = resp.json()["data"]["token"]

    @staticmethod
    def _num(data):
        """Return ``data`` unchanged, or ``0`` when it is ``None``."""
        return data if data is not None else 0

    def _clean(self, data: "list[dict]") -> "list[tuple]":
        """Convert raw records into ``(envTemp, earTemp1, earTemp2, act, timestamp)`` tuples.

        Missing numeric fields (``None``) become ``0``; a ``None`` payload is
        treated as an empty list.
        """
        return [(
            self._num(itr["envTemp1"]),
            self._num(itr["earTemp1"]),
            self._num(itr["earTemp2"]),
            self._num(itr["act1"]),
            itr["addTime"]
        ) for itr in (data or [])]

    def fetch(self, tagId: "str", start: "str", end: "str") -> "list[tuple]":
        """Log in, then download and clean the readings of one ear tag.

        Args:
            tagId: Ear-tag identifier (sent as ``earmark``).
            start: Start of the time window (sent as ``startDate``).
            end: End of the time window (sent as ``endDate``).

        Returns:
            The cleaned records, see ``_clean``.

        Raises:
            AssertionError: If login or the data request does not return HTTP 200.
        """
        self._login()
        payload = {"earmark": tagId, "startDate": start, "endDate": end, "farmId": self._farm}
        resp = self._sess.post(url=self._DATA_URL, json=payload, timeout=self._timeout)
        if resp.status_code != 200:
            raise AssertionError(f"> ERROR: fetch <{tagId}> failed!")
        return self._clean(resp.json()["data"])


class Result:
    """One classified sample: state id, state label and timestamp."""

    # State id -> label (1 sleeping, 2 lying on side, 3 active, 4 other).
    __m = {1: "睡眠状态", 2: "侧躺状态", 3: "活跃状态", 4: "其它状态"}

    @staticmethod
    def kind(t: "int") -> "int":
        """Map a raw classification code (1..12) onto a state id (1..4)."""
        if t == 1:
            return 1
        if t == 3:
            return 2
        if t in (8, 10, 12):
            return 3
        return 4

    @staticmethod
    def _type(_id: "int") -> "str":
        """Return the label of a state id, or ``"unknown"``."""
        return Result.__m.get(_id, "unknown")

    @staticmethod
    def _time(ts: "str") -> "int":
        """Parse ``YYYY-MM-DD HH:MM:SS`` (local time) into epoch seconds."""
        return int(time.mktime(time.strptime(ts, "%Y-%m-%d %H:%M:%S")))

    def update(self, _id: "int"):
        """Replace the state id and refresh the label accordingly."""
        self.id, self.type = _id, self._type(_id)

    def __init__(self, _id: "int", _time: "str"):
        self.id, self.type = _id, self._type(_id)
        self.time, self.secs = _time, self._time(_time)

    def __repr__(self):
        return f"<Result {self.time} {self.id}:{self.type}>"


class Action:
    """Classify sensor readings into states and post-process the sequence.

    Args:
        body: Reference body temperature.
        actHold: ``(min, max)`` activity thresholds; ``max`` is applied to
            stable-temperature samples, ``min`` to unstable ones.
        tempHold: ``(min, max)`` temperature tolerances; ``min`` is applied to
            stable-temperature samples, ``max`` to unstable ones.
    """

    def __init__(
            self,
            body: "float" = 35.5,
            actHold: "tuple[int, int]" = (8, 10),
            tempHold: "tuple[float, float]" = (5.5, 6.0)
    ):
        self._bodyTemp: "float" = body
        self._actMin: "int" = actHold[0]
        self._actMax: "int" = actHold[1]
        self._tempMin: "float" = tempHold[0]
        self._tempMax: "float" = tempHold[1]
        self._size: "int" = 0
        self.result: "list[Result]" = []

    def _handle(self, sTime: "str", _id: "int"):
        """Append a ``Result`` for raw code ``_id`` observed at ``sTime``."""
        _id = Result.kind(_id)
        now = Result(_id, sTime)
        self.result.append(now)

    def calc(self) -> "dict":
        """Return, per day, the percentage of time attributed to states 1..4.

        The interval between two consecutive samples is credited to the state
        and the day (first 10 characters of the timestamp) of the later sample.

        Returns:
            ``{day: [pct1, pct2, pct3, pct4]}``; ``{}`` when there are no results.
            A day whose samples span zero seconds yields all zeros.
        """
        if not self.result:
            return {}
        last, res = self.result[0].secs, {}
        for cur in self.result[1:]:
            key = cur.time[:10]
            if key not in res:
                res[key] = {1: 0, 2: 0, 3: 0, 4: 0}
            res[key][cur.id] += cur.secs - last
            last = cur.secs
        for key in res:
            total = sum(res[key].values())
            if total == 0:
                # Duplicate timestamps: nothing to apportion, avoid dividing by zero.
                res[key] = [0.0 for _ in res[key]]
            else:
                res[key] = [round(num * 100 / total, 2) for num in res[key].values()]
        return res

    def _classify(self, envT, earT1, earT2, act) -> "int":
        """Return the raw classification code (1..12) of one reading.

        Codes 1-6 are used when the two ear temperatures agree (stable), 7-12
        when they do not. Within each half the pairs mean: near body
        temperature, near ambient temperature, neither. The odd code of a pair
        is the low-activity case, the even code the high-activity case.
        """
        stable = abs(earT1 - earT2) < self._tempMin
        tempHold = self._tempMin if stable else self._tempMax
        actHold = self._actMax if stable else self._actMin
        if abs(earT1 - self._bodyTemp) < tempHold:  # near body temperature
            code = 1
        elif abs(earT1 - envT) < tempHold:  # near ambient temperature
            code = 3
        else:  # neither
            code = 5
        if not stable:
            code += 6
        return code if act < actHold else code + 1

    def analyze(self, data: "list[tuple]") -> "list[Result]":
        """Classify every reading and return the resulting state sequence.

        Args:
            data: ``(envTemp, earTemp1, earTemp2, act, timestamp)`` tuples.

        Returns:
            ``self.result``, rebuilt from scratch with one entry per reading.
        """
        self._size, self.result = len(data), []
        for envT, earT1, earT2, act, sTime in data:
            self._handle(sTime, self._classify(envT, earT1, earT2, act))
        return self.result

    def _nni(self, idx: "int", _id: "int") -> "int":
        """Return the first index >= ``idx`` whose state is not ``_id``, or -1."""
        while idx < self._size:
            if self.result[idx].id != _id:
                return idx
            idx += 1
        return -1

    def _ln4(self, idx: "int") -> "int":
        """Return the last index <= ``idx`` whose state is not 4, or -1."""
        while idx > -1:
            if self.result[idx].id != 4:
                return idx
            idx -= 1
        return -1

    def _ex(self, idx: "int", count: "int") -> "int":
        """Spread the state at ``idx`` over ``[idx - count, idx + count)``.

        Returns:
            The index at which scanning should continue; always greater than
            ``idx`` so that the caller makes progress even when ``count <= 0``.
        """
        s, e = max(0, idx - count), min(self._size, idx + count)
        for i in range(s, e):
            self.result[i].update(self.result[idx].id)
        return max(e, idx + 1)

    def filter(self, suppress: "int" = 5, expand: "int" = 5) -> "list[Result]":
        """Smooth the state sequence in place.

        * A run of state 4 no longer than ``suppress`` is overwritten with the
          state of the nearer non-4 neighbour (the previous one on a tie).
        * A sample in state 2 or 3 is expanded over its neighbours via ``_ex``.
        * Runs of state 1 are skipped.

        Returns:
            ``self.result`` after smoothing.
        """
        idx = 0
        while idx < self._size:
            cur = self.result[idx].id
            if cur == 4:  # other
                nid = self._nni(idx, 4)
                if nid == -1:  # only state 4 remains after idx
                    break
                if nid - idx <= suppress:  # short run: suppress it
                    pid = self._ln4(idx)
                    if pid == -1 or (nid - idx) < (idx - pid):
                        fill = self.result[nid].id  # no predecessor / next is closer
                    else:
                        fill = self.result[pid].id  # predecessor is closer
                    for i in range(idx, nid):
                        self.result[i].update(fill)
                idx = nid
            elif cur in (2, 3):
                idx = self._ex(idx, expand)
            else:
                idx = self._nni(idx, 1)
                if idx == -1:
                    break
        return self.result


def show(data: "list"):
    """Print ``data`` as a bracketed list, one tab-indented item per line."""
    print("[")
    for itr in data:
        print(f"\t{itr},")
    print("]")


def main():
    """Fetch one ear tag's readings, then print states and daily shares before and after filtering."""
    fetcher = Fetcher()
    data = fetcher.fetch("133202311130004", "2023-12-1 00:00:00", "2023-12-10 00:00:00")
    action = Action()
    state = action.analyze(data)
    show(state)
    print(action.calc())
    after = action.filter()
    show(after)
    print(action.calc())


if __name__ == "__main__":
    main()