溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python隊(duì)列、進(jìn)程間通信、線程案例

發(fā)布時(shí)間:2020-08-24 08:18:20 來源:腳本之家 閱讀:182 作者:SetCreed 欄目:開發(fā)技術(shù)

進(jìn)程互斥鎖

多進(jìn)程同時(shí)搶購余票

# 并發(fā)運(yùn)行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯(cuò)亂
# data.json文件內(nèi)容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process
def search(user):
  with open('data.json', 'r', encoding='utf-8') as f:
    dic = json.load(f)
  print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...')
def buy(user):
  with open('data.json', 'r', encoding='utf-8') as f:
    dic = json.load(f)

  time.sleep(0.1)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('data.json', 'w', encoding='utf-8') as f:
      json.dump(dic, f)
    print(f'用戶{user}搶票成功!')

  else:
    print(f'用戶{user}搶票失敗')
def run(user):
  search(user)
  buy(user)
if __name__ == '__main__':
  for i in range(10): # 模擬10個(gè)用戶搶票
    p = Process(target=run, args=(f'用戶{i}', ))
    p.start()

使用鎖來保證數(shù)據(jù)安全

# data.json文件內(nèi)容為 {"ticket_num": 1}
import json
import time
from multiprocessing import Process, Lock
def search(user):
  with open('data.json', 'r', encoding='utf-8') as f:
    dic = json.load(f)
  print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...')
def buy(user):
  with open('data.json', 'r', encoding='utf-8') as f:
    dic = json.load(f)

  time.sleep(0.2)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('data.json', 'w', encoding='utf-8') as f:
      json.dump(dic, f)
    print(f'用戶{user}搶票成功!')

  else:
    print(f'用戶{user}搶票失敗')
def run(user, mutex):
  search(user)
  mutex.acquire() # 加鎖
  buy(user)
  mutex.release() # 釋放鎖
if __name__ == '__main__':
  # 調(diào)用Lock()類得到一個(gè)鎖對象
  mutex = Lock()

  for i in range(10): # 模擬10個(gè)用戶搶票
    p = Process(target=run, args=(f'用戶{i}', mutex))
    p.start()

進(jìn)程互斥鎖:

讓并發(fā)變成串行,犧牲了執(zhí)行效率,保證了數(shù)據(jù)安全

在程序并發(fā)時(shí),需要修改數(shù)據(jù)使用

隊(duì)列

隊(duì)列遵循的是先進(jìn)先出

隊(duì)列:相當(dāng)于內(nèi)存中一個(gè)隊(duì)列空間,可以存放多個(gè)數(shù)據(jù),但數(shù)據(jù)的順序是由先進(jìn)去的排在前面。

q.put() 添加數(shù)據(jù)

q.get() 取數(shù)據(jù),遵循隊(duì)列先進(jìn)先出

q.get_nowait() 獲取隊(duì)列數(shù)據(jù), 隊(duì)列中沒有就會(huì)報(bào)錯(cuò)

q.put_nowait 添加數(shù)據(jù),若隊(duì)列滿了也會(huì)報(bào)錯(cuò)

q.full() 查看隊(duì)列是否滿了

q.empty() 查看隊(duì)列是否為空

from multiprocessing import Queue

# 調(diào)用隊(duì)列類,實(shí)例化隊(duì)列對象
q = Queue(5)  # 隊(duì)列中存放5個(gè)數(shù)據(jù)

# put添加數(shù)據(jù),若隊(duì)列里的數(shù)據(jù)滿了就會(huì)卡住
q.put(1)
print('進(jìn)入數(shù)據(jù)1')
q.put(2)
print('進(jìn)入數(shù)據(jù)2')
q.put(3)
print('進(jìn)入數(shù)據(jù)3')
q.put(4)
print('進(jìn)入數(shù)據(jù)4')
q.put(5)
print('進(jìn)入數(shù)據(jù)5')

# 查看隊(duì)列是否滿了
print(q.full())

# 添加數(shù)據(jù), 若隊(duì)列滿了也會(huì)報(bào)錯(cuò)
q.put_nowait(6)

# q.get() 獲取的數(shù)據(jù)遵循先進(jìn)先出
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.get_nowait())  # 獲取隊(duì)列數(shù)據(jù), 隊(duì)列中沒有就會(huì)報(bào)錯(cuò)

