您好,登錄后才能下訂單哦!
進(jìn)程互斥鎖
多進(jìn)程同時(shí)搶購余票
# 并發(fā)運(yùn)行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯(cuò)亂 # data.json文件內(nèi)容為 {"ticket_num": 1} import json import time from multiprocessing import Process def search(user): with open('data.json', 'r', encoding='utf-8') as f: dic = json.load(f) print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...') def buy(user): with open('data.json', 'r', encoding='utf-8') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket_num'] > 0: dic['ticket_num'] -= 1 with open('data.json', 'w', encoding='utf-8') as f: json.dump(dic, f) print(f'用戶{user}搶票成功!') else: print(f'用戶{user}搶票失敗') def run(user): search(user) buy(user) if __name__ == '__main__': for i in range(10): # 模擬10個(gè)用戶搶票 p = Process(target=run, args=(f'用戶{i}', )) p.start()
使用鎖來保證數(shù)據(jù)安全
# data.json文件內(nèi)容為 {"ticket_num": 1} import json import time from multiprocessing import Process, Lock def search(user): with open('data.json', 'r', encoding='utf-8') as f: dic = json.load(f) print(f'用戶{user}查看余票,還剩{dic.get("ticket_num")}...') def buy(user): with open('data.json', 'r', encoding='utf-8') as f: dic = json.load(f) time.sleep(0.2) if dic['ticket_num'] > 0: dic['ticket_num'] -= 1 with open('data.json', 'w', encoding='utf-8') as f: json.dump(dic, f) print(f'用戶{user}搶票成功!') else: print(f'用戶{user}搶票失敗') def run(user, mutex): search(user) mutex.acquire() # 加鎖 buy(user) mutex.release() # 釋放鎖 if __name__ == '__main__': # 調(diào)用Lock()類得到一個(gè)鎖對象 mutex = Lock() for i in range(10): # 模擬10個(gè)用戶搶票 p = Process(target=run, args=(f'用戶{i}', mutex)) p.start()
進(jìn)程互斥鎖:
讓并發(fā)變成串行,犧牲了執(zhí)行效率,保證了數(shù)據(jù)安全
在程序并發(fā)時(shí),需要修改數(shù)據(jù)使用
隊(duì)列
隊(duì)列遵循的是先進(jìn)先出
隊(duì)列:相當(dāng)于內(nèi)存中一個(gè)隊(duì)列空間,可以存放多個(gè)數(shù)據(jù),但數(shù)據(jù)的順序是由先進(jìn)去的排在前面。
q.put() 添加數(shù)據(jù)
q.get() 取數(shù)據(jù),遵循隊(duì)列先進(jìn)先出
q.get_nowait() 獲取隊(duì)列數(shù)據(jù), 隊(duì)列中沒有就會(huì)報(bào)錯(cuò)
q.put_nowait 添加數(shù)據(jù),若隊(duì)列滿了也會(huì)報(bào)錯(cuò)
q.full() 查看隊(duì)列是否滿了
q.empty() 查看隊(duì)列是否為空
from multiprocessing import Queue # 調(diào)用隊(duì)列類,實(shí)例化隊(duì)列對象 q = Queue(5) # 隊(duì)列中存放5個(gè)數(shù)據(jù) # put添加數(shù)據(jù),若隊(duì)列里的數(shù)據(jù)滿了就會(huì)卡住 q.put(1) print('進(jìn)入數(shù)據(jù)1') q.put(2) print('進(jìn)入數(shù)據(jù)2') q.put(3) print('進(jìn)入數(shù)據(jù)3') q.put(4) print('進(jìn)入數(shù)據(jù)4') q.put(5) print('進(jìn)入數(shù)據(jù)5') # 查看隊(duì)列是否滿了 print(q.full()) # 添加數(shù)據(jù), 若隊(duì)列滿了也會(huì)報(bào)錯(cuò) q.put_nowait(6) # q.get() 獲取的數(shù)據(jù)遵循先進(jìn)先出 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # print(q.get()) print(q.get_nowait()) # 獲取隊(duì)列數(shù)據(jù), 隊(duì)列中沒有就會(huì)報(bào)錯(cuò) # 判斷隊(duì)列是否為空 print(q.empty()) q.put(6) print('進(jìn)入數(shù)據(jù)6')
進(jìn)程間通信
IPC(Inter-Process Communication)
進(jìn)程間數(shù)據(jù)是相互隔離的,若想實(shí)現(xiàn)進(jìn)程間通信,可以利用隊(duì)列
from multiprocessing import Process, Queue def task1(q): data = 'hello 你好' q.put(data) print('進(jìn)程1添加數(shù)據(jù)到隊(duì)列') def task2(q): print(q.get()) print('進(jìn)程2從隊(duì)列中獲取數(shù)據(jù)') if __name__ == '__main__': q = Queue() p1 = Process(target=task1, args=(q, )) p2 = Process(target=task2, args=(q, )) p1.start() p2.start() print('主進(jìn)程')
生產(chǎn)者與消費(fèi)者
在程序中,通過隊(duì)列生產(chǎn)者把數(shù)據(jù)添加到隊(duì)列中,消費(fèi)者從隊(duì)列中獲取數(shù)據(jù)
from multiprocessing import Process, Queue import time # 生產(chǎn)者 def producer(name, food, q): for i in range(10): data = food, i msg = f'用戶{name}開始制作{data}' print(msg) q.put(data) time.sleep(0.1) # 消費(fèi)者 def consumer(name, q): while True: data = q.get() if not data: break print(f'用戶{name}開始吃{data}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=('neo', '煎餅', q)) p2 = Process(target=producer, args=('wick', '肉包', q)) c1 = Process(target=consumer, args=('cwz', q)) c2 = Process(target=consumer, args=('woods', q)) p1.start() p2.start() c1.daemon = True c2.daemon = True c1.start() c2.start() print('主')
線程
線程的概念
進(jìn)程與線程都是虛擬單位
進(jìn)程:資源單位
線程:執(zhí)行單位
開啟一個(gè)進(jìn)程,一定會(huì)有一個(gè)線程,線程才是真正執(zhí)行者
開啟進(jìn)程:
開啟線程:
注意:線程不能實(shí)現(xiàn)并行,線程只能實(shí)現(xiàn)并發(fā),進(jìn)程可以實(shí)現(xiàn)并行
線程的兩種創(chuàng)建方式
from threading import Thread import time # 創(chuàng)建線程方式1 def task(): print('線程開啟') time.sleep(1) print('線程結(jié)束') if __name__ == '__main__': t = Thread(target=task) t.start() # 創(chuàng)建線程方式2 class MyThread(Thread): def run(self): print('線程開啟...') time.sleep(1) print('線程結(jié)束...') if __name__ == '__main__': t = MyThread() t.start()
線程對象的方法
from threading import Thread from threading import current_thread import time def task(): print(f'線程開啟{current_thread().name}') time.sleep(1) print(f'線程結(jié)束{current_thread().name}') if __name__ == '__main__': t = Thread(target=task) print(t.isAlive()) # t.daemon = True t.start() print(t.isAlive())
線程互斥鎖
線程之間數(shù)據(jù)是共享的
from threading import Thread from threading import Lock import time mutex = Lock() n = 100 def task(i): print(f'線程{i}啟動(dòng)') global n mutex.acquire() temp = n time.sleep(0.1) n = temp - 1 print(n) mutex.release() if __name__ == '__main__': t_l = [] for i in range(100): t = Thread(target=task, args=(i, )) t_l.append(t) t.start() for t in t_l: t.join() print(n)
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。