123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- import requests
- import time
class Fetcher:
    """Client for the ear-tag data service: logs in, then downloads sensor rows.

    NOTE(review): the service address and the account credentials are
    hard-coded in ``_login``/``fetch``; consider moving them to configuration
    or the environment instead of keeping them in source.
    """

    # Seconds to wait on the server; without a timeout ``requests`` waits
    # indefinitely and an unresponsive host would block the caller forever.
    _TIMEOUT = 30

    def __init__(self, farmId: "str" = "330110004"):
        """Remember the farm id and open the HTTP session shared by all calls."""
        self._farm: "str" = farmId
        self._sess: "requests.Session" = requests.session()

    def _login(self) -> "None":
        """Authenticate and store the returned token as the ``Accesstoken`` header.

        Raises:
            AssertionError: the login endpoint did not answer with HTTP 200.
        """
        url = "http://119.3.44.183:8016/admin/my/loginMultilevel"
        payload = {"accountName": "superadmin", "password": "hm123456", "farmId": self._farm}
        resp = self._sess.post(url=url, json=payload, timeout=self._TIMEOUT)
        # Raised explicitly (not via ``assert``) so the check survives ``python -O``
        # while callers still see the same exception type.
        if resp.status_code != 200:
            raise AssertionError(f"login failed: {resp.status_code}")
        # Plain assignment: ``fetch`` logs in on every call, and ``setdefault``
        # would keep the first token forever instead of the fresh one.
        self._sess.headers["Accesstoken"] = resp.json()["data"]["token"]

    @staticmethod
    def _num(data):
        """Return ``data`` unchanged, or ``0`` when it is ``None``."""
        return data if data is not None else 0

    def _clean(self, data: "list[dict]") -> "list[tuple]":
        """Convert response rows to ``(envTemp, earTemp1, earTemp2, act, timestamp)``.

        Missing (``None``) numeric readings become ``0``; the ``addTime``
        value is passed through untouched.
        """
        return [(
            self._num(itr["envTemp1"]), self._num(itr["earTemp1"]),
            self._num(itr["earTemp2"]), self._num(itr["act1"]), itr["addTime"]
        ) for itr in data]

    def fetch(self, tagId: "str", start: "str", end: "str") -> "list[tuple]":
        """Log in and download the rows of one ear tag for a time range.

        Args:
            tagId: Ear-tag identifier, sent as ``earmark``.
            start: Range start, sent as ``startDate``.
            end: Range end, sent as ``endDate``.

        Returns:
            The cleaned rows, see ``_clean``.

        Raises:
            AssertionError: login or the data request did not answer with HTTP 200.
        """
        self._login()
        url = "http://119.3.44.183:8016/manage2/eartagData/getEnvByTime"
        payload = {"earmark": tagId, "startDate": start, "endDate": end, "farmId": self._farm}
        resp = self._sess.post(url=url, json=payload, timeout=self._TIMEOUT)
        if resp.status_code != 200:
            raise AssertionError(f"> ERROR: fetch <{tagId}> failed!")
        return self._clean(resp.json()["data"])
class Result:
    """One classified sample: state id, its label, and when it was recorded."""

    # Labels of the four state ids: 1 sleeping, 2 lying on the side,
    # 3 active, 4 other.
    __m = {1: "睡眠状态", 2: "侧躺状态", 3: "活跃状态", 4: "其它状态"}

    def __init__(self, _id: "int", _time: "str"):
        """Store the state id with its label and the timestamp in both forms.

        Args:
            _id: State id; its label is looked up with ``_type``.
            _time: Timestamp text in ``%Y-%m-%d %H:%M:%S`` form.
        """
        self.id = _id
        self.type = self._type(_id)
        self.time = _time
        self.secs = self._time(_time)

    def __repr__(self):
        return f'<id: {self.id}, type: "{self.type}", time: "{self.time}", stamp: {self.secs}>'

    def update(self, _id: "int"):
        """Replace the state id and refresh the label to match."""
        self.id = _id
        self.type = self._type(_id)

    @staticmethod
    def kind(t: "int") -> "int":
        """Collapse a raw classification code into one of the four state ids.

        Code 1 maps to 1, code 3 to 2, codes 8/10/12 to 3, anything else to 4.
        """
        groups = (((1,), 1), ((3,), 2), ((8, 10, 12), 3))
        for members, state in groups:
            if t in members:
                return state
        return 4

    @staticmethod
    def _type(_id: "int") -> "str":
        """Return the label of a state id, or ``"unknown"`` for any other value."""
        return Result.__m.get(_id, "unknown")

    @staticmethod
    def _time(ts: "str") -> "int":
        """Parse ``ts`` and return whole seconds as computed by ``time.mktime``."""
        parsed = time.strptime(ts, "%Y-%m-%d %H:%M:%S")
        return int(time.mktime(parsed))
