溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么使用Python實(shí)現(xiàn)SQL注入檢測(cè)插件

發(fā)布時(shí)間:2021-04-07 12:32:39 來(lái)源:億速云 閱讀:311 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)怎么使用Python實(shí)現(xiàn)SQL注入檢測(cè)插件,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

掃描器需要實(shí)現(xiàn)的功能思維導(dǎo)圖

怎么使用Python實(shí)現(xiàn)SQL注入檢測(cè)插件

爬蟲(chóng)編寫思路

首先需要開(kāi)發(fā)一個(gè)爬蟲(chóng)用于收集網(wǎng)站的鏈接,爬蟲(chóng)需要記錄已經(jīng)爬取的鏈接和待爬取的鏈接,并且去重,用 Python 的set()就可以解決,大概流程是:

  • 輸入 URL

  • 下載解析出 URL

  • URL 去重,判斷是否為本站

  • 加入到待爬列表

  • 重復(fù)循環(huán)

SQL 判斷思路

  • 通過(guò)在 URL 后面加上AND %d=%d或者OR NOT (%d>%d)

  • %d后面的數(shù)字是隨機(jī)可變的

  • 然后搜索網(wǎng)頁(yè)中特殊關(guān)鍵詞,比如:

MySQL 中是 SQL syntax.*MySQL
Microsoft SQL Server 是 Warning.*mssql_
Microsoft Access 是 Microsoft Access Driver
Oracle 是 Oracle error
IBM DB2 是 DB2 SQL error
SQLite 是 SQLite.Exception
...

通過(guò)這些關(guān)鍵詞就可以判斷出所用的數(shù)據(jù)庫(kù)

  • 還需要判斷一下 waf 之類的東西,有這種東西就直接停止。簡(jiǎn)單的方法就是用特定的 URL 訪問(wèn),如果出現(xiàn)了像IP banned,fierwall之類的關(guān)鍵詞,可以判斷出是waf。具體的正則表達(dá)式是(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)

  • 開(kāi)發(fā)準(zhǔn)備展開(kāi)目錄

請(qǐng)安裝這些庫(kù)

pip install requests
pip install beautifulsoup4

實(shí)驗(yàn)環(huán)境是 Linux,創(chuàng)建一個(gè)Code目錄,在其中創(chuàng)建一個(gè)work文件夾,將其作為工作目錄

目錄結(jié)構(gòu)

/w8ay.py  // 項(xiàng)目啟動(dòng)主文件
/lib/core // 核心文件存放目錄
/lib/core/config.py // 配置文件
/script   // 插件存放
/exp      // exp和poc存放

步驟

SQL 檢測(cè)腳本編寫

