123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- import numpy as np
- import cv2
- import time
- from threading import Thread
- import ctypes
class Eclipse:
    """Axis-aligned ellipse centred on the origin.

    The class name is kept as-is for callers (it is a misspelling of
    "Ellipse"). ``a`` is the semi-axis along x and ``b`` the semi-axis along y.

    Attributes:
        a2l: Full axis length along x, ``2 * |a|``.
        b2l: Full axis length along y, ``2 * |b|``.
        f1, f2: The two foci as ``(x, y)`` tuples. They lie on the x axis when
            ``|a| >= |b|`` and on the y axis otherwise.
    """

    def __init__(self, a: "float", b: "float"):
        """Store the axis lengths and foci for semi-axes ``a`` (x) and ``b`` (y)."""
        half_x, half_y = np.abs(a), np.abs(b)
        # abs() keeps the focal distance real when the ellipse is taller than
        # it is wide; a bare sqrt(a*a - b*b) would be NaN in that case.
        c = np.sqrt(np.abs(a * a - b * b))
        self.a2l, self.b2l = 2 * half_x, 2 * half_y
        if half_x >= half_y:
            self.f1, self.f2 = (-c, 0), (c, 0)
        else:
            self.f1, self.f2 = (0, -c), (0, c)

    def inner_rect(self, *, al: "float" = None, bl: "float" = None) -> "np.double":
        """Return the other side of the inscribed axis-aligned rectangle.

        Exactly one side is normally given: ``al`` is the rectangle's extent
        along x, ``bl`` its extent along y. If both are given, ``al`` wins.

        Args:
            al: Rectangle width along x (sign is ignored).
            bl: Rectangle height along y (sign is ignored).

        Returns:
            The matching extent along the other axis. ``0`` when neither side
            is given or when the given side reaches the full axis length
            (within 1e-5); the full other axis when the given side is below
            1e-5.
        """
        if al is None and bl is None:
            return np.double(0)
        if al is not None:
            given, given_axis, other_axis = np.abs(al), self.a2l, self.b2l
        else:
            given, given_axis, other_axis = np.abs(bl), self.b2l, self.a2l
        if given < 1E-5:
            return other_axis
        if given >= given_axis - 1E-5:
            return np.double(0)
        # Closed form of x^2/a^2 + y^2/b^2 = 1 solved for the other coordinate.
        # The guards above ensure 0 < ratio < 1, so the square root is real and
        # the division is safe. This replaces an open-ended bisection that
        # could spin forever on NaN input.
        ratio = given / given_axis
        return np.double(other_axis * np.sqrt(1.0 - ratio * ratio))
class Egg:
    """One egg on the belt, drawn as a stack of shrinking, brightening ellipses.

    Class attributes:
        Long, Short: Full long/short axis lengths in pixels.
        Bright: Base grey level added to every layer's colour.
        HalfLong, HalfShort: Semi-axis lengths.
        Count: Number of concentric layers drawn per egg.
        GapLong, GapShort: Per-layer shrink of each semi-axis.

    Instance attributes:
        center: ``np.array([x, y])`` pixel position.
        angle: Rotation in degrees (it is handed to ``cv2.ellipse`` unchanged).
        xs, ys: Half-extents of the rotated outline along x and y.
    """

    Long, Short, Bright = 250, 175, 80
    HalfLong, HalfShort, Count = Long / 2, Short / 2, 60
    GapLong, GapShort = HalfLong / Count, HalfShort / Count

    def __init__(self, x, y, r):
        """Place an egg at ``(x, y)`` rotated by ``r`` degrees."""
        self.center, self.angle = np.array([x, y]), r
        # r is in degrees, but NumPy's trig functions take radians.
        theta = np.deg2rad(r)
        sin, cos = np.sin(theta), np.cos(theta)
        # Exact half-extent of a rotated ellipse along each image axis:
        # sqrt((a*cos)^2 + (b*sin)^2) along x, and the mirrored form along y.
        self.xs = np.hypot(cos * self.HalfLong, sin * self.HalfShort)
        self.ys = np.hypot(sin * self.HalfLong, cos * self.HalfShort)

    def move(self, speed: "int" = 5):
        """Shift the egg ``speed`` pixels to the left (in place)."""
        self.center[0] -= speed

    def isVisible(self):
        """Return whether the egg's centre is within HalfLong of the left edge or beyond it."""
        return self.center[0] + self.HalfLong > 0

    def isOverlap(self, other: "Egg") -> "bool":
        """Heuristic overlap test based on centre distance and angle difference.

        NOTE(review): ``angle`` holds degrees everywhere else in this class,
        yet the fold/compare below uses pi as if it were radians. The logic is
        left unchanged because Canvas.gen_one_col's acceptance of candidate
        layouts depends on it -- confirm the intended rule before changing it.
        """
        distance = np.linalg.norm(self.center - other.center)
        angle_diff = np.abs(self.angle - other.angle)
        if angle_diff > np.pi:
            angle_diff = 2 * np.pi - angle_diff
        return distance <= self.Long and angle_diff <= np.pi / 2

    def __draw(self, can, long: "int", short: "int", color: "int"):
        """Draw one filled ellipse layer with semi-axes ``(long, short)``."""
        cv2.ellipse(
            can,
            center=self.center,
            axes=(long, short),
            angle=self.angle,
            startAngle=0,
            endAngle=360,
            color=color,
            thickness=-1,  # negative thickness means filled
        )

    def draw(self, can):
        """Paint the egg onto ``can`` as ``Count`` nested layers.

        Each successive layer is smaller and brighter, so the grey level
        rises towards the centre of the egg.
        """
        for i in range(0, self.Count):
            self.__draw(
                can,
                int(self.HalfLong - self.GapLong * i), int(self.HalfShort - self.GapShort * i),
                int(self.HalfShort + self.GapShort * i) + self.Bright
            )
class Canvas:
    """Scrolling belt simulation: spawns eggs, renders them and samples detectors.

    Class attributes:
        Width, Height: Canvas size in pixels.
        DetectorHeight, Ratio: Constants used by ``run`` to turn a grey level
            into a reading (``DetectorHeight - level / Ratio``).
        barX, barW: x position and line width of the detector bar.
        dW, dH: Length and line width of each detector tick.
        SX: Image column that is sampled for readings.
        Speed: Pixels the belt moves per frame.
        Start, Gap: First sampled row and spacing between sampled rows.
    """

    Width, Height = 1800, 550  # canvas width and height
    DetectorHeight, Ratio = 10, Height / 11
    barX, barW, dW, dH = 50, 14, 14, 10
    SX, Speed = barX + dW + 10, 5
    Start, Gap = 55, 110

    def __init__(self):
        """Create an empty belt with a random initial left edge for new columns."""
        self.canvas = np.zeros((self.Height, self.Width), np.uint8)
        self.copy = self.canvas.copy()
        self.curLeft = np.random.randint(400, 550)
        self.eggs: "list[Egg]" = []
        self.sample = np.zeros(5, np.uint8)
        self.total = 0
        self.cur = 0
        self.real = 0
        # adjust_detector() is not called here, so the class-level Start/Gap apply.

    def adjust_detector(self):
        """Derive ``Start``/``Gap`` for this instance from ``Counter.Threshold`` and print them."""
        hold = Counter.Threshold
        scaled = (self.DetectorHeight - hold) * self.Ratio
        height = 2 * scaled - Egg.Short
        outline = Eclipse(a=Egg.Long / 2, b=Egg.Short / 2)
        long2x = outline.inner_rect(bl=height)
        short2x = np.sqrt(Egg.Short ** 2 - height ** 2)
        self.Gap = int(long2x / 4 + short2x / 2)
        self.Start = (self.Height - 4 * self.Gap) // 2
        print(f"Threshold: {hold}, short: {short2x}, long: {long2x}, format: [{self.Start}::{self.Gap}]")

    def have_space(self):
        """Return True while another column still fits left of ``Width - Egg.Long``."""
        limit = self.Width - Egg.Long
        return self.curLeft < limit

    def gen_one_col(self):
        """Append one random column (0, 1, 2 or 3 eggs) and advance ``curLeft``.

        Probabilities: empty 0.1, one egg 0.2, two eggs 0.35, three eggs 0.35.
        Candidate layouts are redrawn until every egg's half width exceeds its
        ``xs`` and the adjacent eggs do not ``isOverlap``.
        """
        randint = np.random.randint
        left = self.curLeft
        band = self.Height

        def half_width(lo, hi):
            # Two draws per value; the draw order is kept stable so seeded
            # runs stay reproducible.
            return randint(Egg.HalfShort, Egg.HalfLong) + randint(lo, hi)

        roll = np.random.random()
        width = 0
        if roll < 0.1:  # no egg: 0.1
            width = randint(30, 50)
        elif roll < 0.3:  # one egg: 0.2
            while True:
                half = half_width(10, 40)
                y = randint(Egg.Long, band - Egg.Long)
                r = randint(0, 180)
                egg = Egg(left + half, y, r)
                if half > egg.xs:
                    self.eggs.append(egg)
                    width = 2 * half
                    break
        elif roll < 0.65:  # two eggs: 0.35
            while True:
                half1 = half_width(10, 40)
                half2 = half_width(20, 50)
                y1 = randint(Egg.HalfLong, (band - Egg.Long) / 2)
                y2 = randint((band + Egg.Long) / 2, band - Egg.HalfLong)
                r1 = randint(0, 180)
                r2 = randint(0, 180)
                e1 = Egg(left + half1, y1, r1)
                e2 = Egg(left + half2, y2, r2)
                fits = half1 > e1.xs and half2 > e2.xs
                if fits and not e1.isOverlap(e2):
                    self.eggs.extend((e1, e2))
                    width = 2 * max(half1, half2)
                    break
        else:  # three eggs: 0.35
            while True:
                half1 = half_width(10, 200)
                half2 = half_width(10, 200)
                half3 = half_width(10, 200)
                y1 = randint(Egg.HalfShort, band / 3 - Egg.HalfShort)
                y2 = randint(band / 3 + Egg.HalfShort, band * 2 / 3 - Egg.HalfShort)
                y3 = randint(band * 2 / 3 + Egg.HalfShort, band - Egg.HalfShort)
                r1 = randint(0, 30)
                r2 = randint(0, 30)
                r3 = randint(0, 30)
                e1 = Egg(left + half1, y1, r1)
                e2 = Egg(left + half2, y2, r2)
                e3 = Egg(left + half3, y3, r3)
                fits = half1 > e1.xs and half2 > e2.xs and half3 > e3.xs
                if fits and not e1.isOverlap(e2) and not e2.isOverlap(e3):
                    self.eggs.extend((e1, e2, e3))
                    width = 2 * max(half1, half2, half3)
                    break
        self.curLeft += width

    def draw(self):
        """Render eggs, detector bar and counters into ``self.copy`` and show it."""
        frame = self.canvas.copy()
        self.copy = frame
        # eggs
        for egg in self.eggs:
            egg.draw(frame)
        # detector bar and one tick per sampled row
        cv2.line(frame, (self.barX, 0), (self.barX, self.Height), 100, self.barW)
        tick_end = self.barX + self.dW
        for y in range(self.Start, self.Height, self.Gap):
            cv2.line(frame, (self.barX, y), (tick_end, y), 255, self.dH)
        # counter overlay
        cv2.putText(
            img=frame,
            text=f"Total: {self.total:<4d}, Current: {self.cur}",
            org=(self.Width - 380, 30),
            fontFace=cv2.FONT_HERSHEY_PLAIN,
            fontScale=1.8,
            color=255,
            thickness=2
        )
        # display
        cv2.imshow("Egg Monitor", frame)
        cv2.waitKey(1)

    def inner(self):
        """Simulation loop: fill the belt, then move, spawn, draw and sample forever."""
        while self.have_space():
            self.gen_one_col()
        while True:
            self.move_eggs()
            if self.have_space():
                self.gen_one_col()
            self.draw()
            # One grey level per sampled row, taken from column SX.
            self.sample = self.copy[self.Start::self.Gap, self.SX]

    def run(self):
        """Start ``inner`` on a thread and yield a list of readings every 0.1 s.

        Non-zero grey levels have ``Egg.Bright`` removed before the conversion
        ``DetectorHeight - level / Ratio``; zero levels are converted as-is.
        """
        def to_reading(level: "np.double"):
            if level > 0:
                level -= Egg.Bright
            return self.DetectorHeight - level / self.Ratio

        worker = Thread(target=self.inner, name="inner")
        worker.start()
        while True:
            time.sleep(0.1)
            yield [to_reading(level) for level in self.sample.astype(np.double)]

    def move_eggs(self):
        """Advance the belt by ``Speed`` and drop eggs that are no longer visible.

        Every dropped egg increments ``self.real``.
        """
        self.curLeft -= self.Speed
        for egg in self.eggs:
            egg.move(self.Speed)
        kept = [egg for egg in self.eggs if egg.isVisible()]
        self.real += len(self.eggs) - len(kept)
        self.eggs = kept
class Status:
    """Tracking state of one detector.

    ``sta`` indexes ``Names``: 0 = ready, 1 = primary, 2 = secondary,
    3 = vacuum.

    Attributes:
        sta: Current state index.
        num: Number of the egg this detector is tracking.
        paired: Whether this detector has been paired with a neighbour.
    """

    Names = ["准备", "主态", "辅态", "真空"]

    def __init__(self, sta: "np.uint8" = 0):
        """Start in state ``sta`` with egg number 0 and no pairing."""
        self.sta, self.num, self.paired = sta, np.uint8(0), False

    def has_data(self) -> "bool":
        """Return True when the state is primary (1) or secondary (2)."""
        return self.sta in (1, 2)

    def update(self, *, sta=None, num=None, paired: "bool" = None):
        """Overwrite only the fields that are passed (``None`` means keep)."""
        if sta is not None:
            self.sta = sta
        if num is not None:
            self.num = num
        if paired is not None:
            self.paired = paired

    @property
    def name(self) -> "str":
        """Display name of the current state, taken from ``Names``."""
        return self.Names[self.sta]
class Counter:
    """
    Pure-Python egg counter driven by one row of detector readings per call.

    statements:
        Short, Long: lengths of the egg's short and long axes
        Short', Long': axis lengths of the proportionally scaled egg outline at the detection threshold
        hold: detection threshold
        distance: spacing between adjacent detectors
        margin: distance between an edge detector and the edge of the conveyor belt
    require:
        1. distance < Short', 2*distance > Long'
        2. !!!: 3*distance = 2*Short'+(Short-Short')
        3. The detector spacing must be smaller than the short axis of the detected egg so no egg is missed;
           a single egg may be seen by only 1 or 2 detectors;
           two eggs side by side must be seen by 3 or 4 detectors so they are not taken for one egg;
    default:
        hold: 8.2 cm
        distance: 105 px
        margin: 65 px
        Short': 140 px
        Long': 200px
    history:
        8.0, 25, 125: 99.96%
        7.25, 29, 123: 99.7%
        7.09, 51, 112:
        7.0, 67, 104: 99.91%
    """
    Threshold = 8.0  # => Threshold: 7.25, short: 143.61406616345073, long: 205.1629364490509, format: [29::123]

    def __init__(self):
        # Running number of eggs ever counted and number currently under the detectors.
        self.total: "np.uint16" = np.uint16(0)
        self.current: "np.uint8" = np.uint8(0)
        # One Status per detector; the list is fixed at five entries.
        self.status = [Status(), Status(), Status(), Status(), Status()]

    def detected(self, dis: "np.double") -> bool:
        """Return True when reading ``dis`` is below ``Threshold``."""
        return dis < self.Threshold

    @staticmethod
    def neighbors(idx: "int", mm: "int" = 4) -> "list[int]":
        """Return the indices adjacent to ``idx`` when the last valid index is ``mm``."""
        if idx == 0:
            return [1]
        if idx == mm:
            return [mm - 1]
        return [idx - 1, idx + 1]

    def __call__(self, sample: "list", nums: "np.uint8" = 5) -> "tuple[np.uint16, np.uint8]":  # magic
        """Consume one row of readings and return ``(total, current)``.

        ``sample[i]`` is the reading of detector ``i`` for ``i`` in
        ``range(nums)``. Detector states are updated in index order, so a
        detector sees the already-updated state of its lower neighbour.
        ``self.status`` has five entries, so ``nums`` above 5 would index past it.
        """
        for i in range(nums):
            nears = self.neighbors(i, nums - 1)
            if self.detected(sample[i]):
                if self.status[i].sta == 0:  # detector i had no data last time and sees an egg now
                    joined = False
                    for ni in nears:
                        if self.status[ni].sta == 1 and not self.status[ni].paired:  # a neighbour is an unpaired primary: join it
                            self.status[i].update(sta=2, num=self.status[ni].num, paired=True)
                            self.status[ni].paired = True
                            joined = True
                            break
                    if not joined:
                        self.total += 1
                        self.current += 1
                        self.status[i].update(sta=1, num=self.total, paired=False)
                elif self.status[i].has_data():  # sta is 1/2: detector i is already primary or secondary
                    pass
                else:  # sta is 3: detector i was in vacuum last time and becomes a new primary now
                    self.total += 1
                    self.current += 1
                    self.status[i].update(sta=1, num=self.total, paired=False)
            else:  # detector i sees nothing this time
                if self.status[i].sta == 0:  # detector i had no data before
                    pass
                elif self.status[i].paired:  # detector i may have had data before and is paired
                    have = False
                    for ni in nears:
                        if self.status[ni].num == self.status[i].num:  # this neighbour tracks the same egg as detector i
                            have = True
                            if self.status[ni].has_data():  # neighbour sta is 1/2: it has not entered vacuum yet
                                self.status[i].sta = 3
                            elif self.status[ni].sta == 3:  # neighbour is already in vacuum
                                self.current -= 1
                                self.status[i].update(sta=0, paired=False)
                                self.status[ni].update(sta=0, paired=False)
                    if not have:  # the paired neighbour has already moved on to the next egg
                        self.current -= 1
                        self.status[i].update(sta=0, paired=False)
                else:
                    self.current -= 1
                    self.status[i].update(sta=0, paired=False)
        return self.total, self.current
def main():
    """Run the belt simulation and count eggs with the native EggCounter library.

    Each row of readings from ``Canvas.run`` is passed to the DLL's
    ``CountByData``; the resulting totals are written back to the canvas for
    its overlay and printed alongside the readings.
    """
    canvas = Canvas()

    # C implementation, accuracy: 99.96%
    dll = ctypes.CDLL("./EggCounter.dll")
    dll.CreateWorld.restype = ctypes.c_void_p
    dll.CreateWorld.argtypes = [ctypes.c_uint8, ctypes.c_double]
    dll.CountByData.restype = None
    dll.CountByData.argtypes = [
        ctypes.c_void_p,
        ctypes.POINTER(ctypes.c_double),
        ctypes.POINTER(ctypes.c_uint),
        ctypes.POINTER(ctypes.c_uint8),
    ]

    world = dll.CreateWorld(5, 8.0)
    total = ctypes.c_uint(0)
    cur = ctypes.c_uint8(0)
    readings_type = ctypes.c_double * 5

    for sample in canvas.run():
        # total and cur are passed as instances; ctypes hands them over by
        # reference because the argtypes declare pointers.
        dll.CountByData(world, readings_type(*sample), total, cur)
        canvas.total = total.value
        canvas.cur = cur.value
        print(
            f"[{sample[0]:<8.2f}, {sample[1]:<8.2f}, {sample[2]:<8.2f}, {sample[3]:<8.2f}, {sample[4]:<8.2f}]"
            f" ==> Real: {canvas.real:<3d}, Total: {canvas.total:<3d}, Current: {canvas.cur:<2d}"
        )

    # Pure-Python alternative to the DLL (disabled):
    #     counter = Counter()
    #     for sample in canvas.run():
    #         canvas.total, canvas.cur = counter(sample)
    #         print(
    #             f"[{sample[0]:<8.2f}, {sample[1]:<8.2f}, {sample[2]:<8.2f}, {sample[3]:<8.2f}, {sample[4]:<8.2f}]"
    #             f" ==> Total: {canvas.total:<3d}, Current: {canvas.cur:<2d}"
    #         )


if __name__ == "__main__":
    main()
|