class Action:
    """Classify ear-tag samples into states and summarise them per day.

    Typical use: ``analyze`` the rows, optionally ``filter`` the result to
    smooth it, then ``calc`` the per-day share of each state.
    """

    def __init__(
        self,
        body: "float" = 35.5,
        actHold: "tuple[int, int]" = (8, 10),
        tempHold: "tuple[float, float]" = (5.5, 6.0)
    ):
        """Store the classification thresholds.

        Args:
            body: Reference body temperature the first ear reading is compared to.
            actHold: ``(low, high)`` activity thresholds; ``high`` is applied to
                samples whose two ear readings agree, ``low`` to the others.
            tempHold: ``(tight, loose)`` temperature tolerances; ``tight`` decides
                whether the two ear readings agree and is the nearness tolerance
                for such samples, ``loose`` is the nearness tolerance otherwise.
        """
        self._bodyTemp: "float" = body
        self._actMin: "int" = actHold[0]
        self._actMax: "int" = actHold[1]
        self._tempMin: "float" = tempHold[0]
        self._tempMax: "float" = tempHold[1]
        self._size: "int" = 0
        self.result: "list[Result]" = []

    def _handle(self, sTime: "str", _id: "int"):
        """Append a ``Result`` for raw code ``_id`` (collapsed by ``Result.kind``)."""
        self.result.append(Result(Result.kind(_id), sTime))

    def _classify(self, envT, earT1, earT2, act) -> "int":
        """Return the raw code 1..12 for one sample.

        Codes 1-6 are used when the two ear readings differ by less than the
        tight tolerance (stable), 7-12 otherwise. Within each half the pairs
        mean: first ear reading near body temperature, near ambient
        temperature, or neither. The odd code of a pair is chosen when the
        activity is below the applicable threshold, the even code otherwise.
        """
        if abs(earT1 - earT2) < self._tempMin:  # stable temperature
            tempLimit, actLimit, base = self._tempMin, self._actMax, 0
        else:  # unstable temperature
            tempLimit, actLimit, base = self._tempMax, self._actMin, 6
        if abs(earT1 - self._bodyTemp) < tempLimit:  # near body temperature
            offset = 0
        elif abs(earT1 - envT) < tempLimit:  # near ambient temperature
            offset = 2
        else:  # near neither
            offset = 4
        return base + offset + (1 if act < actLimit else 2)

    def calc(self) -> "dict":
        """Return, per day, the percentage of time spent in each state.

        The gap between two consecutive samples is credited to the state and
        the day (first ten characters of the timestamp) of the later sample.

        Returns:
            ``{day: [pct1, pct2, pct3, pct4]}`` rounded to two decimals, in
            state-id order; ``{}`` when there are fewer than two samples. A
            day whose samples span no time at all yields four zeros.
        """
        if self._size == 0:
            return {}
        spent = {}
        last = self.result[0].secs
        for item in self.result[1:self._size]:
            perState = spent.setdefault(item.time[:10], {1: 0, 2: 0, 3: 0, 4: 0})
            perState[item.id] += item.secs - last
            last = item.secs
        res = {}
        for key, perState in spent.items():
            total = sum(perState.values())
            # Guard against a day whose samples all share one timestamp,
            # which would otherwise divide by zero.
            res[key] = [round(num * 100 / total, 2) if total else 0.0 for num in perState.values()]
        return res

    def analyze(self, data: "list[tuple]") -> "list[Result]":
        """Classify every row and return the resulting list (also kept in ``result``).

        Args:
            data: Rows of ``(envTemp, earTemp1, earTemp2, act, timestamp)``.

        Returns:
            One ``Result`` per row, in input order. Previous results are discarded.
        """
        self._size, self.result = len(data), []
        for envT, earT1, earT2, act, sTime in data:
            self._handle(sTime, self._classify(envT, earT1, earT2, act))
        return self.result

    def _nni(self, idx: "int", _id: "int") -> "int":
        """Return the first index at or after ``idx`` whose state is not ``_id``, or -1."""
        while idx < self._size:
            if self.result[idx].id != _id:
                return idx
            idx += 1
        return -1

    def _ln4(self, idx: "int") -> "int":
        """Return the last index at or before ``idx`` whose state is not 4, or -1."""
        while idx > -1:
            if self.result[idx].id != 4:
                return idx
            idx -= 1
        return -1

    def _ex(self, idx: "int", count: "int") -> "int":
        """Copy the state at ``idx`` onto the samples in ``[idx - count, idx + count)``.

        Returns:
            The index where scanning should resume: the end of the window,
            but always past ``idx`` so the caller is guaranteed to advance
            even when ``count`` is zero or negative.
        """
        s, e = max(0, idx - count), min(self._size, idx + count)
        state = self.result[idx].id
        for i in range(s, e):
            self.result[i].update(state)
        return max(e, idx + 1)

    def filter(self, suppress: "int" = 5, expand: "int" = 5) -> "list[Result]":
        """Smooth the analysed states in place and return them.

        Args:
            suppress: Runs of state 4 no longer than this are overwritten
                with a neighbouring state: the state following the run when
                nothing precedes it or when the run is shorter than the
                distance back to the previous non-4 sample, otherwise the
                state preceding the run.
            expand: Each sample in state 2 or 3 spreads its state over the
                ``expand`` samples before it and the ``expand - 1`` after it.

        Returns:
            The same list as ``result``, with states updated.
        """
        idx = 0
        while idx < self._size:
            if self.result[idx].id == 4:  # "other" state
                nid = self._nni(idx, 4)
                if nid == -1:  # only state 4 remains up to the end
                    break
                if nid - idx <= suppress:  # short run: overwrite it
                    pid = self._ln4(idx)
                    if pid == -1 or (nid - idx) < (idx - pid):  # no predecessor / next is closer
                        src = nid
                    else:  # predecessor is closer
                        src = pid
                    state = self.result[src].id
                    for i in range(idx, nid):
                        self.result[i].update(state)
                idx = nid
            elif self.result[idx].id in [2, 3]:
                idx = self._ex(idx, expand)
            else:
                # State 1: skip ahead to the next sample in a different state.
                idx = self._nni(idx, 1)
                if idx == -1:
                    break
        return self.result
def show(data: "list"):
    """Print the items of ``data`` one per line, tab-indented, between brackets."""
    print("[")
    # Plain loop: printing is a side effect, so no throwaway list is built.
    for itr in data:
        print(f"\t{itr},")
    print("]")
def main():
    """Fetch one ear tag's rows and print its states before and after smoothing."""
    rows = Fetcher().fetch("133202311130004", "2023-12-1 00:00:00", "2023-12-10 00:00:00")
    analyzer = Action()

    # Raw classification, followed by its per-day summary.
    show(analyzer.analyze(rows))
    print(analyzer.calc())

    # Same output again once the states have been smoothed.
    show(analyzer.filter())
    print(analyzer.calc())
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|