DBMS_ERRORS = {
  'MySQL': (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
  "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
  "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
  "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
  "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
  "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
  "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
  "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}

通過(guò)正則表達(dá)式就可以判斷出是哪個(gè)數(shù)據(jù)庫(kù)了

for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
  if (re.search(regex,_content)):
    return True

下面是我們測(cè)試語(yǔ)句的payload

BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")

用報(bào)錯(cuò)語(yǔ)句返回正確的內(nèi)容和錯(cuò)誤的內(nèi)容進(jìn)行對(duì)比

for test_payload in BOOLEAN_TESTS:
  # Right Page
  RANDINT = random.randint(1, 255)
  _url = url + test_payload % (RANDINT, RANDINT)
  content["true"] = Downloader.get(_url)
  _url = url + test_payload % (RANDINT, RANDINT + 1)
  content["false"] = Downloader.get(_url)
  if content["origin"] == content["true"] != content["false"]:
    return "sql found: %" % url

這句

content["origin"] == content["true"] != content["false"]

意思就是當(dāng)原始網(wǎng)頁(yè)等于正確的網(wǎng)頁(yè)不等于錯(cuò)誤的網(wǎng)頁(yè)內(nèi)容時(shí),就可以判定這個(gè)地址存在注入漏洞

完整代碼:

import re, random
from lib.core import Download
def sqlcheck(url):
  if (not url.find("?")): # Pseudo-static page
    return false;
  Downloader = Download.Downloader()
  BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
  DBMS_ERRORS = {
    # regular expressions used for DBMS recognition based on error message response
    "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
    "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
    "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
    "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
    "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
    "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
    "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
  }
  _url = url + "%29%28%22%27"
  _content = Downloader.get(_url)
  for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
    if (re.search(regex,_content)):
      return True
  content = {}
  content['origin'] = Downloader.get(_url)
  for test_payload in BOOLEAN_TESTS:
    # Right Page
    RANDINT = random.randint(1, 255)
    _url = url + test_payload % (RANDINT, RANDINT)
    content["true"] = Downloader.get(_url)
    _url = url + test_payload % (RANDINT, RANDINT + 1)
    content["false"] = Downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
      return "sql found: %" % url

將這個(gè)文件命名為sqlcheck.py,放在/script目錄中。代碼的第 4 行作用是查找 URL 是否包含?,如果不包含,比方說(shuō)偽靜態(tài)頁(yè)面,可能不太好注入,因此需要過(guò)濾掉

爬蟲(chóng)的編寫

爬蟲(chóng)的思路上面講過(guò)了,先完成 URL 的管理,我們單獨(dú)將它作為一個(gè)類,文件保存在/lib/core/UrlManager.py

#-*- coding:utf-8 -*-

class UrlManager(object):
  def __init__(self):
    self.new_urls = set()
    self.old_urls = set()
    
  def add_new_url(self, url):
    if url is None:
      return
    if url not in self.new_urls and url not in self.old_urls:
      self.new_urls.add(url)
   
  def add_new_urls(self, urls):
    if urls is None or len(urls) == 0:
      return
    for url in urls:
      self.add_new_url(url)
    
  def has_new_url(self):
    return len(self.new_urls) != 0
   
  def get_new_url(self):
    new_url = self.new_urls.pop()
    self.old_urls.add(new_url)
    return new_url

為了方便,我們也將下載功能單獨(dú)作為一個(gè)類使用,文件保存在lib/core/Downloader.py

#-*- coding:utf-8 -*-
import requests

class Downloader(object):
  def get(self, url):
    r = requests.get(url, timeout = 10)
    if r.status_code != 200:
      return None
    _str = r.text
    return _str
  
  def post(self, url, data):
    r = requests.post(url, data)
    _str = r.text
    return _str
  
  def download(self, url, htmls):
    if url is None:
      return None
    _str = {}
    _str["url"] = url
    try:
      r = requests.get(url, timeout = 10)
      if r.status_code != 200:
        return None
      _str["html"] = r.text
    except Exception as e:
      return None
    htmls.append(_str)

特別說(shuō)明,因?yàn)槲覀円獙懙呐老x(chóng)是多線程的,所以類中有個(gè)download方法是專門為多線程下載專用的

在lib/core/Spider.py中編寫爬蟲(chóng)

#-*- coding:utf-8 -*-

from lib.core import Downloader, UrlManager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class SpiderMain(object):
  def __init__(self, root, threadNum):
    self.urls = UrlManager.UrlManager()
    self.download = Downloader.Downloader()
    self.root = root
    self.threadNum = threadNum
  
  def _judge(self, domain, url):
    if (url.find(domain) != -1):
      return True
    return False
  
  def _parse(self, page_url, content):
    if content is None:
      return
    soup = BeautifulSoup(content, 'html.parser')
    _news = self._get_new_urls(page_url, soup)
    return _news
    
  def _get_new_urls(self, page_url, soup):
    new_urls = set()
    links = soup.find_all('a')
    for link in links:
      new_url = link.get('href')
      new_full_url = urljoin(page_url, new_url)
      if (self._judge(self.root, new_full_url)):
        new_urls.add(new_full_url)
    return new_urls
    
  def craw(self):
    self.urls.add_new_url(self.root)
    while self.urls.has_new_url():
      _content = []
      th = []
      for i in list(range(self.threadNum)):
        if self.urls.has_new_url() is False:
          break
        new_url = self.urls.get_new_url()
        
        ## sql check
        try:
          if (sqlcheck.sqlcheck(new_url)):
            print("url:%s sqlcheck is valueable" % new_url)
        except:
          pass
            
        print("craw:" + new_url)
        t = threading.Thread(target = self.download.download, args = (new_url, _content))
        t.start()
        th.append(t)
      for t in th:
        t.join()
      for _str in _content:
        if _str is None:
          continue
        new_urls = self._parse(new_url, _str["html"])
        self.urls.add_new_urls(new_urls)

爬蟲(chóng)通過(guò)調(diào)用craw()方法傳入一個(gè)網(wǎng)址進(jìn)行爬行,然后采用多線程的方法下載待爬行的網(wǎng)站,下載之后的源碼用_parse方法調(diào)用BeautifulSoup進(jìn)行解析,之后將解析出的 URL 列表丟入 URL 管理器,這樣循環(huán),最后只要爬完了網(wǎng)頁(yè),爬蟲(chóng)就會(huì)停止

threading庫(kù)可以自定義需要開(kāi)啟的線程數(shù),線程開(kāi)啟后,每個(gè)線程會(huì)得到一個(gè) url 進(jìn)行下載,然后線程會(huì)阻塞,阻塞完畢后線程放行

爬蟲(chóng)和 SQL 檢查的結(jié)合

在lib/core/Spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 URL 地方調(diào)用一下

##sql check
try:
  if(sqlcheck.sqlcheck(new_url)):
    print("url:%s sqlcheck is valueable"%new_url)
except:
  pass

用try檢測(cè)可能出現(xiàn)的異常,繞過(guò)它,在文件w8ay.py中進(jìn)行測(cè)試

#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.Spider import SpiderMain
def main():
  root = "https://wmathor.com"
  threadNum = 50
  w8 = SpiderMain(root, threadNum)
  w8.craw()
 
if __name__ == "__main__":
  main()

很重要的一點(diǎn)!為了使得lib和script文件夾中的.py文件可以可以被認(rèn)作是模塊,請(qǐng)?jiān)趌ib、lib/core和script文件夾中創(chuàng)建__init__.py文件,文件中什么都不需要寫

關(guān)于“怎么使用Python實(shí)現(xiàn)SQL注入檢測(cè)插件”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI