[python] SDK连接大华相机,接收卡口抓拍图片,提取人脸,通过1400协议上传平台
相机的坑
平台上加了很多相机,但是发现总有一部分卡口相机没有人脸数据,研究发现这部分相机本身也是支持抓人脸的,但是人脸仅仅只会叠加在卡口图片,使用SDK接收不到,平台也接收不到。于是尝试使用SDK接收相机抓拍图片后,从中分离出人脸图片,再通过GAT1400协议上传平台。
SDK对接相机(camera_conn.py)
官网上下载了python的sdk,是带gui的,稍微调整了一下,使其可以在命令行运行。同时,引入图像拆分、1400对接模块。
# coding=utf-8
import sys
from PyQt5.QtWidgets import QMainWindow,QApplication, QTableWidgetItem, QMessageBox
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPixmap
from PyQt5.QtCore import QThread,pyqtSignal
import image_get
from IntelligentTrafficUI import Ui_MainWindow
from NetSDK.NetSDK import NetClient
from NetSDK.SDK_Struct import *
from NetSDK.SDK_Enum import *
from NetSDK.SDK_Callback import *
import time
from queue import Queue
import subimage_1400
global wnd
callback_num = 0
class TrafficCallBackAlarmInfo:
def __init__(self):
self.time_str = ""
self.plate_number_str = ""
self.plate_color_str = ""
self.object_subType_str = ""
self.vehicle_color_str = ""
def get_alarm_info(self, alarm_info):
self.time_str = '{}-{}-{} {}:{}:{}'.format(alarm_info.UTC.dwYear, alarm_info.UTC.dwMonth, alarm_info.UTC.dwDay,
alarm_info.UTC.dwHour, alarm_info.UTC.dwMinute, alarm_info.UTC.dwSecond)
self.plate_number_str = str(alarm_info.stTrafficCar.szPlateNumber.decode('gb2312'))
self.plate_color_str = str(alarm_info.stTrafficCar.szPlateColor, 'utf-8')
self.object_subType_str = str(alarm_info.stuVehicle.szObjectSubType, 'utf-8')
self.vehicle_color_str = str(alarm_info.stTrafficCar.szVehicleColor, 'utf-8')
print(alarm_info.byImageIndex)
class BackUpdateUIThread(QThread):
# 通过类成员对象定义信号
update_date = pyqtSignal(int, object, int, bool, bool)
# 处理业务逻辑
def run(self):
pass
@CB_FUNCTYPE(None, C_LLONG, C_DWORD, c_void_p, POINTER(c_ubyte), C_DWORD, C_LDWORD, c_int, c_void_p)
def AnalyzerDataCallBack(lAnalyzerHandle, dwAlarmType, pAlarmInfo, pBuffer, dwBufSize, dwUser, nSequence, reserved):
print('Enter AnalyzerDataCallBack',lAnalyzerHandle,dwAlarmType)
global callback_num
local_path = os.path.abspath('.')
is_global = False
is_small = False
show_info = TrafficCallBackAlarmInfo()
callback_num += 1
alarm_info = cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).contents
show_info.get_alarm_info(alarm_info)
#print('pAlarmInfo',cast(pAlarmInfo, POINTER(DEV_EVENT_TRAFFICJUNCTION_INFO)).value)
print('buffer:',pBuffer)
print(show_info.time_str,show_info.plate_number_str,show_info.plate_color_str,show_info.object_subType_str,show_info.vehicle_color_str)
if alarm_info.stuObject.bPicEnble:
is_global = True
GlobalScene_buf = cast(pBuffer,POINTER(c_ubyte * alarm_info.stuObject.stPicInfo.dwOffSet)).contents
if not os.path.isdir(os.path.join(local_path, 'Global')):
os.mkdir(os.path.join(local_path, 'Global'))
with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
global_pic.write(bytes(GlobalScene_buf))
face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg')
print(callback_num,face_list)
if (alarm_info.stuObject.stPicInfo.dwFileLenth > 0):
is_small = True
small_buf = pBuffer[alarm_info.stuObject.stPicInfo.dwOffSet:alarm_info.stuObject.stPicInfo.dwOffSet +
alarm_info.stuObject.stPicInfo.dwFileLenth]
if not os.path.isdir(os.path.join(local_path, 'Small')):
os.mkdir(os.path.join(local_path, 'Small'))
with open('./Small/Small_Img' + str(callback_num) + '.jpg', 'wb+') as small_pic:
small_pic.write(bytes(small_buf))
elif (dwBufSize > 0):
is_global = True
GlobalScene_buf = cast(pBuffer, POINTER(c_ubyte * dwBufSize)).contents
if not os.path.isdir(os.path.join(local_path, 'Global')):
os.mkdir(os.path.join(local_path, 'Global'))
with open('./Global/Global_Img' + str(callback_num) + '.jpg', 'wb+') as global_pic:
global_pic.write(bytes(GlobalScene_buf))
face_list=image_get.get_face('./Global/Global_Img' + str(callback_num) + '.jpg')
print(callback_num,face_list)
cj_path='Global/Global_Img' + str(callback_num) + '.jpg'
if face_list !=[]:
for i in face_list:
subimage_1400.submit_face('33052200001310094777',cj_path,i)
return
def dingyue(sdk,channel,loginID):
bNeedPicFile = 1
dwUser = 0
attachID = sdk.RealLoadPictureEx(loginID, channel, EM_EVENT_IVS_TYPE.ALL, bNeedPicFile, AnalyzerDataCallBack, dwUser, None)
if not attachID:
print('error:' + str(sdk.GetLastError()))
else:
print("订阅成功(Subscribe success)",attachID)
def login(ip,username,password,port):
loginID = C_LLONG()
playID = C_LLONG()
freePort = c_int()
attachID = C_LLONG()
"""
回调,不涉及
m_DisConnectCallBack = fDisConnect(DisConnectCallBack)
m_ReConnectCallBack = fHaveReConnect(ReConnectCallBack)
m_DecodingCallBack = fDecCBFun(DecodingCallBack)
m_RealDataCallBack = fRealDataCallBackEx2(RealDataCallBack)
"""
# 获取NetSDK对象并初始化
sdk = NetClient()
#sdk.InitEx(m_DisConnectCallBack) #掉线回调,不涉及
sdk.InitEx()
#sdk.SetAutoReconnect(m_ReConnectCallBack)
ip = ip
port = port
username = username
password = password
stuInParam = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY()
stuInParam.dwSize = sizeof(NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY)
stuInParam.szIP = ip.encode()
stuInParam.nPort = port
stuInParam.szUserName = username.encode()
stuInParam.szPassword = password.encode()
stuInParam.emSpecCap = EM_LOGIN_SPAC_CAP_TYPE.TCP
stuInParam.pCapParam = None
stuOutParam = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY()
stuOutParam.dwSize = sizeof(NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY)
loginID, device_info, error_msg = sdk.LoginWithHighLevelSecurity(stuInParam, stuOutParam)
if loginID:
print(loginID,device_info)
for i in range(int(device_info.nChanNum)):
print(i)
dingyue(sdk,i,loginID)
else:
print(error_msg)
if __name__ == '__main__':
login('111.111.111.111','admin','admin123',37777) #连接相机
while True:
time.sleep(1) #防止程序退出后收不到回调事件图像分割(image_get.py)
使用opencv分割图像,取绿色的轮廓(即人脸)。
import cv2
import numpy as np
import os
import hashlib
from functools import partial
fd_path='Global'
file_list=os.listdir(fd_path)
def md5(data,block_size=65536):
m=hashlib.md5()
for item in iter(partial(data.read,block_size),b''):
m.update(item)
str_md5=m.hexdigest()
return str_md5
def get_face(filepath):
# 加载图像
image_path = filepath
image = cv2.imread(image_path)
# 将图像从 BGR 格式转换为 HSV 格式
hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# 定义绿色在HSV颜色空间中的范围
lower_green = np.array([50, 50, 50]) # 你可能需要根据实际情况进行调整
upper_green = np.array([80, 255, 255])
# 创建掩码,通过颜色范围过滤图像
green_mask = cv2.inRange(hsv_image, lower_green, upper_green)
# 寻找轮廓
contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
searchflag=0
# 遍历找到的轮廓
i=1
face_list=[]
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
#print(x,y,w,h)
if w >200 and h > 200: # 限制最小方框尺寸
green_box = image[y:y+h, x:x+w]
#str_md5=md5(green_box)
#print(str_md5)
cv2.imwrite('tempfile.jpg', green_box)
#cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2) # 在原图上绘制绿色方框
f=open('tempfile.jpg','rb')
str_md5=md5(f)
print(str_md5)
f.close()
cv2.imwrite('face//'+str_md5+'.jpg', green_box)
face_list.append('face//'+str_md5+'.jpg')
searchflag=1
if(searchflag):
print(filepath,'提取成功')
else:
print(filepath,'提取失败')
return face_list
# 保存绘制了绿色方框的图像
#cv2.imwrite('Small\\'+i, image)
"""for i in file_list:
if 'jpg' in i:
get_face(i)
"""1400对接(subimage_1400.py)
时间有限,网上也没有找到python实现GAT1400的模块,参考GAT1400的文档,简单做一下注册、保活、人脸图片上传的流程。
import requests
import json
import base64
import time
from datetime import datetime
import hashlib
import random
import threading
import base64
base_url='http://111.222.111.222:9304'
# 注册URL和认证信息
register_url = base_url+'/VIID/System/Register'
keepalive_url = base_url+'/VIID/System/Keepalive'
username = 'admin '
password = 'admin123' # 替换为实际的密码
device_code='33052200001320646938'
channel_code='33052200001310094777'
# 创建注册请求数据
register_data = {
"RegisterObject": {
"DeviceID": device_code
}
}
# 创建保活请求数据
keepalive_data = {
"KeepaliveObject": {
"DeviceID": device_code
}
}
# 发送第一次注册请求
headers = {
'Accept': 'application/json,application/*+json',
'Content-Type': 'application/VIID+JSON;charset=UTF-8',
'Connection': 'keepalive',
'User-Identify': device_code,
'User-Agent': 'libghttp/1.0'
}
def get_img_base(img_path):
f=open(img_path,'rb')
img=f.read()
img_base=base64.b64encode(img).decode('utf-8')
print(len(img_base))
return img_base
def calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri):
# 计算 HA1
ha1_str = f"{username}:{realm}:{password}"
ha1 = hashlib.md5(ha1_str.encode()).hexdigest()
# 计算 HA2
ha2_str = f"{method}:{uri}"
ha2 = hashlib.md5(ha2_str.encode()).hexdigest()
# 计算 response
if qop:
response_str = f"{ha1}:{nonce}:{'00000001'}:{cnonce}:{qop}:{ha2}"
else:
response_str = f"{ha1}:{nonce}:{ha2}"
response = hashlib.md5(response_str.encode()).hexdigest()
return response
response = requests.post(url=register_url, headers=headers, data=json.dumps(register_data),allow_redirects=False)
print(response.text)
if response.status_code == 401:
# 如果返回未认证,继续进行第二次注册请求
auth_info = response.headers['WWW-Authenticate']
print(auth_info)
nonce = auth_info.split('nonce="')[1].split('"')[0]
#algorithm = auth_info.split('algorithm="')[1].split('"')[0]
algorithm='MD5'
realm = auth_info.split('realm')[1].split('"')[0]
realm='iasserver'
#print(nonce,algorithm,realm)
# 创建第二次注册请求数据
qop = "auth" # 或 "auth-int",根据需要选择
cnonce = "your_cnonce" # 替换为生成的 cnonce
method = "POST" # 或 "GET",根据实际请求方法选择
uri = "/VIID/System/Register" # 替换为实际的请求 URI
cnonce = "luz" # 替换为实际的cnonce
nc = "00000001" # 替换为实际的nc值
response_code = calculate_digest_response(username, password, realm, nonce, qop, cnonce, method, uri)
print("计算得到的 response:", response_code)
response_data = {
"RegisterObject": {
"DeviceID": device_code
}
}
auth_header = f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="/VIID/System/Register", algorithm="{algorithm}", cnonce="{cnonce}", nc={nc}, qop={qop}, response="{response_code}"'
headers['Authorization'] = auth_header
print("第二次参数:",auth_header)
# 发送第二次注册请求
response = requests.post(register_url, headers=headers, data=json.dumps(response_data))
# 保活
def keepalive_1400():
while True:
response = requests.post(keepalive_url, headers=headers, data=json.dumps(keepalive_data))
if response.status_code == 200:
response_json = response.json()
print("保活成功,响应内容:")
print(response_json)
else:
print("保活失败,状态码:", response.status_code)
# 每隔60秒执行一次保活操作
time.sleep(60)
def submit_face(channel_code,cj_path,rl_path):
print(cj_path,rl_path)
# 定义请求的URL
url = base_url+'/VIID/Faces'
# 创建请求头
headers = {
'Accept': 'application/json,application/VIID+json',
'Content-Type': 'application/VIID+JSON;charset=UTF-8',
'Connection': 'keepalive',
'User-Identify': device_code,
'User-Agent': 'libghttp/1.0'
}
# 创建人脸批量增加请求数据
cur_time=str(datetime.now().strftime("%Y%m%d%H%M%S"))
print(cur_time)
begin_code=channel_code+cur_time+str(random.randrange(10,99))
face_data = {
"FaceListObject": {
"FaceObject": [{
#"FaceID": "330522000013204130240220200510200408000010600001",
"FaceID":begin_code,
"InfoKind": 0,
"SourceID": begin_code,
"DeviceID": channel_code,
"LeftTopX": 163,
"LeftTopY": 726,
"RightBtmX": 334,
"RightBtmY": 897,
"AgeUpLimit": 0,
"AgeLowerLimit": 0,
"AccompanyNumber": 0,
"IsDriver": 0,
"IsForeigner": 0,
"IsSuspectedTerrorist": 0,
"IsCriminalInvolved": 0,
"IsDetainees": 0,
"IsVictim": 0,
"IsSuspiciousPerson": 0,
"Attitude": 0,
"m_dSimilaritydegree": 0,
"SubImageList": {
"SubImageInfoObject": [{
"ImageID": begin_code+'3',
"EventSort": 2,
"DeviceID": channel_code,
"Type": "14", #大图
"FileFormat": "Jpeg",
"ShotTime": cur_time,
"Width": 1920,
"Height": 1080,
"FileSize": 156500,
"Data": get_img_base(cj_path)
}, {
"ImageID": begin_code+'2',
"EventSort": 3,
"DeviceID": channel_code,
"Type": "11", #人脸图
"FileFormat": "Jpeg",
"ShotTime": cur_time,
"Width": 461,
"Height": 461,
"FileSize": 101984,
"Data": get_img_base(rl_path)
}]
}
}]
}
}
# 发送人脸批量增加请求
response = requests.post(url, headers=headers, data=json.dumps(face_data))
# 检查响应
if response.status_code == 200:
response_json = response.json()
print("人脸批量增加成功,响应内容:")
print(response_json)
else:
print("人脸批量增加失败,状态码:", response.status_code)
print(response.text)
# 检查注册是否成功
if response.status_code == 200:
response_json = response.json()
print("注册成功,响应内容:")
print(response_json)
thread=threading.Thread(target=keepalive_1400)
thread.start() #另起一个线程做保活,否则主线程运行会阻塞
else:
print("注册失败,状态码:", response.status_code)
print(response.text)运行效果
