浙江大华9系平台鉴权与HLS拉流地址获取 python demo
官方文档
https://open-gov.dahuatech.com/#/interfaces?id=141&parentID=135
创建会话
接口描述
通过两次交互,最终完成登录,并获取到token,在用户后续的请求交互中需要携带该token进行身份和权限的校验。
登录流程:
第一次登陆,客户端只传用户名,服务端返回realm、readomKey和encryptType信息。
第二次登录,客户端根据返回的信息,按照指定的加密算法计算签名,再带着用户名和签名登陆一次。
参数说明:
username-value:用户名
passwd-value:明文密码
encryptType-value:服务端返回的哈希算法,可以是MD5,SHA1等,目前只支持MD5,且计算结果的a-f使用小写字母表示。
realm-value:服务端返回的域信息,服务端可以使用集群UID作为域信息
randomKey-value:随机秘钥种子
通用加密流程:(第一次登陆返回没有method字段)
passwd-value0 = encryptType-value(passwd-value) 其中passwd-value是密码明文。
passwd-value1 = encryptType-value(username-value:passwd-value0) 中间无:号
passwd-tmp = encryptType-value(passwd-value1)
encrypted-passwd = encryptType-value(username-value:realm-value:passwd-tmp) 中间含:号
signature = encryptType-value(encrypted-passwd:randomKey-value) 中间含:号
简易加密流程:(第一次登陆返回method字段为simple)
encrypted-passwd = encryptType-value(username-value:realm-value:passwd-value) 中间含:号,passwd-value是密码明文。
signature = encryptType-value(encrypted-passwd:randomKey-value) 中间含:号
请求
请求语法
POST /videoService/accounts/authorize
请求参数
无
第一次交互
请求内容
{
"userName" : "system",
"clientType" : "winpc",
"ipAddress" : "10.10.10.10"
}请求内容参数
userName: 类型
string,必填。用户名,最长32字节。clientType: 类型
string,必填。客户端类型。固定填 winpc 。ipAddress: 类型
string,选填。客户端的ip地址,用于日志审计。
响应内容
{
"realm" : "TheNextService",
"randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3",
"encryptType" : "MD5",
"method" : "simple"
}响应内容参数
realm: 类型
string。域信息,加密过程使用。randomKey: 类型
string。随机密钥种子。encryptType: 类型
string。加密算法method : 类型
string。使用哪种加密流程。没有该字段,按通用加密流程计算,"simple"表示按简易加密流程计算。
第二次交互
请求内容
{
"userName" : "system",
"signature" : "ad1290902f8bfb0bd7983111d91ddb6b",
"randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3",
"encryptType" : "MD5",
"clientType" : "winpc",
"ipAddress" : "10.10.10.10"
}请求内容参数
userName : 类型
string,必填。用户名,最长32字节。signature : 类型
string,必填。根据签名计算方法得到的签名值。randomKey : 类型
string,必填。随机密钥种子。encryptType : 类型
string,必填。加密算法。clientType : 类型
string,必填。客户端类型。ipAddress : 类型
string,选填。客户端ip地址,用于日志审计。
响应内容
{
"duration" : 120,
"token" : "S4NbecfYB19QUJHT4T8M7G_Y5524jUq1yNQtu+FruA87hAErnAHoQTEXtUn1AEZz0TKwKi0EIriyrT8ul2ejSGcy7lxWndsP4+GWch2C/fcgy+YAz2GJnbQtRFFbq3XNfi7iAXeEG6O40GKODI2ICTq",
"userName": "system",
"userId" : "S4NbecfYB19QUJHT4T8M7G",
"lastLoginIp": "10.10.10.10"
}响应内容参数
duration : 类型
int。有效时间,由服务端指定,单位秒,建议3/4 duration时就进行会话更新。token : 类型
string。返回登陆令牌,令牌字符串由服务端发布。之后的其它请求,在HTTP头的 X-Subject-Token: 带上这个令牌进行鉴权。userName : 类型
string。创建该会话的用户名。userId : 类型
string。用户名对应的用户Id,如果需要对该用户信息进行修改,使用用户Id。lastLoginIp : 类型
string。上次登陆的IP。version : 类型
string。视频云版本
注意事项
必须设置HTTP头的
Content-Type: application/json;charset=UTF-8,否则返回415。第一次登录成功,响应返回的http 状态码为401
为便于理解,附上计算通用加密流程签名的Java伪代码片断
JSON response = firstLogin(); String randomKey = response["randomKey"]; String realm = response["realm"]; String userName = xxx; /// 用户名 String password = yyy; /// 该用户的明文密码 /// 一共计算五次MD5 String signature = encrypt(password, "MD5"); signature = encrypt(userName+signature, "MD5"); signature = encrypt(signature, "MD5"); signature = encrypt(userName+":"+realm+":"+signature, "MD5"); signature = encrypt(signature+":"+randomKey, "MD5");
示例
第一次请求示例
POST /videoService/accounts/authorize HTTP/1.1
Content-Type: application/json;charset=UTF-8
Content-Length: 103
{
"userName" : "system",
"clientType" : "winpc",
"ipAddress" : "10.10.10.10"
}第一次响应示例
HTTP/1.1 401 Unauthorized
{
"realm" : "TheNextService",
"randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad",
"encryptType" : "MD5"
}第二次请求示例
POST /videoService/accounts/authorize HTTP/1.1
Content-Type: application/json;charset=UTF-8
Content-Length: 217
{
"userName" : "system",
"signature" : "ad1290902f8bfb0bd7983111d91ddb6b",
"randomKey" : "54b53072540eeeb8f8e9343e71f28176_d0e2bdad-1777-446b-b863-51257f5558d3",
"encryptType" : "MD5",
"clientType" : "winpc",
"ipAddress" : "10.10.10.10"
}第二次响应示例
HTTP/1.1 200 OK
{
"duration" : 120,
"token" : "S4NbecfYB19QUJHT4T8M7G_Y5524jUq1yNQtu+FruA87hAErnAHoQTEXtUn1AEZz0TKwKi0EIriyrT8ul2ejSGcy7lxWndsP4+GWch2C/fcgy+YAz2GJnbQtRFFbq3XNfi7iAXeEG6O40GKODI2ICTq",
"userName": "system",
"userId" : "S4NbecfYB19QUJHT4T8M7G",
"lastLoginIp": "10.10.10.10"
}demo示例
import hashlib
import requests
import json
API_ENDPOINT = "/videoService/accounts/authorize"
baseUrl="http://1.1.1.1:8314" #平台Web地址
def send_first_login_request():
# Prepare and send the first login request to the API endpoint
url = baseUrl + API_ENDPOINT # Replace "API_BASE_URL" with the actual base URL of the API
payload = {
"userName": "system",
"clientType": "winpc",
"ipAddress": "10.10.10.10"
}
headers = {
"Content-Type": "application/json;charset=UTF-8"
}
response = requests.post(url, json=payload, headers=headers)
return response.json()
def calculate_signature(username, password, realm, random_key, encrypt_type, method):
password_value_0 = encrypt(password, encrypt_type)
password_value_1 = encrypt(username + password_value_0, encrypt_type)
password_tmp = encrypt(password_value_1, encrypt_type)
encrypted_password = encrypt(username + ":" + realm + ":" + password_tmp, encrypt_type)
return encrypt(encrypted_password + ":" + random_key, encrypt_type)
def send_second_login_request(username, signature, random_key, encrypt_type):
# Prepare and send the second login request to the API endpoint
url = baseUrl + API_ENDPOINT # Replace "API_BASE_URL" with the actual base URL of the API
payload = {
"userName": username,
"signature": signature,
"randomKey": random_key,
"encryptType": encrypt_type,
"clientType": "winpc",
"ipAddress": "10.10.10.10"
}
headers = {
"Content-Type": "application/json;charset=UTF-8"
}
response = requests.post(url, json=payload, headers=headers)
return response.json()
def encrypt(input_string, encrypt_type):
hashed = hashlib.new(encrypt_type, input_string.encode()).hexdigest()
return hashed
# Step 1: Send first login request
first_login_response = send_first_login_request()
print(first_login_response)
# Extract response parameters
realm = first_login_response["realm"]
random_key = first_login_response["randomKey"]
encrypt_type = first_login_response["encryptType"]
method = first_login_response.get("method", "")
# Step 2: Prepare second login request
username = "system"
password = "XXXXXX" # Replace with the actual password
signature = calculate_signature(username, password, realm, random_key, encrypt_type, method)
print(signature)
# Step 3: Send second login request
second_login_response = send_second_login_request(username, signature, random_key, encrypt_type)
print(second_login_response)
# Process second login response
# Extract the "token", "userName", "userId", "lastLoginIp", and other required fields from the response JSON
# and perform the necessary actions.
def get_hls_url(channel_id, token):
# Prepare and send the request to get HLS URL
base_url = baseUrl # Replace with the actual base URL
endpoint = "/videoService/realmonitor/uri"
params = {
"channelId": channel_id,
"scheme": "HLS"
}
headers = {
"X-Subject-Token": token
}
response = requests.get(base_url + endpoint, params=params, headers=headers).text
return json.loads(response)
# Example usage
channel_id = "xxxxx" #测试通道编码
token = second_login_response['token']
print(token)
# Get HLS URL
hls_url_response = get_hls_url(channel_id, token)
print(hls_url_response)
# Extract the HLS URL
hls_url = hls_url_response["url"]
print("HLS URL:", hls_url)