Source code for your_project.utils.file_upload

#!/usr/bin/env python3
"""File upload utilities for multipart/form-data requests"""

from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Optional, Union

import requests
from requests_toolbelt import MultipartEncoder


class FileUploader:
    """Wrapper for handling file uploads with multipart/form-data.

    All requests are sent through ``self.session``, so cookies and default
    headers configured on that session apply to every upload.
    """

    # Content type returned when the extension is missing or not in the table.
    _DEFAULT_CONTENT_TYPE = "application/octet-stream"

    # Lower-case extension (without the dot) -> MIME type. Defined once on the
    # class so it is not rebuilt on every _guess_content_type() call.
    _MIME_TYPES = {
        # Web scripts
        "php": "application/x-php",
        "asp": "application/x-asp",
        "aspx": "application/x-aspx",
        "jsp": "application/x-jsp",
        "py": "text/x-python",
        "rb": "text/x-ruby",
        "pl": "text/x-perl",
        # Images
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "png": "image/png",
        "gif": "image/gif",
        "bmp": "image/bmp",
        "svg": "image/svg+xml",
        # Documents
        "txt": "text/plain",
        "html": "text/html",
        "htm": "text/html",
        "xml": "text/xml",
        "pdf": "application/pdf",
        "doc": "application/msword",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        # Archives
        "zip": "application/zip",
        "tar": "application/x-tar",
        "gz": "application/gzip",
        "rar": "application/x-rar-compressed",
        # Others
        "json": "application/json",
        "js": "application/javascript",
        "css": "text/css",
    }

    def __init__(self, session: Optional[requests.Session] = None):
        """Create an uploader.

        Args:
            session: Session used for all requests; a new
                ``requests.Session`` is created when omitted.
        """
        # Explicit None check so a caller-supplied session is never replaced
        # merely because it evaluates as falsy.
        self.session = session if session is not None else requests.Session()

    def upload(
        self,
        url: str,
        file_content: Union[str, bytes],
        filename: str,
        file_field_name: str = "file",
        content_type: Optional[str] = None,
        additional_fields: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> requests.Response:
        """Upload a file using multipart/form-data.

        Args:
            url: Target URL for the upload
            file_content: File content (string or bytes); strings are encoded
                with ``str.encode()`` defaults before sending
            filename: Name of the file (can include encoded characters like %00)
            file_field_name: Form field name for the file (default: "file")
            content_type: MIME type of the file (default: guessed from
                ``filename``)
            additional_fields: Additional form fields to include; a key equal
                to ``file_field_name`` replaces the file field
            **kwargs: Additional arguments passed to ``session.post``. A
                ``headers`` mapping is copied, never modified in place, and
                its ``Content-Type`` entry is overwritten with the multipart
                content type.

        Returns:
            Response object from the upload request
        """
        if isinstance(file_content, str):
            file_content = file_content.encode()

        fields = {
            file_field_name: (
                filename,
                BytesIO(file_content),
                content_type or self._guess_content_type(filename),
            ),
        }
        if additional_fields:
            fields.update(additional_fields)

        encoder = MultipartEncoder(fields=fields)

        # Copy the caller's headers: writing Content-Type into the original
        # dict would leak a stale multipart boundary into the caller's state.
        # `or {}` also tolerates an explicit headers=None.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Content-Type"] = encoder.content_type

        return self.session.post(url, data=encoder, headers=headers, **kwargs)

    def upload_with_bypass(
        self,
        url: str,
        file_content: Union[str, bytes],
        filename: str,
        bypass_technique: Optional[str] = None,
        file_field_name: str = "file",
        additional_fields: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> requests.Response:
        """Upload a file after rewriting its name and/or declared MIME type.

        Args:
            url: Target URL for the upload
            file_content: File content
            filename: Base filename
            bypass_technique: Technique to use ('null_byte',
                'double_extension', 'case_variation', 'mime_mismatch').
                ``None`` or an unrecognised value uploads the file unchanged.
            file_field_name: Form field name for the file
            additional_fields: Additional form fields
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object from the upload request
        """
        modified_filename, content_type = self._apply_bypass(filename, bypass_technique)
        return self.upload(
            url=url,
            file_content=file_content,
            filename=modified_filename,
            file_field_name=file_field_name,
            content_type=content_type,
            additional_fields=additional_fields,
            **kwargs,
        )

    @staticmethod
    def _apply_bypass(filename: str, bypass_technique: Optional[str]):
        """Return ``(filename, content_type)`` for the requested technique.

        ``content_type`` is ``None`` when the technique does not force one, in
        which case ``upload`` guesses it from the returned filename.
        """
        stem, dot, ext = filename.rpartition(".")
        if not dot:
            # rpartition() puts the whole string in the last slot when there
            # is no separator; treat that as "no extension".
            stem, ext = filename, ""
        # Original extension including its dot, or "" when the name has none.
        # Building from this avoids emitting a stray "." for extensionless
        # names (e.g. "shell" must not become "shell." or "shell.jpg.").
        suffix = dot + ext

        if bypass_technique == "null_byte":
            # Append an encoded null byte followed by a safe extension.
            return f"{stem}{suffix}%00.jpg", "image/jpeg"
        if bypass_technique == "double_extension":
            # Insert a safe extension before the real one.
            return f"{stem}.jpg{suffix}", "image/jpeg"
        if bypass_technique == "case_variation":
            # Upper-case only the extension.
            return f"{stem}{suffix.upper()}", None
        if bypass_technique == "mime_mismatch":
            # Keep the filename but declare a different MIME type.
            return filename, "image/jpeg"
        return filename, None

    def upload_from_path(
        self,
        url: str,
        file_path: Union[str, Path],
        custom_filename: Optional[str] = None,
        file_field_name: str = "file",
        content_type: Optional[str] = None,
        additional_fields: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> requests.Response:
        """Upload a file from disk.

        The whole file is read into memory before it is sent.

        Args:
            url: Target URL for the upload
            file_path: Path to the file on disk
            custom_filename: Custom filename to use (default: actual filename)
            file_field_name: Form field name for the file
            content_type: MIME type of the file (default: guessed from the
                filename that is sent)
            additional_fields: Additional form fields
            **kwargs: Additional arguments to pass to requests

        Returns:
            Response object from the upload request

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        file_path = Path(file_path)
        # Kept as an explicit check (rather than relying on the read to fail)
        # so callers keep getting FileNotFoundError with this exact message.
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        content = file_path.read_bytes()

        return self.upload(
            url=url,
            file_content=content,
            filename=custom_filename or file_path.name,
            file_field_name=file_field_name,
            content_type=content_type,
            additional_fields=additional_fields,
            **kwargs,
        )

    @staticmethod
    def _guess_content_type(filename: str) -> str:
        """Guess the content type from the last extension of ``filename``.

        Matching is case-insensitive and ignores any encoded ``%00``
        sequences. Names without a dot, and unknown extensions, map to
        ``application/octet-stream``.
        """
        # Strip encoded null bytes so they do not hide the real extension.
        clean_name = filename.lower().replace("%00", "")

        # Only the final extension matters (e.g. "a.php.jpg" -> "jpg").
        _, dot, ext = clean_name.rpartition(".")
        if not dot:
            return FileUploader._DEFAULT_CONTENT_TYPE

        return FileUploader._MIME_TYPES.get(ext, FileUploader._DEFAULT_CONTENT_TYPE)
def quick_upload(
    session: requests.Session,
    url: str,
    content: Union[str, bytes],
    filename: str,
    submit_button: Optional[tuple[str, str]] = None,
) -> requests.Response:
    """Upload a single file through ``FileUploader`` in one call.

    Args:
        session: Requests session to use
        url: Target URL
        content: Content to upload
        filename: Filename to use
        submit_button: Optional tuple of (field_name, field_value) sent as an
            extra form field alongside the file

    Returns:
        Response object
    """
    # The submit button, when given, is the only extra form field.
    extra_fields = {submit_button[0]: submit_button[1]} if submit_button else {}

    return FileUploader(session).upload(
        url=url,
        file_content=content,
        filename=filename,
        additional_fields=extra_fields,
    )