"""Discover Dahua access-control devices on the LAN and forward door events.

The script searches for devices, initialises/logs into each one, subscribes
to access-control events and pushes every event (plus its snapshot, when one
is delivered) to an HTTP endpoint.
"""
import time
import requests
from NetSDK.SDK_Callback import fDisConnect, fHaveReConnect
from NetSDK.NetSDK import NetClient
from NetSDK.SDK_Enum import *
from NetSDK.SDK_Struct import *
from ctypes import *
import socket
# Imported after the star imports above so they cannot shadow these names.
from datetime import datetime, timedelta


class GateInfo:
    """A single access-control event that can be parsed and pushed to HOST.

    Only the instance attributes (mac, seq, time, result, userId, method) are
    sent as form fields, because ``parse`` posts ``self.__dict__``.
    """

    HOST = "http://192.168.1.6:3080/upload"
    # Alternative endpoint kept for reference:
    # "https://huatong.ifarmcloud.com/secureApi/bill-access-door/getDoorData"

    # Hours added to the UTC timestamp reported by the device.
    UTC_OFFSET_HOURS = 8
    # Seconds to wait for the upload endpoint before giving up.
    PUSH_TIMEOUT = 10

    # Human-readable description for each open-method code (src.emOpenMethod).
    # Codes 9 and 17 have no entry; unknown codes fall back to MAP[0].
    MAP = {
        0: "未知方式",
        1: "密码开锁",
        2: "刷卡开锁",
        3: "先刷卡后密码开锁",
        4: "先密码后刷卡开锁",
        5: "远程开锁,如通过室内机或者平台对门口机开锁",
        6: "开锁按钮进行开锁",
        7: "开锁",
        8: "密码+刷卡+组合开锁",
        10: "密码+组合开锁",
        11: "刷卡+组合开锁",
        12: "多人开锁",
        13: "钥匙开门",
        14: "胁迫密码开门",
        15: "二维码开门",
        16: "目标识别开门",
        18: "人证对比",
        19: "证件+人证比对",
        20: "蓝牙开门",
        21: "个性化密码开门",
        22: "UserID+密码",
        23: "人脸+密码开锁",
        24: "+密码开锁",
        25: "+人脸开锁",
        26: "刷卡+人脸开锁",
        27: "人脸或密码开锁",
        28: "或密码开锁",
        29: "或人脸开锁",
        30: "刷卡或人脸开锁",
        31: "刷卡或开锁",
        32: "+人脸+密码开锁",
        33: "刷卡+人脸+密码开锁",
        34: "刷卡++密码开锁",
        35: "卡++人脸组合开锁",
        36: "或人脸或密码",
        37: "卡或人脸或密码开锁",
        38: "卡或人脸开锁",
        39: "卡++人脸+密码组合开锁",
        40: "卡或人脸或密码开锁",
        41: "(证件+人证比对)或刷卡或人脸",
        42: "人证比对或刷卡(二维码)或人脸",
        43: "DTMF开锁(包括SIPINFO,RFC2833,INBAND)",
        44: "远程二维码开门",
        45: "远程人脸开门",
        46: "人证比对开门()",
        47: "临时密码开门",
        48: "健康码开门",
        49: "目标识别开锁",
        50: "目标+密码组合开锁",
        51: "人脸+目标组合开锁",
        52: "卡+目标组合开锁",
        53: "目标或密码开锁",
        54: "人脸或目标开锁",
        55: "卡或目标开锁",
        56: "人脸+目标+密码组合开锁",
        57: "卡+人脸+目标组合开锁",
        58: "卡+目标+密码组合开锁",
        59: "人脸或目标或密码开锁",
        60: "卡或人脸或目标开锁",
        61: "卡或目标或密码开锁",
        62: "卡+人脸+目标+密码组合开锁",
        63: "卡或人脸或目标或密码开锁",
    }

    def __init__(self, mac, seq):
        """Create an empty event for the device identified by *mac* / *seq*.

        Args:
            mac: device MAC address as text.
            seq: device serial number as text.
        """
        self.mac, self.seq = mac, seq
        self.time, self.result = "", 0
        self.userId, self.method = "", ""

    @classmethod
    def _format_local_time(cls, utc):
        """Return *utc* shifted by UTC_OFFSET_HOURS as 'Y-MM-DD HH:MM:SS'.

        Going through ``datetime`` makes the offset roll over into the next
        day/month/year instead of producing hours such as 28. If the device
        reports an impossible date, the fields are formatted verbatim with
        the offset simply added to the hour.
        """
        try:
            local = datetime(
                utc.dwYear, utc.dwMonth, utc.dwDay,
                utc.dwHour, utc.dwMinute, utc.dwSecond,
            ) + timedelta(hours=cls.UTC_OFFSET_HOURS)
        except (ValueError, OverflowError):
            return (
                f"{utc.dwYear}-{utc.dwMonth:02d}-{utc.dwDay:02d} "
                f"{utc.dwHour + cls.UTC_OFFSET_HOURS:02d}:"
                f"{utc.dwMinute:02d}:{utc.dwSecond:02d}"
            )
        # The year is deliberately not zero-padded, matching the old format.
        return f"{local.year}-{local:%m-%d %H:%M:%S}"

    def parse(self, src, img_buf, buf_size):
        """Fill this event from *src*, save the snapshot and push everything.

        Args:
            src: event structure; the fields UTC, bStatus, emOpenMethod,
                szUserID and stuObject are read.
            img_buf: pointer to the picture buffer handed to the callback.
            buf_size: size in bytes of *img_buf*.

        The snapshot (possibly empty) is written to ./image.jpg and uploaded
        together with the instance attributes. Network failures are printed,
        never raised, because this runs inside an SDK callback.
        """
        self.time = self._format_local_time(src.UTC)
        self.result = src.bStatus
        self.method = self.MAP.get(
            src.emOpenMethod, f"{self.MAP[0]}, code={src.emOpenMethod}"
        )
        # A user ID that is not valid UTF-8 must not abort the whole event.
        self.userId = src.szUserID.decode(errors="replace")

        image = b""
        if src.stuObject.bPicEnble:
            # NOTE(review): the first dwOffSet bytes are taken as the picture;
            # confirm against the SDK's picture-info layout.
            size = src.stuObject.stPicInfo.dwOffSet
            image = bytes(cast(img_buf, POINTER(c_ubyte * size)).contents)
            with open("./image.jpg", "wb") as fp:
                fp.write(image)
        elif buf_size > 0:
            # Copy into bytes so the data does not depend on the SDK buffer
            # after the callback returns.
            image = bytes(cast(img_buf, POINTER(c_ubyte * buf_size)).contents)
            with open("./image.jpg", "wb") as fp:
                fp.write(image)

        print(self.__dict__)
        try:
            resp = requests.request(
                "POST",
                self.HOST,
                data=self.__dict__,
                files={"image": ("image.jpg", image)},
                timeout=self.PUSH_TIMEOUT,
            )
        except requests.RequestException as e:
            print("data push error:", e)
            return

        if resp.status_code == 200:
            try:
                body = resp.json()
            except ValueError:
                # A 200 response with a non-JSON body is still a success.
                body = resp.text
            print("data push success:", body)
        else:
            print("data push error:", resp.text)


class IG:
    """Wrapper around the NetSDK client: search, init, login and subscribe.

    DEVICES maps a device MAC (bytes) to a dict with the keys INFO, LOGIN_ID,
    ATTACH_ID and CHANNELS. INFO is the tuple
    (byInitStatus, szIP, nPort, szMac, byPwdResetWay, szLocalIP, szSerialNo).
    ATTACH2MAC maps a subscription handle back to the device MAC.
    Both dicts are class-level and therefore shared by every instance.
    """

    DEVICES, ATTACH2MAC = {}, {}
    INFO, LOGIN_ID, ATTACH_ID, CHANNELS = "Info", "LoginId", "AttachId", "Channels"
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment.
    USER, PASS = "admin", "hmkj6688"

    def __init__(self):
        """Initialise the SDK and register disconnect/reconnect callbacks."""
        # Keep references to the ctypes callback objects for the lifetime of
        # this instance: if they are garbage collected while the native
        # library still holds the pointers, invoking them crashes the process.
        self.__disconnect_cb = fDisConnect(self.__disconnect)
        self.__reconnect_cb = fHaveReConnect(self.__reconnect)

        sdk = NetClient()
        sdk.InitEx(self.__disconnect_cb)
        sdk.SetAutoReconnect(self.__reconnect_cb)
        self.__searchHandler = []
        self.__sdk = sdk

    @staticmethod
    def __disconnect(lLoginID, pchDVRIP, nDVRPort, dwUser):
        """Log a disconnect notification from the SDK."""
        print(
            f"Disconnect: lLoginID: {lLoginID}, pchDVRIP: {pchDVRIP}, "
            f"nDVRPort: {nDVRPort}, dwUser{dwUser}"
        )

    @staticmethod
    def __reconnect(lLoginID, pchDVRIP, nDVRPort, dwUser):
        """Log a reconnect notification from the SDK."""
        print(
            f"Reconnect: lLoginID: {lLoginID}, pchDVRIP: {pchDVRIP}, "
            f"nDVRPort: {nDVRPort}, dwUser{dwUser}"
        )

    @staticmethod
    def __getIPAddrs():
        """Return the IPv4 addresses the local host name resolves to."""
        # gethostbyname_ex returns (hostname, aliaslist, ipaddrlist); only the
        # address list is wanted, never the aliases.
        _, _, addresses = socket.gethostbyname_ex(socket.gethostname())
        return list(addresses)

    @staticmethod
    @CB_FUNCTYPE(None, C_LLONG, POINTER(DEVICE_NET_INFO_EX2), c_void_p)
    def __search_device_callback(lSearchHandle, pDevNetInfo, pUserData):
        """Record every newly discovered device with iIPVersion == 4."""
        try:
            buf = cast(pDevNetInfo, POINTER(DEVICE_NET_INFO_EX2)).contents
            dev = (
                buf.stuDevInfo.byInitStatus,   # 0
                buf.stuDevInfo.iIPVersion,     # 1
                buf.stuDevInfo.szIP,           # 2
                buf.stuDevInfo.nPort,          # 3
                buf.stuDevInfo.szSubmask,      # 4
                buf.stuDevInfo.szGateway,      # 5
                buf.stuDevInfo.szMac,          # 6
                buf.stuDevInfo.szDeviceType,   # 7
                buf.stuDevInfo.szDetailType,   # 8
                buf.stuDevInfo.nHttpPort,      # 9
                buf.stuDevInfo.byPwdResetWay,  # 10
                buf.szLocalIP,                 # 11
                buf.stuDevInfo.szSerialNo,     # 12
            )
            # Example of a full tuple:
            # (150, 4, b'192.168.1.108', 37777, b'255.255.255.0',
            #  b'192.168.1.1', b'f4:b1:c2:d5:46:48', b'BSC', b'DH-ASI31A-MW',
            #  80, 5, b'192.168.1.6', b'9G0487EPAJ87DBF')
            # Example of the subset stored under IG.INFO:
            # (150, b'192.168.1.108', 37777, b'f4:b1:c2:d5:46:48', 5,
            #  b'192.168.1.6', b'9G0487EPAJ87DBF')
            if dev[1] == 4 and dev[6] not in IG.DEVICES:
                IG.DEVICES[dev[6]] = {
                    IG.INFO: (dev[0], dev[2], dev[3], dev[6], dev[10], dev[11], dev[12]),
                    IG.LOGIN_ID: -1,
                    IG.ATTACH_ID: -1,
                    IG.CHANNELS: -1,
                }
                print("device found:", dev)
        except Exception as e:
            # Callback boundary: report the problem instead of letting it
            # escape into the native caller.
            print(e)

    def __start_search_device(self):
        """Start one device search per local IP address."""
        for local_ip in self.__getIPAddrs():
            startsearch_in = NET_IN_STARTSERACH_DEVICE()
            startsearch_in.dwSize = sizeof(NET_IN_STARTSERACH_DEVICE)
            startsearch_in.emSendType = EM_SEND_SEARCH_TYPE.MULTICAST_AND_BROADCAST
            startsearch_in.cbSearchDevices = self.__search_device_callback
            startsearch_in.szLocalIp = local_ip.encode()

            startsearch_out = NET_OUT_STARTSERACH_DEVICE()
            startsearch_out.dwSize = sizeof(NET_OUT_STARTSERACH_DEVICE)

            lSearchHandle = self.__sdk.StartSearchDevicesEx(startsearch_in, startsearch_out)
            # A zero handle is treated as failure and is not remembered.
            if lSearchHandle != 0:
                self.__searchHandler.append(lSearchHandle)

    def __stop_search_device(self):
        """Stop every running search and forget the handles."""
        for handler in self.__searchHandler:
            self.__sdk.StopSearchDevices(handler)
        self.__searchHandler.clear()

    def __init_device(self, device_mac: bytes):
        """Initialise the account of *device_mac* with USER / PASS."""
        info = self.DEVICES[device_mac][IG.INFO]

        init_Account_In = NET_IN_INIT_DEVICE_ACCOUNT()
        init_Account_In.dwSize = sizeof(init_Account_In)
        init_Account_In.szMac = device_mac
        init_Account_In.szUserName = self.USER.encode()
        init_Account_In.szPwd = self.PASS.encode()
        init_Account_In.byPwdResetWay = info[4]

        init_Account_Out = NET_OUT_INIT_DEVICE_ACCOUNT()
        init_Account_Out.dwSize = sizeof(init_Account_Out)

        # info[5] is the local IP the device was discovered through; 5000 is
        # presumably a timeout in milliseconds - confirm against the SDK.
        result = self.__sdk.InitDevAccount(init_Account_In, init_Account_Out, 5000, info[5])
        if result:
            print(f"init device success: {device_mac}")
        else:
            print(f"init device error: {self.__sdk.GetLastError()}")

    def __login(self, device_mac: bytes):
        """Log into *device_mac* and store the login ID and channel count."""
        info = self.DEVICES[device_mac][IG.INFO]

        stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY()
        stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY)
        stuInParam.szIP = info[1]
        stuInParam.nPort = info[2]
        stuInParam.szUserName = self.USER.encode()
        stuInParam.szPassword = self.PASS.encode()
        stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP
        stuInParam.pCapParam = None

        stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY()
        stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY)

        loginID, result, error = self.__sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam)
        if loginID != 0:
            self.DEVICES[device_mac][IG.LOGIN_ID] = loginID
            self.DEVICES[device_mac][IG.CHANNELS] = result.nChanNum
            print(f"login <{device_mac}> success: loginID={loginID}, device_channels={result.nChanNum}")
        else:
            print(f"login <{device_mac}> failed: {error}")

    def __logout(self, device_mac: bytes):
        """Log out of *device_mac* if it is currently logged in."""
        login = self.DEVICES[device_mac][IG.LOGIN_ID]
        if login == -1:
            return

        res = self.__sdk.Logout(login)
        if res:
            print(f"logout <{device_mac}.{login}> success")
            self.DEVICES[device_mac][IG.LOGIN_ID] = -1
            self.DEVICES[device_mac][IG.CHANNELS] = -1
        else:
            print(f"logout <{device_mac}.{login}> failed")

    @staticmethod
    @CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p)
    def __subscribe_callback(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence,
                             reserved):
        """Turn an access-control event into a GateInfo and push it."""
        try:
            if dwAlarmType != EM_EVENT_IVS_TYPE.ACCESS_CTL:
                return

            # An event can arrive before __subscribe has recorded the handle,
            # so a missing entry must not raise.
            mac = IG.ATTACH2MAC.get(lAnalyzerHandle) or b"unknown"
            device = IG.DEVICES.get(mac)
            seq = device[IG.INFO][-1] if device else b"unknown"

            gate_info = GateInfo(mac.decode(), seq.decode())
            data = cast(pAlarmInfo, POINTER(DEV_EVENT_ACCESS_CTL_INFO)).contents
            gate_info.parse(data, pBuffer, dwBufSize)
        except Exception as e:
            # Callback boundary: report the problem instead of letting it
            # escape into the native caller.
            print(e)

    def __subscribe(self, device_mac: bytes):
        """Subscribe to access-control events of *device_mac* on channel 0."""
        loginId = self.DEVICES[device_mac][IG.LOGIN_ID]
        channel = self.DEVICES[device_mac][IG.CHANNELS]
        # CHANNELS stays -1 until a login succeeds.
        if channel == -1:
            print(f"device <{device_mac}> owns no channel")
            return

        bNeedPicFile, dwUser = 1, 0
        attachId = self.__sdk.RealLoadPictureEx(
            loginId, 0, EM_EVENT_IVS_TYPE.ACCESS_CTL, bNeedPicFile,
            self.__subscribe_callback, dwUser, None
        )
        if not attachId:
            print(f"subscribe <{loginId}> error: {self.__sdk.GetLastError()}")
        else:
            self.DEVICES[device_mac][IG.ATTACH_ID] = attachId
            IG.ATTACH2MAC[attachId] = device_mac
            print(f"subscribe <{loginId}> success")

    def __unsubscribe(self, device_mac: bytes):
        """Cancel the event subscription of *device_mac*, if any."""
        attachId = self.DEVICES[device_mac][IG.ATTACH_ID]
        if attachId == -1:
            return

        self.__sdk.StopLoadPic(attachId)
        self.DEVICES[device_mac][IG.ATTACH_ID] = -1
        IG.ATTACH2MAC.pop(attachId, None)

    def start_search(self):
        """Search for 5 s, then init/login/subscribe each device and block.

        This method never returns normally; it sleeps forever so that the SDK
        callbacks keep being served.
        """
        print("search devices for 5s")
        self.__stop_search_device()
        self.__start_search_device()
        time.sleep(5)

        print("\n === stop search, try init/login/subscribe ===")
        self.__stop_search_device()

        # Iterate over a snapshot: a late search callback may still insert
        # into DEVICES.
        for mac, device in list(self.DEVICES.items()):
            info = device[IG.INFO]
            # The low two bits of byInitStatus equal to 1 are treated as "not
            # initialised" - presumably per the SDK's encoding; confirm.
            inited = (info[0] & 3) != 1
            print(f"device: {mac}, inited: {inited}")
            if not inited:
                self.__init_device(mac)
            self.__login(mac)
            self.__subscribe(mac)

        while True:
            time.sleep(60)

    def close(self):
        """Stop searching, unsubscribe/logout every device and clean up."""
        self.__stop_search_device()
        for mac in list(self.DEVICES):
            self.__unsubscribe(mac)
            self.__logout(mac)
        self.__sdk.Cleanup()
        print("program exit")


# transformer
if __name__ == '__main__':
    ig = IG()
    try:
        ig.start_search()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the endless wait loop.
        pass
    finally:
        ig.close()