123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import time
- import requests
- from NetSDK.SDK_Callback import fDisConnect, fHaveReConnect
- from NetSDK.NetSDK import NetClient
- from NetSDK.SDK_Enum import *
- from NetSDK.SDK_Struct import *
- from ctypes import *
- import socket
class GateInfo:
    """One access-control (door) event, parsed from an SDK record and pushed over HTTP.

    The instance attributes (``mac``, ``seq``, ``time``, ``result``, ``userId``,
    ``method``) are exactly the form fields sent to ``HOST``; do not add other
    instance attributes, because ``self.__dict__`` is posted as-is.
    """

    # Endpoint that receives each event as a multipart POST.
    HOST = "http://192.168.1.6:3080/upload"  # "https://huatong.ifarmcloud.com/secureApi/bill-access-door/getDoorData"
    # Hours added to the record's UTC timestamp to obtain the reported time.
    UTC_OFFSET_HOURS = 8
    # Seconds to wait for the endpoint; parse() runs inside an SDK callback and
    # must not block forever on an unreachable server.
    PUSH_TIMEOUT = 10
    # File that always holds the most recently received picture.
    IMAGE_PATH = "./image.jpg"
    # Human-readable description for each ``emOpenMethod`` code.
    MAP = {
        0: "未知方式", 1: "密码开锁", 2: "刷卡开锁", 3: "先刷卡后密码开锁", 4: "先密码后刷卡开锁", 5: "远程开锁,如通过室内机或者平台对门口机开锁",
        6: "开锁按钮进行开锁", 7: "开锁", 8: "密码+刷卡+组合开锁", 10: "密码+组合开锁", 11: "刷卡+组合开锁", 12: "多人开锁",
        13: "钥匙开门", 14: "胁迫密码开门", 15: "二维码开门", 16: "目标识别开门", 18: "人证对比", 19: "证件+人证比对",
        20: "蓝牙开门", 21: "个性化密码开门", 22: "UserID+密码", 23: "人脸+密码开锁", 24: "+密码开锁", 25: "+人脸开锁",
        26: "刷卡+人脸开锁", 27: "人脸或密码开锁", 28: "或密码开锁", 29: "或人脸开锁", 30: "刷卡或人脸开锁", 31: "刷卡或开锁",
        32: "+人脸+密码开锁", 33: "刷卡+人脸+密码开锁", 34: "刷卡++密码开锁", 35: "卡++人脸组合开锁", 36: "或人脸或密码",
        37: "卡或人脸或密码开锁", 38: "卡或人脸开锁", 39: "卡++人脸+密码组合开锁", 40: "卡或人脸或密码开锁", 41: "(证件+人证比对)或刷卡或人脸",
        42: "人证比对或刷卡(二维码)或人脸", 43: "DTMF开锁(包括SIPINFO,RFC2833,INBAND)", 44: "远程二维码开门", 45: "远程人脸开门",
        46: "人证比对开门()", 47: "临时密码开门", 48: "健康码开门", 49: "目标识别开锁", 50: "目标+密码组合开锁", 51: "人脸+目标组合开锁",
        52: "卡+目标组合开锁", 53: "目标或密码开锁", 54: "人脸或目标开锁", 55: "卡或目标开锁", 56: "人脸+目标+密码组合开锁",
        57: "卡+人脸+目标组合开锁", 58: "卡+目标+密码组合开锁", 59: "人脸或目标或密码开锁", 60: "卡或人脸或目标开锁", 61: "卡或目标或密码开锁",
        62: "卡+人脸+目标+密码组合开锁", 63: "卡或人脸或目标或密码开锁"
    }

    def __init__(self, mac, seq):
        """Create an empty event for the device identified by *mac* / *seq*.

        Args:
            mac: Device MAC address as text.
            seq: Device serial number as text.
        """
        self.mac, self.seq = mac, seq
        self.time, self.result = "", 0
        self.userId, self.method = "", ""

    @classmethod
    def _local_time(cls, utc):
        """Format *utc* shifted by ``UTC_OFFSET_HOURS`` as ``Y-MM-DD HH:MM:SS``.

        *utc* is any object exposing ``dwYear``, ``dwMonth``, ``dwDay``,
        ``dwHour``, ``dwMinute`` and ``dwSecond``.
        """
        # Function-scope import keeps this block self-contained.
        from datetime import datetime, timedelta

        try:
            # Real date arithmetic so that e.g. 20:00 UTC rolls over to 04:00 on
            # the next day instead of being rendered as the impossible "28:00".
            local = datetime(
                utc.dwYear, utc.dwMonth, utc.dwDay,
                utc.dwHour, utc.dwMinute, utc.dwSecond,
            ) + timedelta(hours=cls.UTC_OFFSET_HOURS)
        except (ValueError, OverflowError):
            # The record carries an impossible date (e.g. all zeros); report the
            # raw fields with the plain hour shift rather than dropping the event.
            return (
                f"{utc.dwYear}-{utc.dwMonth:02d}-{utc.dwDay:02d} "
                f"{utc.dwHour + cls.UTC_OFFSET_HOURS:02d}:{utc.dwMinute:02d}:{utc.dwSecond:02d}"
            )
        return (
            f"{local.year}-{local.month:02d}-{local.day:02d} "
            f"{local.hour:02d}:{local.minute:02d}:{local.second:02d}"
        )

    @classmethod
    def _method_name(cls, code):
        """Return the description for open-method *code*, or an "unknown" label with the code."""
        return cls.MAP.get(code, f"{cls.MAP[0]}, code={code}")

    @staticmethod
    def _extract_image(src, img_buf, buf_size):
        """Copy the event picture out of the SDK buffer.

        Returns ``None`` when the record carries no picture, otherwise the
        picture bytes (possibly empty).
        """
        if src.stuObject.bPicEnble:
            # NOTE(review): dwOffSet is used as a byte count measured from the
            # start of the buffer — confirm against the SDK documentation.
            # Clamped so we never read past the buffer size the callback reported.
            length = min(src.stuObject.stPicInfo.dwOffSet, buf_size)
        elif buf_size > 0:
            length = buf_size
        else:
            return None
        if length <= 0:
            return b""
        # bytes(...) copies the data so it stays valid after the callback returns.
        return bytes(cast(img_buf, POINTER(c_ubyte * length)).contents)

    def _push(self, image):
        """POST the event fields and *image* to ``HOST`` and print the outcome."""
        try:
            resp = requests.request(
                "POST", self.HOST, data=self.__dict__,
                files={"image": ("image.jpg", image)},
                timeout=self.PUSH_TIMEOUT,
            )
        except requests.RequestException as err:
            # A network failure must not escape into the SDK callback.
            print("data push error:", err)
            return
        if resp.status_code == 200:
            try:
                body = resp.json()
            except ValueError:
                # 200 with a non-JSON body: still a success, show the raw text.
                body = resp.text
            print("data push success:", body)
        else:
            print("data push error:", resp.text)

    def parse(self, src, img_buf, buf_size):
        """Fill this event from an SDK record, save its picture and upload it.

        Args:
            src: Access-control event record; the fields read are ``UTC``,
                ``bStatus``, ``emOpenMethod``, ``szUserID`` and ``stuObject``.
            img_buf: Pointer to the picture buffer delivered with the event.
            buf_size: Size of ``img_buf`` in bytes.
        """
        self.time = self._local_time(src.UTC)
        self.result = src.bStatus
        self.method = self._method_name(src.emOpenMethod)
        # Undecodable bytes are replaced instead of aborting the whole event.
        self.userId = src.szUserID.decode(errors="replace")

        image = self._extract_image(src, img_buf, buf_size)
        if image is not None:
            with open(self.IMAGE_PATH, "wb") as fp:
                fp.write(image)

        print(self.__dict__)
        self._push(b"" if image is None else image)
class IG:
    """Gateway that discovers devices on the LAN, logs in and subscribes to door events.

    Device state lives in class-level dictionaries because the SDK callbacks
    are static functions and have no instance to hang state on.
    """

    # DEVICES:    MAC (bytes) -> {INFO, LOGIN_ID, ATTACH_ID, CHANNELS}
    # ATTACH2MAC: subscription handle -> MAC (bytes)
    DEVICES, ATTACH2MAC = {}, {}
    # Keys of each DEVICES entry. INFO holds the tuple
    # (byInitStatus, szIP, nPort, szMac, byPwdResetWay, szLocalIP, szSerialNo).
    INFO, LOGIN_ID, ATTACH_ID, CHANNELS = "Info", "LoginId", "AttachId", "Channels"
    # NOTE(review): credentials are hard-coded in source; consider loading them
    # from configuration or the environment.
    USER, PASS = "admin", "hmkj6688"

    def __init__(self):
        """Initialise the SDK and register the disconnect / reconnect callbacks."""
        sdk = NetClient()
        # ctypes callback objects must stay referenced for as long as native
        # code may call them; a temporary could be garbage-collected and leave
        # the SDK holding a dangling function pointer.
        self.__disconnect_cb = fDisConnect(self.__disconnect)
        self.__reconnect_cb = fHaveReConnect(self.__reconnect)
        sdk.InitEx(self.__disconnect_cb)
        sdk.SetAutoReconnect(self.__reconnect_cb)
        self.__searchHandler = []  # handles of the searches currently running
        self.__sdk = sdk

    @staticmethod
    def __disconnect(lLoginID, pchDVRIP, nDVRPort, dwUser):
        """SDK callback: log that a device connection was lost."""
        print(
            f"Disconnect: lLoginID: {lLoginID}, pchDVRIP: {pchDVRIP}, "
            f"nDVRPort: {nDVRPort}, dwUser{dwUser}"
        )

    @staticmethod
    def __reconnect(lLoginID, pchDVRIP, nDVRPort, dwUser):
        """SDK callback: log that a device connection was re-established."""
        print(
            f"Reconnect: lLoginID: {lLoginID}, pchDVRIP: {pchDVRIP}, "
            f"nDVRPort: {nDVRPort}, dwUser{dwUser}"
        )

    @staticmethod
    def __getIPAddrs():
        """Return the IPv4 addresses that this host's name resolves to."""
        # gethostbyname_ex returns (hostname, aliaslist, ipaddrlist); take the
        # address list explicitly so an alias list can never be returned instead.
        _hostname, _aliases, addresses = socket.gethostbyname_ex(socket.gethostname())
        return list(addresses)

    @staticmethod
    @CB_FUNCTYPE(None, C_LLONG, POINTER(DEVICE_NET_INFO_EX2), c_void_p)
    def __search_device_callback(lSearchHandle, pDevNetInfo, pUserData):
        """SDK callback: record each newly discovered IPv4 device in ``IG.DEVICES``."""
        try:
            buf = cast(pDevNetInfo, POINTER(DEVICE_NET_INFO_EX2)).contents
            dev = (
                buf.stuDevInfo.byInitStatus, buf.stuDevInfo.iIPVersion, buf.stuDevInfo.szIP, buf.stuDevInfo.nPort,
                buf.stuDevInfo.szSubmask, buf.stuDevInfo.szGateway, buf.stuDevInfo.szMac, buf.stuDevInfo.szDeviceType,
                buf.stuDevInfo.szDetailType, buf.stuDevInfo.nHttpPort, buf.stuDevInfo.byPwdResetWay, buf.szLocalIP,
                buf.stuDevInfo.szSerialNo
            )
            #  0    1       2           3           4                   5               6                   7           8           9   10      11              12
            # (150, 4, b'192.168.1.108', 37777, b'255.255.255.0', b'192.168.1.1', b'f4:b1:c2:d5:46:48', b'BSC', b'DH-ASI31A-MW', 80, 5, b'192.168.1.6', b'9G0487EPAJ87DBF')
            # (150, b'192.168.1.108', 37777, b'f4:b1:c2:d5:46:48', 5, b'192.168.1.6', b'9G0487EPAJ87DBF')
            (init_status, ip_version, ip, port, _mask, _gateway, mac, _dev_type,
             _detail_type, _http_port, pwd_reset_way, local_ip, serial_no) = dev
            # Only IPv4 devices are kept, and each MAC is registered once.
            if ip_version == 4 and mac not in IG.DEVICES:
                IG.DEVICES[mac] = {
                    IG.INFO: (init_status, ip, port, mac, pwd_reset_way, local_ip, serial_no),
                    IG.LOGIN_ID: -1, IG.ATTACH_ID: -1, IG.CHANNELS: -1
                }
                print("device found:", dev)
        except Exception as e:
            # Boundary handler: this runs as a native callback, so the error is
            # reported here rather than raised.
            print(e)

    def __start_search_device(self):
        """Start one device search per local IP address and remember the handles."""
        for local_ip in self.__getIPAddrs():
            startsearch_in = NET_IN_STARTSERACH_DEVICE()
            startsearch_in.dwSize = sizeof(NET_IN_STARTSERACH_DEVICE)
            startsearch_in.emSendType = EM_SEND_SEARCH_TYPE.MULTICAST_AND_BROADCAST
            startsearch_in.cbSearchDevices = self.__search_device_callback
            startsearch_in.szLocalIp = local_ip.encode()
            startsearch_out = NET_OUT_STARTSERACH_DEVICE()
            startsearch_out.dwSize = sizeof(NET_OUT_STARTSERACH_DEVICE)
            lSearchHandle = self.__sdk.StartSearchDevicesEx(startsearch_in, startsearch_out)
            if lSearchHandle != 0:
                self.__searchHandler.append(lSearchHandle)
            else:
                # Report the failure like the other SDK calls do instead of
                # silently skipping this interface.
                print(f"start search on <{local_ip}> error: {self.__sdk.GetLastError()}")

    def __stop_search_device(self):
        """Stop every running search and forget its handle."""
        for handler in self.__searchHandler:
            self.__sdk.StopSearchDevices(handler)
        self.__searchHandler.clear()

    def __init_device(self, device_mac: bytes):
        """Initialise the account of an uninitialised device with ``USER`` / ``PASS``."""
        info = self.DEVICES[device_mac][IG.INFO]
        init_Account_In = NET_IN_INIT_DEVICE_ACCOUNT()
        init_Account_In.dwSize = sizeof(init_Account_In)
        init_Account_In.szMac = device_mac
        init_Account_In.szUserName = self.USER.encode()
        init_Account_In.szPwd = self.PASS.encode()
        init_Account_In.byPwdResetWay = info[4]  # byPwdResetWay reported by the search
        init_Account_Out = NET_OUT_INIT_DEVICE_ACCOUNT()
        init_Account_Out.dwSize = sizeof(init_Account_Out)
        # 5000 and info[5] (the local IP that found the device) are passed
        # through to the SDK; 5000 is presumably a timeout in ms — TODO confirm.
        result = self.__sdk.InitDevAccount(init_Account_In, init_Account_Out, 5000, info[5])
        if result:
            print(f"init device success: {device_mac}")
        else:
            print(f"init device error: {self.__sdk.GetLastError()}")

    def __login(self, device_mac: bytes):
        """Log in to the device and store its login id and channel count."""
        info = self.DEVICES[device_mac][IG.INFO]
        stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY()
        stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY)
        stuInParam.szIP = info[1]
        stuInParam.nPort = info[2]
        stuInParam.szUserName = self.USER.encode()
        stuInParam.szPassword = self.PASS.encode()
        stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP
        stuInParam.pCapParam = None
        stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY()
        stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY)
        loginID, result, error = self.__sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam)
        if loginID != 0:
            self.DEVICES[device_mac][IG.LOGIN_ID] = loginID
            self.DEVICES[device_mac][IG.CHANNELS] = result.nChanNum
            print(f"login <{device_mac}> success: loginID={loginID}, device_channels={result.nChanNum}")
        else:
            print(f"login <{device_mac}> failed: {error}")

    def __logout(self, device_mac: bytes):
        """Log out of the device if it is logged in and reset its stored state."""
        login = self.DEVICES[device_mac][IG.LOGIN_ID]
        if login == -1:
            return
        res = self.__sdk.Logout(login)
        if res:
            print(f"logout <{device_mac}.{login}> success")
            self.DEVICES[device_mac][IG.LOGIN_ID] = -1
            self.DEVICES[device_mac][IG.CHANNELS] = -1
        else:
            print(f"logout <{device_mac}.{login}> failed")

    @staticmethod
    @CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p)
    def __subscribe_callback(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence, reserved):
        """SDK callback: turn an access-control event into a ``GateInfo`` and upload it."""
        # .get() instead of [] so an unregistered handle falls back to
        # "unknown" rather than raising KeyError. A handle can be unregistered
        # if an event arrives before __subscribe has stored it.
        mac = IG.ATTACH2MAC.get(lAnalyzerHandle) or b"unknown"
        device = IG.DEVICES.get(mac)
        # The last INFO element is the device serial number.
        seq = device[IG.INFO][-1] if device else b"unknown"
        if dwAlarmType == EM_EVENT_IVS_TYPE.ACCESS_CTL:
            gate_info = GateInfo(mac.decode(), seq.decode())
            data = cast(pAlarmInfo, POINTER(DEV_EVENT_ACCESS_CTL_INFO)).contents
            gate_info.parse(data, pBuffer, dwBufSize)

    def __subscribe(self, device_mac: bytes):
        """Subscribe to access-control events (with pictures) on a logged-in device."""
        loginId = self.DEVICES[device_mac][IG.LOGIN_ID]
        channel = self.DEVICES[device_mac][IG.CHANNELS]
        # CHANNELS is only set by a successful login, so -1 also means "not logged in".
        if channel == -1:
            print(f"device <{device_mac}> owns no channel")
            return
        bNeedPicFile, dwUser = 1, 0
        attachId = self.__sdk.RealLoadPictureEx(
            loginId, 0, EM_EVENT_IVS_TYPE.ACCESS_CTL,
            bNeedPicFile, self.__subscribe_callback, dwUser, None
        )
        if not attachId:
            print(f"subscribe <{loginId}> error: {self.__sdk.GetLastError()}")
        else:
            self.DEVICES[device_mac][IG.ATTACH_ID] = attachId
            IG.ATTACH2MAC[attachId] = device_mac
            print(f"subscribe <{loginId}> success")

    def __unsubscribe(self, device_mac: bytes):
        """Cancel the device's event subscription, if any."""
        attachId = self.DEVICES[device_mac][IG.ATTACH_ID]
        if attachId == -1:
            return
        self.__sdk.StopLoadPic(attachId)
        self.DEVICES[device_mac][IG.ATTACH_ID] = -1
        # pop() so a missing mapping cannot abort shutdown.
        IG.ATTACH2MAC.pop(attachId, None)

    def start_search(self):
        """Search for devices for 5 s, set each one up, then block forever.

        Every device found is initialised if needed, logged in to and
        subscribed. The method never returns normally; events are delivered
        through the SDK callback while it sleeps.
        """
        print("search devices for 5s")
        self.__stop_search_device()
        self.__start_search_device()
        time.sleep(5)
        print("\n === stop search, try init/login/subscribe ===")
        self.__stop_search_device()
        for mac, device in self.DEVICES.items():
            info = device[IG.INFO]
            # Low two bits of byInitStatus equal to 1 are treated as "not yet initialised".
            inited = (info[0] & 3) != 1
            print(f"device: {mac}, inited: {inited}")
            if not inited:
                self.__init_device(mac)
            self.__login(mac)
            self.__subscribe(mac)
        while True:
            time.sleep(60)

    def close(self):
        """Stop searching, unsubscribe and log out of every device, then release the SDK."""
        try:
            self.__stop_search_device()
            for mac in self.DEVICES:
                self.__unsubscribe(mac)
                self.__logout(mac)
        finally:
            # Always release the SDK, even if tearing down one device failed.
            self.__sdk.Cleanup()
            print("program exit")
- # transformer
if __name__ == "__main__":
    # Script entry point: run the gateway until it is interrupted, and always
    # release devices and the SDK on the way out.
    gateway = IG()
    try:
        # Blocks indefinitely once the devices are set up.
        gateway.start_search()
    finally:
        gateway.close()
|