# action2.py (7.2 KB)
import time

import requests
  3. class Fetcher:
  4. def __init__(self, farmId: "str" = "330110004"):
  5. self._farm: "str" = farmId
  6. self._sess: "requests.Session" = requests.session()
  7. def _login(self) -> "None":
  8. url = "http://119.3.44.183:8016/admin/my/loginMultilevel"
  9. payload = {"accountName": "superadmin", "password": "hm123456", "farmId": self._farm}
  10. resp = self._sess.post(url=url, json=payload)
  11. assert resp.status_code == 200, f"login failed: {resp.status_code}"
  12. self._sess.headers.setdefault("Accesstoken", resp.json()["data"]["token"])
  13. @staticmethod
  14. def _num(data):
  15. return data if data is not None else 0
  16. def _clean(self, data: "list[dict]") -> "list[tuple]":
  17. # envTemp, earTemp1, earTemp2, act, timestamp
  18. return [(
  19. self._num(itr["envTemp1"]), self._num(itr["earTemp1"]),
  20. self._num(itr["earTemp2"]), self._num(itr["act1"]), itr["addTime"]
  21. ) for itr in data]
  22. def fetch(self, tagId: "str", start: "str", end: "str") -> "list[tuple]":
  23. self._login()
  24. url = "http://119.3.44.183:8016/manage2/eartagData/getEnvByTime"
  25. payload = {"earmark": tagId, "startDate": start, "endDate": end, "farmId": self._farm}
  26. resp = self._sess.post(url=url, json=payload)
  27. assert resp.status_code == 200, f"> ERROR: fetch <{tagId}> failed!"
  28. return self._clean(resp.json()["data"])
  29. class Result:
  30. __m = {1: "睡眠状态", 2: "侧躺状态", 3: "活跃状态", 4: "其它状态"}
  31. @staticmethod
  32. def kind(t: "int") -> "int":
  33. if t == 1:
  34. return 1
  35. if t == 3:
  36. return 2
  37. if t in [8, 10, 12]:
  38. return 3
  39. return 4
  40. @staticmethod
  41. def _type(_id: "int") -> "str":
  42. return Result.__m.get(_id, "unknown")
  43. @staticmethod
  44. def _time(ts: "str") -> "int":
  45. return int(time.mktime(time.strptime(ts, "%Y-%m-%d %H:%M:%S")))
  46. def update(self, _id: "int"):
  47. self.id, self.type = _id, self._type(_id)
  48. def __init__(self, _id: "int", _time: "str"):
  49. self.id, self.type = _id, self._type(_id)
  50. self.time, self.secs = _time, self._time(_time)
  51. def __repr__(self):
  52. return f"<id: {self.id}, type: \"{self.type}\", time: \"{self.time}\", stamp: {self.secs}>"
  53. class Action:
  54. def __init__(
  55. self,
  56. body: "float" = 35.5,
  57. actHold: "tuple[int, int]" = (8, 10),
  58. tempHold: "tuple[float, float]" = (5.5, 6.0)
  59. ):
  60. self._bodyTemp: "float" = body
  61. self._actMin: "int" = actHold[0]
  62. self._actMax: "int" = actHold[1]
  63. self._tempMin: "float" = tempHold[0]
  64. self._tempMax: "float" = tempHold[1]
  65. self._size: "int" = 0
  66. self.result: "list[Result]" = []
  67. def _handle(self, sTime: "str", _id: "int"):
  68. _id = Result.kind(_id)
  69. now = Result(_id, sTime)
  70. self.result.append(now)
  71. def calc(self) -> "dict":
  72. if self._size == 0:
  73. return {}
  74. last, res = self.result[0].secs, {}
  75. for i in range(1, self._size):
  76. key = self.result[i].time[:10]
  77. if key not in res:
  78. res[key] = {1: 0, 2: 0, 3: 0, 4: 0}
  79. res[key][self.result[i].id] += self.result[i].secs - last
  80. last = self.result[i].secs
  81. for key in res:
  82. total = sum(res[key].values())
  83. res[key] = [round(num * 100 / total, 2) for num in res[key].values()]
  84. return res
  85. def analyze(self, data: "list[tuple]") -> "list[Result]":
  86. self._size, self.result = len(data), []
  87. for itr in data:
  88. envT, earT1, earT2, act, sTime = itr
  89. if abs(earT1 - earT2) < self._tempMin: # 温度稳定
  90. if abs(earT1 - self._bodyTemp) < self._tempMin: # 在体温附近稳定
  91. if act < self._actMax:
  92. self._handle(sTime, 1)
  93. else:
  94. self._handle(sTime, 2)
  95. elif abs(earT1 - envT) < self._tempMin: # 在环境温度附近稳定
  96. if act < self._actMax:
  97. self._handle(sTime, 3)
  98. else:
  99. self._handle(sTime, 4)
  100. else: # 其余稳定
  101. if act < self._actMax:
  102. self._handle(sTime, 5)
  103. else:
  104. self._handle(sTime, 6)
  105. else:
  106. if abs(earT1 - self._bodyTemp) < self._tempMax: # 在体温附近不稳定
  107. if act < self._actMin:
  108. self._handle(sTime, 7)
  109. else:
  110. self._handle(sTime, 8)
  111. elif abs(earT1 - envT) < self._tempMax: # 在环境温度附近不稳定
  112. if act < self._actMin:
  113. self._handle(sTime, 9)
  114. else:
  115. self._handle(sTime, 10)
  116. else: # 其余不稳定
  117. if act < self._actMin:
  118. self._handle(sTime, 11)
  119. else:
  120. self._handle(sTime, 12)
  121. return self.result
  122. def _nni(self, idx: "int", _id: "int") -> "int": # next not <id>
  123. while idx < self._size:
  124. if self.result[idx].id != _id:
  125. return idx
  126. idx += 1
  127. return -1
  128. def _ln4(self, idx: "int") -> "int": # last not 4
  129. while idx > -1:
  130. if self.result[idx].id != 4:
  131. return idx
  132. idx -= 1
  133. return -1
  134. def _ex(self, idx: "int", count: "int") -> "int":
  135. s, e = max(0, idx - count), min(self._size, idx + count)
  136. for i in range(s, e):
  137. self.result[i].update(self.result[idx].id)
  138. return e
  139. def filter(self, suppress: "int" = 5, expand: "int" = 5) -> "list[Result]":
  140. idx = 0
  141. while idx < self._size:
  142. if self.result[idx].id == 4: # other
  143. nid = self._nni(idx, 4)
  144. if nid == -1: # all others after idx
  145. break
  146. if nid - idx <= suppress: # need suppress
  147. pid = self._ln4(idx)
  148. if pid == -1 or (nid - idx) < (idx - pid): # no pre / next is closer
  149. for i in range(idx, nid):
  150. self.result[i].update(self.result[nid].id)
  151. else: # pre is closer
  152. for i in range(idx, nid):
  153. self.result[i].update(self.result[pid].id)
  154. idx = nid
  155. elif self.result[idx].id in [2, 3]:
  156. idx = self._ex(idx, expand)
  157. else:
  158. idx = self._nni(idx, 1)
  159. if idx == -1:
  160. break
  161. return self.result
  162. def show(data: "list"):
  163. print("[")
  164. [print(f"\t{itr},") for itr in data]
  165. print("]")
  166. def main():
  167. fetcher = Fetcher()
  168. data = fetcher.fetch("133202311130004", "2023-12-1 00:00:00", "2023-12-10 00:00:00")
  169. # print(data)
  170. action = Action()
  171. state = action.analyze(data)
  172. show(state)
  173. print(action.calc())
  174. # print(len(data), len(state), action.calc)
  175. after = action.filter()
  176. show(after)
  177. print(action.calc())
  178. if __name__ == "__main__":
  179. main()