溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python使用Selenium模擬瀏覽器自動(dòng)操作功能

發(fā)布時(shí)間:2020-10-17 02:18:26 來源:腳本之家 閱讀:241 作者:Alan.hsiang 欄目:開發(fā)技術(shù)

概述

在進(jìn)行網(wǎng)站爬取數(shù)據(jù)的時(shí)候,會(huì)發(fā)現(xiàn)很多網(wǎng)站都進(jìn)行了反爬蟲的處理,如JS加密,Ajax加密,反Debug等方法,通過請(qǐng)求獲取數(shù)據(jù)和頁面展示的內(nèi)容完全不同,這時(shí)候就用到Selenium技術(shù),來模擬瀏覽器的操作,然后獲取數(shù)據(jù)。本文以一個(gè)簡單的小例子,簡述Python搭配Tkinter和Selenium進(jìn)行瀏覽器的模擬操作,僅供學(xué)習(xí)分享使用,如有不足之處,還請(qǐng)指正。

什么是Selenium?

Selenium是一個(gè)用于Web應(yīng)用程序測試的工具,Selenium測試直接運(yùn)行在瀏覽器中,就像真正的用戶在操作一樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多種操作系統(tǒng),如Windows、Linux、IOS等,如果需要支持Android,則需要特殊的selenium,本文主要以IE11瀏覽器為例。

安裝Selenium

通過pip install selenium 進(jìn)行安裝即可,如果速度慢,則可以使用國內(nèi)的鏡像進(jìn)行安裝。

涉及知識(shí)點(diǎn)

程序雖小,除了需要掌握的Html ,JavaScript,CSS等基礎(chǔ)知識(shí)外,本例涉及的Python相關(guān)知識(shí)點(diǎn)還是蠻多的,具體如下:

  • Selenium相關(guān):

Selenium進(jìn)行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8種方式。
Selenium獲取單一元素(如:find_element_by_xpath)和獲取元素?cái)?shù)組(如:find_elements_by_xpath)兩種方式。
Selenium元素定位后,可以給元素進(jìn)行賦值和取值,或者進(jìn)行相應(yīng)的事件操作(如:click)。

  • 線程(Thread)相關(guān):

為了防止前臺(tái)頁面卡主,本文用到了線程進(jìn)行后臺(tái)操作,如果要定義一個(gè)新的線程,只需要定義一個(gè)類并繼承threading.Thread,然后重寫run方法即可。
在使用線程的過程中,為了保證線程的同步,本例用到了線程鎖,如:threading.Lock()。

  • 隊(duì)列(queue)相關(guān):

本例將Selenium執(zhí)行的過程信息,保存到對(duì)列中,并通過線程輸出到頁面顯示。queue默認(rèn)先進(jìn)先出方式。
對(duì)列通過put進(jìn)行壓棧,通過get進(jìn)行出棧。通過qsize()用于獲取當(dāng)前對(duì)列元素個(gè)數(shù)。

  • 日志(logging.Logger)相關(guān):

為了保存Selenium執(zhí)行過程中的日志,本例用到了日志模塊,為Pyhton自帶的模塊,不需要額外安裝。
Python的日志共六種級(jí)別,分別是:NOTSET,DEBUG,INFO,WARN,ERROR,F(xiàn)ATAL,CRITICAL。

示例效果圖

本例主要針對(duì)某一配置好的商品ID進(jìn)行輪詢,監(jiān)控是否有貨,有貨則加入購物車,無貨則繼續(xù)輪詢,如下圖所示:

Python使用Selenium模擬瀏覽器自動(dòng)操作功能

核心代碼

本例最核心的代碼,就是利用Selenium進(jìn)行網(wǎng)站的模擬操作,如下所示:

class Smoking:
 """定義Smoking類"""

 # 瀏覽器驅(qū)動(dòng)
 __driver: webdriver = None
 # 配置幫助類
 __cfg_info: dict = {}
 # 日志幫助類
 __log_helper: LogHelper = None
 # 主程序目錄
 __work_path: str = ''
 # 是否正在運(yùn)行
 __running: bool = False
 # 無貨
 __no_stock = 'Currently Out of Stock'
 # 線程等待秒數(shù)
 __wait_sec = 2

 def __init__(self, work_path, cfg_info, log_helper: LogHelper):
  """初始化"""
  self.__cfg_info = cfg_info
  self.__log_helper = log_helper
  self.__work_path = work_path
  self.__wait_sec = int(cfg_info['wait_sec'])
  # 如果小于2,則等于2
  self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec)

 def checkIsExistsById(self, id):
  """通過ID判斷是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_id(id)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_id(id)) > 0
  except BaseException as e:
   return False

 def checkIsExistsByName(self, name):
  """通過名稱判斷是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_name(name)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_name(name)) > 0
  except BaseException as e:
   return False

 def checkIsExistsByPath(self, path):
  """通過xpath判斷是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_xpath(path)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_xpath(path)) > 0
  except BaseException as e:
   return False

 def checkIsExistsByClass(self, cls):
  """通過class名稱判斷是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_class_name(cls)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_class_name(cls)) > 0
  except BaseException as e:
   return False

 def checkIsExistsByLinkText(self, link_text):
  """判斷LinkText是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_link_text(link_text)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_link_text(link_text)) > 0
  except BaseException as e:
   return False

 def checkIsExistsByPartialLinkText(self, link_text):
  """判斷包含LinkText是否存在"""
  try:
   i = 0
   while self.__running and i < 3:
    if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0:
     break
    else:
     time.sleep(self.__wait_sec)
     i = i + 1
   return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0
  except BaseException as e:
   return False

 # def waiting(self, *locator):
 #  """等待完成"""
 #  # self.__driver.switch_to.window(self.__driver.window_handles[1])
 #  Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator))

 def login(self, username, password):
  """登錄"""
  # 5. 點(diǎn)擊鏈接跳轉(zhuǎn)到登錄頁面
  self.__driver.find_element_by_link_text('賬戶登錄').click()
  # 6. 輸入賬號(hào)密碼
  # 判斷是否加載完成
  # self.waiting((By.ID, "email"))
  if self.checkIsExistsById('email'):
   self.__driver.find_element_by_id('email').send_keys(username)
   self.__driver.find_element_by_id('password').send_keys(password)
   # 7. 點(diǎn)擊登錄按鈕
   self.__driver.find_element_by_id('sign-in').click()

 def working(self, item_id):
  """工作狀態(tài)"""
  while self.__running:
   try:
    # 正常獲取信息
    if self.checkIsExistsById('string'):
     self.__driver.find_element_by_id('string').clear()
     self.__driver.find_element_by_id('string').send_keys(item_id)
     self.__driver.find_element_by_id('string').send_keys(Keys.ENTER)
    # 判斷是否查詢到商品
    xpath = "http://div[@class='specialty-header search']/div[@class='specialty-description']/div[" \
      "@class='gt-450']/span[2] "
    if self.checkIsExistsByPath(xpath):
     count = int(self.__driver.find_element_by_xpath(xpath).text)
     if count < 1:
      time.sleep(self.__wait_sec)
      self.__log_helper.put('沒有查詢到item id =' + item_id + '對(duì)應(yīng)的信息')
      continue
    else:
     time.sleep(self.__wait_sec)
     self.__log_helper.put('沒有查詢到item id2 =' + item_id + '對(duì)應(yīng)的信息')
     continue
    # 判斷當(dāng)前庫存是否有貨

    xpath2 = "http://div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \
       "@class='price']/span[@class='noStock'] "
    if self.checkIsExistsByPath(xpath2):
     txt = self.__driver.find_element_by_xpath(xpath2).text
     if txt == self.__no_stock:
      # 當(dāng)前無貨
      time.sleep(self.__wait_sec)
      self.__log_helper.put('查詢一次' + item_id + ',無貨')
      continue

    # 鏈接path2
    xpath3 = "http://div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a"
    # 判斷是否加載完畢
    # self.waiting((By.CLASS_NAME, "imgDiv"))
    if self.checkIsExistsByPath(xpath3):
     self.__driver.find_element_by_xpath(xpath3).click()
     time.sleep(self.__wait_sec)
     # 加入購物車
     if self.checkIsExistsByClass('add-to-cart'):
      self.__driver.find_element_by_class_name('add-to-cart').click()
      self.__log_helper.put('加入購物車成功,商品item-id:' + item_id)
      break
     else:
      self.__log_helper.put('未找到加入購物車按鈕')
    else:
     self.__log_helper.put('沒有查詢到,可能是商品編碼不對(duì),或者已下架')
   except BaseException as e:
    self.__log_helper.put(e)

 def startRun(self):
  """運(yùn)行起來"""
  try:
   self.__running = True
   url: str = self.__cfg_info['url']
   username = self.__cfg_info['username']
   password = self.__cfg_info['password']
   item_id = self.__cfg_info['item_id']
   if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len(
     password) == 0 or item_id is None or len(item_id) == 0:
    self.__log_helper.put('配置信息不全,請(qǐng)檢查config.cfg文件是否為空,然后再重啟')
    return
   if self.__driver is None:
    options = webdriver.IeOptions()
    options.add_argument('encoding=UTF-8')
    options.add_argument('Accept= text / css, * / *')
    options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5')
    options.add_argument('Accept - Encoding= gzip, deflate')
    options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko')
    # 2. 定義瀏覽器驅(qū)動(dòng)對(duì)象
    self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options)
   self.run(url, username, password, item_id)
  except BaseException as e:
   self.__log_helper.put('運(yùn)行過程中出錯(cuò),請(qǐng)重新打開再試')

 def run(self, url, username, password, item_id):
  """運(yùn)行起來"""
  # 3. 訪問網(wǎng)站
  self.__driver.get(url)
  # 4. 最大化窗口
  self.__driver.maximize_window()
  if self.checkIsExistsByLinkText('賬戶登錄'):
   # 判斷是否登錄:未登錄
   self.login(username, password)
  if self.checkIsExistsByPartialLinkText('歡迎回來'):
   # 判斷是否登錄:已登錄
   self.__log_helper.put('登錄成功,下一步開始工作了')
   self.working(item_id)
  else:
   self.__log_helper.put('登錄失敗,請(qǐng)?jiān)O(shè)置賬號(hào)密碼')

 def stop(self):
  """停止"""
  try:
   self.__running = False
   # 如果驅(qū)動(dòng)不為空,則關(guān)閉
   self.close_browser_nicely(self.__driver)
   if self.__driver is not None:
    self.__driver.quit()
    # 關(guān)閉后切要為None,否則啟動(dòng)報(bào)錯(cuò)
    self.__driver = None
  except BaseException as e:
   print('Stop Failure')
  finally:
   self.__driver = None

 def close_browser_nicely(self, browser):
  try:
   browser.execute_script("window.onunload=null; window.onbeforeunload=null")
  except Exception as err:
   print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'")

  socket.setdefaulttimeout(10)
  try:
   browser.quit()
   print("Close browser and firefox by calling quit()")
  except Exception as err:
   print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err)))
  socket.setdefaulttimeout(30)

其他輔助類

日志類(LogHelper),代碼如下:

class LogHelper:
 """日志幫助類"""
 __queue: queue.Queue = None # 隊(duì)列
 __logging: logging.Logger = None # 日志
 __running: bool = False # 是否記錄日志

 def __init__(self, log_path):
  """初始化類"""
  self.__queue = queue.Queue(1000)
  self.init_log(log_path)

 def put(self, value):
  """添加數(shù)據(jù)"""
  # 記錄日志
  self.__logging.info(value)
  # 添加到隊(duì)列
  if self.__queue.qsize() < self.__queue.maxsize:
   self.__queue.put(value)

 def get(self):
  """獲取數(shù)據(jù)"""
  if self.__queue.qsize() > 0:
   try:
    return self.__queue.get(block=False)
   except BaseException as e:
    return None
  else:
   return None

 def init_log(self, log_path):
  """初始化日志"""
  self.__logging = logging.getLogger()
  self.__logging.setLevel(logging.INFO)
  # 日志
  rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time()))
  log_name = log_path + rq + '.log'
  logfile = log_name
  # if not os.path.exists(logfile):
  #  # 創(chuàng)建空文件
  #  open(logfile, mode='r')
  fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8')
  fh.setLevel(logging.DEBUG) # 輸出到file的log等級(jí)的開關(guān)
  # 第三步,定義handler的輸出格式
  formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
  fh.setFormatter(formatter)
  # 第四步,將logger添加到handler里面
  self.__logging.addHandler(fh)

 def get_running(self):
  # 獲取當(dāng)前記錄日志的狀態(tài)
  return self.__running

 def set_running(self, v: bool):
  # 設(shè)置當(dāng)前記錄日志的狀態(tài)

  self.__running = v

配置類(ConfigHelper)

class ConfigHelper:
 """初始化數(shù)據(jù)類"""

 __config_dir = None
 __dic_cfg = {}

 def __init__(self, config_dir):
  """初始化"""
  self.__config_dir = config_dir

 def ReadConfigInfo(self):
  """得到配置項(xiàng)"""
  parser = ConfigParser()
  parser.read(self.__config_dir + r"\config.cfg")
  section = parser.sections()[0]
  items = parser.items(section)
  self.__dic_cfg.clear()
  for item in items:
   self.__dic_cfg.__setitem__(item[0], item[1])

 def getConfigInfo(self):
  """獲取配置信息"""
  if len(self.__dic_cfg) == 0:
   self.ReadConfigInfo()
  return self.__dic_cfg

線程類(MyThread)

class MyThread(threading.Thread):
 """后臺(tái)監(jiān)控線程"""

 def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper):
  """線程初始化"""
  threading.Thread.__init__(self)
  self.threadID = tid
  self.name = name
  self.smoking = smoking
  self.log_helper = log_helper

 def run(self):
  print("開啟線程: " + self.name)
  self.log_helper.put("開啟線程: " + self.name)
  # 獲取鎖,用于線程同步
  # lock = threading.Lock()
  # lock.acquire()
  self.smoking.startRun()
  # 釋放鎖,開啟下一個(gè)線程
  # lock.release()
  print("結(jié)束線程: " + self.name)
  self.log_helper.put("結(jié)束線程: " + self.name)

備注

俠客行 [唐:李白]趙客縵胡纓,吳鉤霜雪明。銀鞍照白馬,颯沓如流星。
十步殺一人,千里不留行。事了拂衣去,深藏身與名。
閑過信陵飲,脫劍膝前橫。將炙啖朱亥,持觴勸侯嬴。
三杯吐然諾,五岳倒為輕。眼花耳熱后,意氣素霓生。
救趙揮金槌,邯鄲先震驚。千秋二壯士,烜赫大梁城。
縱死俠骨香,不慚世上英。誰能書閣下,白首太玄經(jīng)。

到此這篇關(guān)于Python使用Selenium模擬瀏覽器自動(dòng)操作的文章就介紹到這了,更多相關(guān)Python模擬瀏覽器自動(dòng)操作內(nèi)容請(qǐng)搜索億速云以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持億速云!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI