123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- # coding=utf-8
- import sys
- from PyQt5.QtWidgets import QMainWindow, QApplication, QTableWidgetItem, QMessageBox
- from PyQt5.QtCore import Qt
- from PyQt5.QtGui import QPixmap
- from PyQt5.QtCore import QThread, pyqtSignal
- from IntelligentTrafficUI import Ui_MainWindow
- from NetSDK.NetSDK import NetClient
- from NetSDK.SDK_Struct import *
- from NetSDK.SDK_Enum import *
- from NetSDK.SDK_Callback import *
- import time
- from queue import Queue
- global wnd
- callback_num = 0
- class TrafficCallBackAlarmInfo:
- def __init__(self):
- self.time_str = ""
- self.plate_number_str = ""
- self.plate_color_str = ""
- self.object_subType_str = ""
- self.vehicle_color_str = ""
- def get_alarm_info(self, alarm_info):
- self.time_str = '{}-{}-{} {}:{}:{}'.format(alarm_info.UTC.dwYear, alarm_info.UTC.dwMonth, alarm_info.UTC.dwDay,
- alarm_info.UTC.dwHour, alarm_info.UTC.dwMinute,
- alarm_info.UTC.dwSecond)
- self.plate_number_str = str(alarm_info.stTrafficCar.szPlateNumber.decode('gb2312'))
- self.plate_color_str = str(alarm_info.stTrafficCar.szPlateColor, 'utf-8')
- self.object_subType_str = str(alarm_info.stuVehicle.szObjectSubType, 'utf-8')
- self.vehicle_color_str = str(alarm_info.stTrafficCar.szVehicleColor, 'utf-8')
- class BackUpdateUIThread(QThread):
- # 通过类成员对象定义信号
- update_date = pyqtSignal(int, object, int, bool, bool)
- # 处理业务逻辑
- def run(self):
- pass
- @CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p)
- def AnalyzerDataCallBack(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence, reserved):
- print('Enter AnalyzerDataCallBack')
- # 当报警类型是交通卡口事件时在界面上进行显示相关信息
- if (lAnalyzerHandle == wnd.attachID) and (dwAlarmType == EM_EVENT_IVS_TYPE.TRAFFICJUNCTION):
- global callback_num
- local_path = os.path.abspath('.')
- is_global = False
- is_small = False
- show_info = TrafficCallBackAlarmInfo()
- callback_num += 1
- alarm_info = cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).contents
- show_info.get_alarm_info(alarm_info)
- if alarm_info.stuObject.bPicEnble:
- is_global = True
- GlobalScene_buf = cast(pBuffer, POINTER(c_ubyte * alarm_info.stuObject.stPicInfo.dwOffSet)).contents
- if not os.path.isdir(os.path.join(local_path, 'Global')):
- os.mkdir(os.path.join(local_path, 'Global'))
- with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
- global_pic.write(bytes(GlobalScene_buf))
- if (alarm_info.stuObject.stPicInfo.dwFileLenth > 0):
- is_small = True
- small_buf = pBuffer[alarm_info.stuObject.stPicInfo.dwOffSet:alarm_info.stuObject.stPicInfo.dwOffSet +
- alarm_info.stuObject.stPicInfo.dwFileLenth]
- if not os.path.isdir(os.path.join(local_path, 'Small')):
- os.mkdir(os.path.join(local_path, 'Small'))
- with open('./Small/Small_Img' + str(callback_num) + '.jpg', 'wb+') as small_pic:
- small_pic.write(bytes(small_buf))
- elif (dwBufSize > 0):
- is_global = True
- GlobalScene_buf = cast(pBuffer, POINTER(c_ubyte * dwBufSize)).contents
- if not os.path.isdir(os.path.join(local_path, 'Global')):
- os.mkdir(os.path.join(local_path, 'Global'))
- with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
- global_pic.write(bytes(GlobalScene_buf))
- wnd.backthread.update_date.emit(dwAlarmType, show_info, callback_num, is_global, is_small)
- return
- class TrafficWnd(QMainWindow, Ui_MainWindow):
- def __init__(self, parent=None):
- super(TrafficWnd, self).__init__(parent)
- self.setupUi(self)
- # 界面初始化
- self._init_ui()
- # NetSDK用到的相关变量和回调
- self.loginID = C_LLONG()
- self.playID = C_LLONG()
- self.freePort = c_int()
- self.attachID = C_LLONG()
- self.m_DisConnectCallBack = fDisConnect(self.DisConnectCallBack)
- self.m_ReConnectCallBack = fHaveReConnect(self.ReConnectCallBack)
- self.m_DecodingCallBack = fDecCBFun(self.DecodingCallBack)
- self.m_RealDataCallBack = fRealDataCallBackEx2(self.RealDataCallBack)
- # 获取NetSDK对象并初始化
- self.sdk = NetClient()
- self.sdk.InitEx(self.m_DisConnectCallBack)
- self.sdk.SetAutoReconnect(self.m_ReConnectCallBack)
- # 创建线程
- self.backthread = BackUpdateUIThread()
- # 连接信号
- self.backthread.update_date.connect(self.update_UItable)
- self.thread = QThread()
- self.backthread.moveToThread(self.thread)
- # 开始线程
- self.thread.started.connect(self.backthread.run)
- self.thread.start()
- # 初始化界面
- def _init_ui(self):
- self.row = 0
- self.column = 0
- self.Login_pushButton.setEnabled(True)
- self.Play_pushButton.setEnabled(False)
- self.Logout_pushButton.setEnabled(False)
- self.StopPlay_pushButton.setEnabled(False)
- self.Attach_pushButton.setEnabled(False)
- self.Detach_pushButton.setEnabled(False)
- self.IP_lineEdit.setText('192.168.128.61')
- self.Port_lineEdit.setText('37777')
- self.User_lineEdit.setText('admin')
- self.Pwd_lineEdit.setText('admin123')
- self.setWindowFlag(Qt.WindowMinimizeButtonHint)
- self.setWindowFlag(Qt.WindowCloseButtonHint)
- self.setFixedSize(self.width(), self.height())
- self.Login_pushButton.clicked.connect(self.login_btn_onclick)
- self.Logout_pushButton.clicked.connect(self.logout_btn_onclick)
- self.Play_pushButton.clicked.connect(self.play_btn_onclick)
- self.StopPlay_pushButton.clicked.connect(self.stop_play_btn_onclick)
- self.Attach_pushButton.clicked.connect(self.attach_btn_onclick)
- self.Detach_pushButton.clicked.connect(self.detach_btn_onclick)
- # 登录设备
- def login_btn_onclick(self):
- self.Attach_tableWidget.setHorizontalHeaderLabels(
- ['时间(Time)', '事件(Event)', '车牌号(Plate No.)', '车牌颜色(Plate Color)', '车型类型(Vehicle Type)',
- '车身颜色(Vehicle Color)'])
- ip = self.IP_lineEdit.text()
- port = int(self.Port_lineEdit.text())
- username = self.User_lineEdit.text()
- password = self.Pwd_lineEdit.text()
- stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY()
- stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY)
- stuInParam.szIP = ip.encode()
- stuInParam.nPort = port
- stuInParam.szUserName = username.encode()
- stuInParam.szPassword = password.encode()
- stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP
- stuInParam.pCapParam = None
- stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY()
- stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY)
- self.loginID, device_info, error_msg = self.sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam)
- if self.loginID:
- self.setWindowTitle('智能交通(IntelligentTraffic)-在线(OnLine)')
- self.Logout_pushButton.setEnabled(True)
- self.Login_pushButton.setEnabled(False)
- self.Play_pushButton.setEnabled(True)
- self.Attach_pushButton.setEnabled(True)
- for i in range(int(device_info.nChanNum)):
- self.Channel_comboBox.addItem(str(i))
- else:
- QMessageBox.about(self, '提示(prompt)', error_msg)
- # 登出设备
- def logout_btn_onclick(self):
- # 停止拉流
- if self.playID:
- self.sdk.StopRealPlayEx(self.playID)
- self.playID = 0
- self.Video_label.repaint()
- self.sdk.SetDecCallBack(self.freePort, None)
- self.sdk.Stop(self.freePort)
- self.sdk.CloseStream(self.freePort)
- self.sdk.ReleasePort(self.freePort)
- # 停止订阅
- if self.attachID:
- self.sdk.StopLoadPic(self.attachID)
- self.attachID = 0
- # 登出
- result = self.sdk.Logout(self.loginID)
- self.Login_pushButton.setEnabled(True)
- self.Logout_pushButton.setEnabled(False)
- self.Play_pushButton.setEnabled(False)
- self.StopPlay_pushButton.setEnabled(False)
- self.Attach_pushButton.setEnabled(False)
- self.Detach_pushButton.setEnabled(False)
- self.setWindowTitle("智能交通(IntelligentTraffic)-离线(OffLine)")
- self.loginID = C_LLONG(0)
- self.Channel_comboBox.clear()
- self.Attach_tableWidget.clear()
- self.Attach_tableWidget.setRowCount(0)
- self.Video_label.clear()
- self.GlobalScene_label.clear()
- self.SmallScene_label.clear()
- self.row = 0
- self.column = 0
- self.Attach_tableWidget.setHorizontalHeaderLabels(
- ['时间(Time)', '事件(Event)', '车牌号(Plate No.)', '车牌颜色(Plate Color)', '车型类型(Vehicle Type)',
- '车身颜色(Vehicle Color)'])
- # 开始实施预览
- def play_btn_onclick(self):
- result, self.freePort = self.sdk.GetFreePort()
- if not result:
- pass
- self.sdk.OpenStream(self.freePort)
- self.sdk.Play(self.freePort, self.Video_label.winId())
- channel = self.Channel_comboBox.currentIndex()
- self.playID = self.sdk.RealPlayEx(self.loginID, channel, 0)
- if self.playID:
- self.Play_pushButton.setEnabled(False)
- self.StopPlay_pushButton.setEnabled(True)
- self.sdk.SetRealDataCallBackEx2(self.playID, self.m_RealDataCallBack, None, EM_REALDATA_FLAG.RAW_DATA)
- self.sdk.SetDecCallBack(self.freePort, self.m_DecodingCallBack)
- else:
- QMessageBox.about(self, '提示(prompt)', 'error:' + str(self.sdk.GetLastError()))
- # 停止拉流
- def stop_play_btn_onclick(self):
- if self.playID:
- self.sdk.StopRealPlayEx(self.playID)
- self.Play_pushButton.setEnabled(True)
- self.StopPlay_pushButton.setEnabled(False)
- self.playID = 0
- self.Video_label.repaint()
- self.sdk.SetDecCallBack(self.freePort, None)
- self.sdk.Stop(self.freePort)
- self.sdk.CloseStream(self.freePort)
- self.sdk.ReleasePort(self.freePort)
- # 智能交通卡口事件订阅
- def attach_btn_onclick(self):
- self.GlobalScene_label.clear()
- self.SmallScene_label.clear()
- self.Attach_tableWidget.setHorizontalHeaderLabels(
- ['时间(Time)', '事件(Event)', '车牌号(Plate No.)', '车牌颜色(Plate Color)', '车型类型(Vehicle Type)',
- '车身颜色(Vehicle Color)'])
- channel = self.Channel_comboBox.currentIndex()
- bNeedPicFile = 1
- dwUser = 0
- self.attachID = self.sdk.RealLoadPictureEx(self.loginID, channel, EM_EVENT_IVS_TYPE.TRAFFICJUNCTION,
- bNeedPicFile, AnalyzerDataCallBack, dwUser, None)
- if not self.attachID:
- QMessageBox.about(self, '提示(prompt)', 'error:' + str(self.sdk.GetLastError()))
- else:
- self.Attach_pushButton.setEnabled(False)
- self.Detach_pushButton.setEnabled(True)
- QMessageBox.about(self, '提示(prompt)', "订阅成功(Subscribe success)")
- # 取消订阅
- def detach_btn_onclick(self):
- if (self.attachID == 0):
- return
- self.sdk.StopLoadPic(self.attachID)
- self.GlobalScene_label.clear()
- self.SmallScene_label.clear()
- self.attachID = 0
- self.Attach_pushButton.setEnabled(True)
- self.Detach_pushButton.setEnabled(False)
- self.Attach_tableWidget.setHorizontalHeaderLabels(
- ['时间(Time)', '事件(Event)', '车牌号(Plate No.)', '车牌颜色(Plate Color)', '车型类型(Vehicle Type)',
- '车身颜色(Vehicle Color)'])
- def update_UItable(self, dwAlarmType, show_info, detect_object_id, is_global, is_small):
- self.GlobalScene_label.clear()
- self.SmallScene_label.clear()
- if (dwAlarmType == EM_EVENT_IVS_TYPE.TRAFFICJUNCTION):
- if (self.row > 499):
- self.Attach_tableWidget.clear()
- self.Attach_tableWidget.setRowCount(0)
- self.Attach_tableWidget.setHorizontalHeaderLabels(
- ['时间(Time)', '事件(Event)', '车牌号(Plate No.)', '车牌颜色(Plate Color)', '车型类型(Vehicle Type)',
- '车身颜色(Vehicle Color)'])
- self.row = 0
- # ui更新
- self.Attach_tableWidget.viewport().update()
- self.Attach_tableWidget.setRowCount(self.row + 1)
- item1 = QTableWidgetItem(show_info.time_str)
- self.Attach_tableWidget.setItem(self.row, self.column, item1)
- item2 = QTableWidgetItem('交通路口事件(Traffic junction event)')
- self.Attach_tableWidget.setItem(self.row, self.column + 1, item2)
- item3 = QTableWidgetItem(show_info.plate_number_str)
- self.Attach_tableWidget.setItem(self.row, self.column + 2, item3)
- item4 = QTableWidgetItem(show_info.plate_color_str)
- self.Attach_tableWidget.setItem(self.row, self.column + 3, item4)
- item5 = QTableWidgetItem(show_info.object_subType_str)
- self.Attach_tableWidget.setItem(self.row, self.column + 4, item5)
- item6 = QTableWidgetItem(show_info.vehicle_color_str)
- self.Attach_tableWidget.setItem(self.row, self.column + 5, item6)
- if (self.attachID != 0):
- if is_global:
- image = QPixmap('./Global/Global_Img' + str(detect_object_id) + '.jpg').scaled(
- self.GlobalScene_label.width(),
- self.GlobalScene_label.height())
- self.GlobalScene_label.setPixmap(image)
- if is_small:
- image = QPixmap('./Small/Small_Img' + str(detect_object_id) + '.jpg').scaled(
- self.SmallScene_label.width(),
- self.SmallScene_label.height())
- self.SmallScene_label.setPixmap(image)
- self.row += 1
- self.Attach_tableWidget.viewport().update()
- # 实现断线回调函数功能
- def DisConnectCallBack(self, lLoginID, pchDVRIP, nDVRPort, dwUser):
- self.setWindowTitle("智能交通(IntelligentTraffic)-离线(OffLine)")
- # 实现断线重连回调函数功能
- def ReConnectCallBack(self, lLoginID, pchDVRIP, nDVRPort, dwUser):
- self.setWindowTitle('智能交通(IntelligentTraffic)-在线(OnLine)')
- # 拉流回调函数功能
- def RealDataCallBack(self, lRealHandle, dwDataType, pBuffer, dwBufSize, param, dwUser):
- if lRealHandle == self.playID:
- data_buffer = cast(pBuffer, POINTER(c_ubyte * dwBufSize)).contents
- with open('./data.dav', 'ab+') as data_file:
- data_file.write(data_buffer)
- self.sdk.InputData(self.freePort, pBuffer, dwBufSize)
- # PLAYSDK解码回调函数功能
- def DecodingCallBack(self, nPort, pBuf, nSize, pFrameInfo, pUserData, nReserved2):
- # here get YUV data, pBuf is YUV data IYUV/YUV420 ,size is nSize, pFrameInfo is frame info with height, width.
- data = cast(pBuf, POINTER(c_ubyte * nSize)).contents
- info = pFrameInfo.contents
- # info.nType == 3 is YUV data,others ard audio data.
- # you can parse YUV420 data to RGB
- if info.nType == 3:
- pass
- # 关闭主窗口时清理资源
- def closeEvent(self, event):
- event.accept()
- if self.attachID:
- self.sdk.StopLoadPic(self.attachID)
- self.attachID = 0
- if self.playID:
- self.sdk.StopRealPlayEx(self.playID)
- self.playID = 0
- if self.loginID:
- self.sdk.Logout(self.loginID)
- self.loginID = 0
- self.sdk.Cleanup()
- self.Video_label.repaint()
- self.Attach_tableWidget.clear()
- self.Attach_tableWidget.setRowCount(0)
- if __name__ == '__main__':
- app = QApplication(sys.argv)
- my_wnd = TrafficWnd()
- wnd = my_wnd
- my_wnd.show()
- sys.exit(app.exec_())
|