123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import os
- import numpy as np
- import pandas as pd
- from data.loader import BaseLoader
- __all__ = ["DataLoader"]
- class DataLoader(BaseLoader):
- def __init__(
- self,
- path: "str" = "csv/both",
- count: "int" = 10,
- hold: "int" = 200,
- win: "int" = 5
- ):
- self.__c, self.__h, self.__w = count, hold, win
- self._rawH: "pd.DataFrame" = pd.DataFrame()
- self._rawN: "pd.DataFrame" = pd.DataFrame()
- super().__init__(path)
- self._trans()
- def _load(self, path: "str") -> "None":
- for name in os.listdir(path):
- data = pd.read_csv(f"{path}/{name}", header=None)
- if name.startswith("1"):
- self._rawH = pd.concat([self._rawH, data])
- else:
- self._rawN = pd.concat([self._rawN, data])
- def _isMax(self, data: "np.ndarray", idx: "int") -> "bool":
- if idx < self.__w or idx > data.size - self.__w:
- return False
- for i in range(1, self.__w + 1):
- if data[idx] < data[idx - i] or data[idx] < data[idx + i]:
- return False
- return True
- def _fft(self, data: "np.ndarray") -> "np.ndarray":
- time = np.linspace(0, 0.38 * data.size / 128, data.size)
- fft = np.fft.fft(data)
- frq = np.fft.fftfreq(data.size, time[1] - time[0])
- pFrq = frq[:frq.size // 2]
- pFft = np.abs(2.0 / data.size * fft[:fft.size // 2])
- result, idx = np.zeros((self.__c, 2)), 0
- for i in range(self.__w, pFft.size - self.__w):
- if self._isMax(pFft, i) and pFft[i] > self.__h:
- result[idx][0], result[idx][1] = pFrq[i], pFft[i]
- idx += 1
- if idx >= self.__c:
- break
- return result.flatten()
- def _trans(self) -> "None":
- have = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawH.to_numpy()])
- none = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawN.to_numpy()])
- np.random.shuffle(have)
- count = int(have.shape[0] * 0.85)
- self.have = np.array(have[:count])
- self.test = np.array(have[count:])
- self.none = np.array(none)
- def toh(self, file: "str") -> "None":
- pass
- @staticmethod
- def _n2s(arr: "list") -> "str":
- res = "\t{"
- for itr in arr:
- res += f"{itr:.6f},"
- return res[:-1] + "}"
- def toc(self, file: "str"):
- temp = (
- "#ifndef LOCAL_DATA_H\n"
- "#define LOCAL_DATA_H\n"
- "\n"
- "#define HaveCount {haveCount}\n"
- "#define TestCount {testCount}\n"
- "#define NoneCount {noneCount}\n"
- "#define Features {features}\n"
- "\n"
- "static double Have[HaveCount][Features] = {{\n"
- "{have}\n"
- "}};\n"
- "\n"
- "static double Test[TestCount][Features] = {{\n"
- "{test}\n"
- "}};\n"
- "\n"
- "static double None[NoneCount][Features] = {{\n"
- "{none}\n"
- "}};\n"
- "\n"
- "#endif"
- )
- have = ",\n".join([self._n2s(line) for line in self.have])
- test = ",\n".join([self._n2s(line) for line in self.test])
- none = ",\n".join([self._n2s(line) for line in self.none])
- content = temp.format(
- haveCount=self.have.shape[0],
- testCount=self.test.shape[0],
- noneCount=self.none.shape[0],
- features=self.have.shape[1],
- have=have, test=test, none=none
- )
- with open(file, "w", encoding="utf-8") as fp:
- fp.write(content)
|