# 判斷隊(duì)列是否為空
print(q.empty())
q.put(6)
print('進(jìn)入數(shù)據(jù)6')

進(jìn)程間通信

IPC(Inter-Process Communication)

進(jìn)程間數(shù)據(jù)是相互隔離的,若想實(shí)現(xiàn)進(jìn)程間通信,可以利用隊(duì)列

from multiprocessing import Process, Queue
def task1(q):
  data = 'hello 你好'
  q.put(data)
  print('進(jìn)程1添加數(shù)據(jù)到隊(duì)列')
def task2(q):
  print(q.get())
  print('進(jìn)程2從隊(duì)列中獲取數(shù)據(jù)')
if __name__ == '__main__':
  q = Queue()

  p1 = Process(target=task1, args=(q, ))
  p2 = Process(target=task2, args=(q, ))
  p1.start()
  p2.start()
  print('主進(jìn)程')

生產(chǎn)者與消費(fèi)者

在程序中,通過隊(duì)列生產(chǎn)者把數(shù)據(jù)添加到隊(duì)列中,消費(fèi)者從隊(duì)列中獲取數(shù)據(jù)

from multiprocessing import Process, Queue
import time


# 生產(chǎn)者
def producer(name, food, q):
  for i in range(10):
    data = food, i
    msg = f'用戶{name}開始制作{data}'
    print(msg)
    q.put(data)
    time.sleep(0.1)
# 消費(fèi)者
def consumer(name, q):
  while True:
    data = q.get()
    if not data:
      break

    print(f'用戶{name}開始吃{data}')
if __name__ == '__main__':
  q = Queue()
  p1 = Process(target=producer, args=('neo', '煎餅', q))
  p2 = Process(target=producer, args=('wick', '肉包', q))

  c1 = Process(target=consumer, args=('cwz', q))
  c2 = Process(target=consumer, args=('woods', q))

  p1.start()
  p2.start()
  
  c1.daemon = True
  c2.daemon = True
  c1.start()
  c2.start()
  print('主')

線程

線程的概念

進(jìn)程與線程都是虛擬單位

進(jìn)程:資源單位

線程:執(zhí)行單位

開啟一個(gè)進(jìn)程,一定會(huì)有一個(gè)線程,線程才是真正執(zhí)行者

開啟進(jìn)程:

  • 開辟一個(gè)名稱空間,每開啟一個(gè)進(jìn)程都會(huì)占用一份內(nèi)存資源
  • 會(huì)自帶一個(gè)線程

開啟線程:

  • 一個(gè)進(jìn)程可以開啟多個(gè)線程
  • 線程的開銷遠(yuǎn)小于進(jìn)程

注意:線程不能實(shí)現(xiàn)并行,線程只能實(shí)現(xiàn)并發(fā),進(jìn)程可以實(shí)現(xiàn)并行

線程的兩種創(chuàng)建方式

from threading import Thread
import time
# 創(chuàng)建線程方式1
def task():
  print('線程開啟')
  time.sleep(1)
  print('線程結(jié)束')

if __name__ == '__main__':
  t = Thread(target=task)
  t.start()
# 創(chuàng)建線程方式2
class MyThread(Thread):
  def run(self):
    print('線程開啟...')
    time.sleep(1)
    print('線程結(jié)束...')
if __name__ == '__main__':
  t = MyThread()
  t.start()

線程對象的方法

from threading import Thread
from threading import current_thread
import time

def task():
  print(f'線程開啟{current_thread().name}')
  time.sleep(1)
  print(f'線程結(jié)束{current_thread().name}')
if __name__ == '__main__':
  t = Thread(target=task)
  print(t.isAlive())
  # t.daemon = True
  t.start()
  print(t.isAlive())

線程互斥鎖

線程之間數(shù)據(jù)是共享的

from threading import Thread
from threading import Lock
import time

mutex = Lock()
n = 100

def task(i):
  print(f'線程{i}啟動(dòng)')
  global n
  mutex.acquire()
  temp = n
  time.sleep(0.1)
  n = temp - 1
  print(n)
  mutex.release()
  
if __name__ == '__main__':
  t_l = []
  for i in range(100):
    t = Thread(target=task, args=(i, ))
    t_l.append(t)
    t.start()

  for t in t_l:
    t.join()

  print(n)

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI