溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python學(xué)習(xí)之進(jìn)程與線程

發(fā)布時間:2020-07-31 18:17:30 來源:網(wǎng)絡(luò) 閱讀:173 作者:霍金181 欄目:編程語言

進(jìn)程的創(chuàng)建
進(jìn)程 VS 程序
編寫完畢的代碼,在沒有運行的時候,稱之為程序
正在運行著的代碼,就成為進(jìn)程
注意: 進(jìn)程,除了包含代碼以外,還有需要運行的環(huán)境等,所以和程序是有區(qū)別的
進(jìn)程的創(chuàng)建
創(chuàng)建子進(jìn)程:
python學(xué)習(xí)之進(jìn)程與線程
Python的os模塊封裝了常?的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程 序中
輕松創(chuàng)建子進(jìn)程:

01創(chuàng)建子進(jìn)程

import os
import time
#定義一個全局變量money
money = 100
print("當(dāng)前進(jìn)程的pid:", os.getpid())
print("當(dāng)前進(jìn)程的父進(jìn)程pid:", os.getppid())
#time.sleep(115)

p = os.fork()
#子進(jìn)程返回的是0
if p == 0:
    money = 200
    print("子進(jìn)程返回的信息, money=%d" %(money))
#父進(jìn)程返回的是子進(jìn)程的pid
else:
    print("創(chuàng)建子進(jìn)程%s, 父進(jìn)程是%d" %(p,  os.getppid()))
    print(money)

02多進(jìn)程編程方法一實例化對象

Process([group [, target [, name [, args [, kwargs]]]]])
    target:表示這個進(jìn)程實例所調(diào)?對象;
    args:表示調(diào)?對象的位置參數(shù)元組;
    kwargs:表示調(diào)?對象的關(guān)鍵字參數(shù)字典;
    name:為當(dāng)前進(jìn)程實例的別名;
    group:?多數(shù)情況下?不到;

Process類常??法:
    is_alive(): 判斷進(jìn)程實例是否還在執(zhí)?;
    join([timeout]):    是否等待進(jìn)程實例執(zhí)?結(jié)束,或等待多少秒;
    start():        啟動進(jìn)程實例(創(chuàng)建?進(jìn)程);
    run():      如果沒有給定target參數(shù),對這個對象調(diào)?start()?法時,
                    就將執(zhí) ?對象中的run()?法;
    terminate():    不管任務(wù)是否完成,?即終?;

from multiprocessing import Process
import time

def task1():
    print("正在聽音樂")
    time.sleep(1)

def task2():
    print("正在編程......")
    time.sleep(0.5)

def no_multi():
    task1()
    task2()

def use_multi():
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    #p.join()  阻塞當(dāng)前進(jìn)程, 當(dāng)p1.start()之后, p1就提示主進(jìn)程, 需要等待p1進(jìn)程執(zhí)行結(jié)束才能向下執(zhí)行, 那么主進(jìn)程就乖乖等著, 自然不會執(zhí)行p2.start()
    #[process.join() for process in processes]

if __name__ == '__main__':
    # 主進(jìn)程
    start_time= time.time()
    #no_multi()
    use_multi()
    end_time = time.time()
    print(end_time-start_time
    進(jìn)程池

為什么需要進(jìn)程池Pool?
? 當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進(jìn)程,
十幾個還好,但如果是上百個,上千個目標(biāo),手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時
可以發(fā)揮進(jìn)程池的功效。
? Pool可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,
那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,
那么該請求就會等待,直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來它。
python學(xué)習(xí)之進(jìn)程與線程

03多進(jìn)程判斷素數(shù)與進(jìn)程池:

def is_prime(num):
    """判斷素數(shù)"""
    if num == 1:
        return False

    for i in range(2, num):
        if num % i == 0:
            return False
    else:
        return True

def task(num):
    if is_prime(num):
        print("%d是素數(shù)" % (num))

from multiprocessing import Process

#判斷1000-1200之間所有的素數(shù)
def use_mutli():
    ps = []
    #不要開啟太多進(jìn)程, 創(chuàng)建子進(jìn)程會耗費時間和空間(內(nèi)存);
    for num in range(1, 10000):
        # 實例化子進(jìn)程對象
        p = Process(target=task, args=(num,))
        #開啟子進(jìn)程
        p.start()
        #存儲所有的子進(jìn)程對象
        ps.append(p)

    #阻塞子進(jìn)程, 等待所有的子進(jìn)程執(zhí)行結(jié)束, 再執(zhí)行主進(jìn)程;
    [p.join() for p in ps]

#判斷1000-1200之間所有的素數(shù)
def no_mutli():
    for num in range(1, 100000):
        task(num)

def use_pool():
    """使用進(jìn)程池"""
    from multiprocessing import Pool
    from multiprocessing import  cpu_count  # 4個
    p = Pool(cpu_count())
    p.map(task, list(range(1, 100000)))
    p.close()  # 關(guān)閉進(jìn)程池
    p.join()  # 阻塞, 等待所有的子進(jìn)程執(zhí)行結(jié)束, 再執(zhí)行主進(jìn)程;

if __name__ == '__main__':
    import time

    start_time = time.time()
    #數(shù)據(jù)量大小          # 1000-1200             1-10000                 # 1-100000
    #no_mutli()        # 0.0077722072601       # 1.7887046337127686      # 90.75180315971375
     use_mutli()       # 1.806459665298462
    use_pool()          # 0.15455389022827148   # 1.2682361602783203      # 35.63375639915466
    end_time = time.time()
    print(end_time - start_time)

進(jìn)程間通信
How?
python學(xué)習(xí)之進(jìn)程與線程
可以使用multiprocessing模塊的Queue實現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,Queue 本身是
一個消息列隊程序。
Queue.qsize(): 返回當(dāng)前隊列包含的消息數(shù)量;
Queue.empty(): 如果隊列為空,返回True,反之False ;
Queue.full():
如果隊列滿了,返回True,反之False;
Queue.get([block[, timeout]]):
獲取隊列中的一條消息,然后將其從列隊中移除,block默認(rèn)值為True;
Queue.get_nowait():
相當(dāng)Queue.get(False);
Queue.put(item,[block[, timeout]]):
將item消息寫入隊列,block默認(rèn)值 為True;
Queue.put_nowait(item):
相當(dāng)Queue.put(item, False)
多線程編程:
線程(英語:thread)是操作系統(tǒng)能夠進(jìn)行運算調(diào)度的最小單位。它被包含在
進(jìn)程之中,是進(jìn)程中的實際運作單位。
python學(xué)習(xí)之進(jìn)程與線程
每個進(jìn)程至少有一個線程,即進(jìn)程本身。進(jìn)程可以啟動多個線程。操作系統(tǒng)像并
行“進(jìn)程”一樣執(zhí)行這些線程
python學(xué)習(xí)之進(jìn)程與線程
線程和進(jìn)程各自有什么區(qū)別和優(yōu)劣呢?
? 進(jìn)程是資源分配的最小單位,線程是程序執(zhí)行的最小單位。
? 進(jìn)程有自己的獨立地址空間。線程是共享進(jìn)程中的數(shù)據(jù)的,使用相同的地址空間.
? 進(jìn)程之間的通信需要以通信的方式(IPC)進(jìn)行。線程之間的通信更方便,同一進(jìn)程下的線程
共享全局變量、靜態(tài)變量等數(shù)據(jù),難點:處理好同步與互斥。
線程分類
有兩種不同的線程:
? 內(nèi)核線程
? 用戶空間線程或用戶線程
內(nèi)核線程是操作系統(tǒng)的一部分,而內(nèi)核中沒有實現(xiàn)用戶空間線程。
方法一分析
? 多線程程序的執(zhí)行順序是不確定的。
? 當(dāng)執(zhí)行到sleep語句時,線程將被阻塞(Blocked),到sleep結(jié)束后,線程進(jìn)入就緒
(Runnable)狀態(tài),等待調(diào)度。而線程調(diào)度將自行選擇一個線程執(zhí)行。
? 代碼中只能保證每個線程都運行完整個run函數(shù),但是線程的啟動順序、 run函數(shù)中
每次循環(huán)的執(zhí)行順序都不能確定。
python學(xué)習(xí)之進(jìn)程與線程

04 多進(jìn)程編程方法2:

"""
創(chuàng)建子類, 繼承的方式
"""
from multiprocessing import  Process
import time
class MyProcess(Process):
    """
    創(chuàng)建自己的進(jìn)程, 父類是Process
    """
    def __init__(self, music_name):
        super(MyProcess, self).__init__()
        self.music_name = music_name

    def run(self):
        """重寫run方法, 內(nèi)容是你要執(zhí)行的任務(wù)"""

        print("聽音樂%s" %(self.music_name))
        time.sleep(1)

#開啟進(jìn)程: p.start()  ====== p.run()
if __name__ == '__main__':
    for i in range(10):
        p = MyProcess("音樂%d" %(i))
        p.start()

05多線程實現(xiàn)的方法1

 """
通過實例化對象的方式實現(xiàn)多線程
"""
import time
import threading
def task():
    """當(dāng)前要執(zhí)行的任務(wù)"""
    print("聽音樂........")
    time.sleep(1)

if __name__ == '__main__':
    start_time = time.time()
    threads = []
    for  count in range(5):
        t = threading.Thread(target=task)
        #讓線程開始執(zhí)行任務(wù)
        t.start()
        threads.append(t)

    #等待所有的子線程執(zhí)行結(jié)束, 再執(zhí)行主線程;
    [thread.join() for thread in threads]
    end_time = time.time()
    print(end_time-start_time)

項目案例: 基于多線程的批量主機存活探測
項目描述: 如果要在本地網(wǎng)絡(luò)中確定哪些地址處于活動狀態(tài)或哪些計算機處于活動狀態(tài),
則可以使用此腳本。我們將依次ping地址, 每次都要等幾秒鐘才能返回值。這可以在Python
中編程,在IP地址的地址范圍內(nèi)有一個for循環(huán)和一個os.popen(“ping -q -c2”+ ip)。
項目瓶頸: 沒有線程的解決方案效率非常低,因為腳本必須等待每次ping。
優(yōu)點: 在一個進(jìn)程內(nèi)的所有線程共享全局變量,能夠在不使用其他方式的前提 下完成多線
程之間的數(shù)據(jù)共享(這點要比多進(jìn)程要好)
缺點: 線程是對全局變量隨意遂改可能造成多線程之間對全局變量 的混亂(即線程非安全)

07 ip地址歸屬地批量查詢?nèi)蝿?wù):

import requests
import json
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from threading import Thread

def task(ip):
    """獲取指定IP的所在城市和國家并存儲到數(shù)據(jù)庫中"""
    #獲取網(wǎng)址的返回內(nèi)容
    url = 'http://ip-api.com/json/%s' % (ip)
    try:
        response = requests.get(url)
    except Exception as e:
        print("網(wǎng)頁獲取錯誤:", e)
    else:
        # 默認(rèn)返回的是字符串
        """
        {"as":"AS174 Cogent Communications","city":"Beijing","country":"China","countryCode":"CN","isp":"China Unicom Shandong Province network","lat":39.9042,"lon":116.407,"org":"NanJing XinFeng Information Technologies, Inc.","query":"114.114.114.114","region":"BJ","regionName":"Beijing","status":"success","timezone":"Asia/Shanghai","zip":""}
        """
        contentPage = response.text
        #將頁面的json字符串轉(zhuǎn)換成便于處理的字典;
        data_dict = json.loads(contentPage)
        #獲取對應(yīng)的城市和國家
        city = data_dict.get('city', 'null')  # None
        country = data_dict.get('country', 'null')

        print(ip, city, country)
        #存儲到數(shù)據(jù)庫表中ips
        ipObj = IP(ip=ip, city=city, country=country)
        session.add(ipObj)
        session.commit()

if __name__ == '__main__':
    engine = create_engine("mysql+pymysql://root:westos@172.25.254.123/pymysql",
                           encoding='utf8',
                           # echo=True
                           )
    #創(chuàng)建緩存對象
    Session = sessionmaker(bind=engine)
    session = Session()

    #聲明基類
    Base = declarative_base()

    class IP(Base):
        __tablename__ = 'ips'
        id = Column(Integer, primary_key=True, autoincrement=True)
        ip = Column(String(20), nullable=False)
        city = Column(String(30))
        country = Column(String(30))

        def __repr__(self):
            return self.ip

    #創(chuàng)建數(shù)據(jù)表
    Base.metadata.create_all(engine)

    #1.1.1.1 -- 1.1.1.10
    threads = []
    for item in range(10):
        ip = '1.1.1.' + str(item + 1)  # 1.1.1.1 -1.1.1.10
        #task(ip)
        #多線程執(zhí)行任務(wù)
        thread = Thread(target=task, args=(ip,))
        #啟動線程并執(zhí)行任務(wù)
        thread.start()
        #存儲創(chuàng)建的所有線程對象;
        threads.append(thread)

    [thread.join() for thread in threads]
    print("任務(wù)執(zhí)行結(jié)束.........")
    print(session.query(IP).all())

07 多線程實現(xiàn)方法二:

"""
創(chuàng)建子類
"""

from threading import  Thread
class GetHostAliveThread(Thread):
    """
    創(chuàng)建子線程, 執(zhí)行的任務(wù):判斷指定的IP是否存活
    """
    def __init__(self, ip):
        super(GetHostAliveThread, self).__init__()
        self.ip = ip
    def run(self):
        # # 重寫run方法: 判斷指定的IP是否存活
        #"""
        #>>> # os.system()  返回值如果為0, 代表命令正確執(zhí)行,沒有報錯; 如果不為0, 執(zhí)行報錯;
        #...
        #>>> os.system('ping -c1 -w1 172.25.254.49 &> /dev/null')
        #0
        #>>> os.system('ping -c1 -w1 172.25.254.1 &> /dev/null')
        #256
        #"""
        import os
        #需要執(zhí)行的shell命令
        cmd = 'ping -c1 -w1 %s &> /dev/null' %(self.ip)
        result = os.system(cmd)
        #返回值如果為0, 代表命令正確執(zhí)行,沒有報錯; 如果不為0, 執(zhí)行報錯;
        if result != 0:
            print("%s主機沒有ping通" %(self.ip))
if __name__ == '__main__':
    print("打印172.25.254.0網(wǎng)段沒有使用的IP地址".center(50, '*'))
    for i in range(1, 255):
        ip = '172.25.254.' + str(i)
        thread = GetHostAliveThread(ip)
        thread.start()

共享全局變量:如何解決線程不安全問題?
GIL(global interpreter lock): python解釋器中任意時刻都只有一個線程在執(zhí)行;
Python代碼的執(zhí)行由Python 虛擬機(也叫解釋器主循環(huán),CPython版本)來控
制,Python 在設(shè)計之初就考慮到要在解釋器的主循環(huán)中,同時只有一個線程
在執(zhí)行,即在任意時刻,只有一個線程在解釋器中運行。對Python 虛擬機的
訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個
線程在運行。
python學(xué)習(xí)之進(jìn)程與線程
python學(xué)習(xí)之進(jìn)程與線程

08 共享數(shù)據(jù):

money = 0

def add():
    for i in range(1000000):
        global money
        lock.acquire()
        money += 1
        lock.release()
def reduce():
    for i in range(1000000):
        global money
        lock.acquire()
        money -= 1
        lock.release()
if __name__ == '__main__':
          from threading import  Thread, Lock
          # 創(chuàng)建線程鎖
          lock = Lock()
          t1 = Thread(target=add)
          t2 = Thread(target=reduce)
          t1.start()
          t2.start()
          t1.join()
          t2.join()
          print(money)

線程同步
線程同步:即當(dāng)有一個線程在對內(nèi)存進(jìn)行操作時,其他線程都不可以對這個內(nèi)
存地址進(jìn)行操作,直到該線程完成操作, 其他線程才能對該內(nèi)存地址進(jìn)行操作.
同步就是協(xié)同步調(diào),按預(yù)定的先后次序進(jìn)行運行。如:你說完,我再說。
"同"字從字面上容易理解為一起動作 其實不是,
"同"字應(yīng)是指協(xié)同、協(xié)助、互相配合。
死鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時 等待對方的資
源,就會造成死鎖。
python學(xué)習(xí)之進(jìn)程與線程

09 死鎖問題:

"""

在線程間共享多個資源的時候,如果兩個線程分別占有?部分資源并且同時 等待對?的資源,就會造成死鎖。

"""

import time
import threading

class Account(object):
    def __init__(self, id, money, lock):
        self.id = id
        self.money = money
        self.lock = lock

    def reduce(self, money):
        self.money -= money

    def add(self, money):
        self.money += money

def transfer(_from, to, money):
    if _from.lock.acquire():
        _from.reduce(money)

        time.sleep(1)
        if to.lock.acquire():
            to.add(money)
            to.lock.release()
        _from.lock.release()

if __name__ == '__main__':
    a = Account('a', 1000, threading.Lock())  # 900
    b = Account('b', 1000, threading.Lock())  # 1100

    t1 = threading.Thread(target=transfer, args=(a, b, 200))
    t2 = threading.Thread(target=transfer, args=(b, a, 100))
    t1.start()
    t2.start()
    print(a.money)
    print(b.money)

協(xié)程,又稱微線程,纖程。英文名Coroutine。協(xié)程看上去也是子程序,但執(zhí)行過程中,
在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。
python學(xué)習(xí)之進(jìn)程與線程
協(xié)程優(yōu)勢
? 執(zhí)行效率極高,因為子程序切換(函數(shù))不是線程切換,由程序自身控制,
? 沒有切換線程的開銷。所以與多線程相比,線程的數(shù)量越多,協(xié)程性能的優(yōu)
勢越明顯。
?
不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在控
制共享資源時也不需要加鎖,因此執(zhí)行效率高很多
gevent實現(xiàn)協(xié)程
基本思想:
當(dāng)一個greenlet遇到IO操作時,比如訪問網(wǎng)絡(luò),就自動切換到其他的greenlet,等到
IO操作完成,再在適當(dāng)?shù)臅r候切換回來繼續(xù)執(zhí)行。由于IO操作非常耗時,經(jīng)常使程
序處于等待狀態(tài),有了gevent為我們自動切換協(xié)程,就保證總有g(shù)reenlet在運行,而
不是等待IO。

10 gevent實現(xiàn)協(xié)程

import gevent

import requests
import json

from gevent import monkey
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from threading import Thread

from gevent import monkey
#打補丁
monkey.patch_all()

def task(ip):
    """獲取指定IP的所在城市和國家并存儲到數(shù)據(jù)庫中"""
    #獲取網(wǎng)址的返回內(nèi)容
    url = 'http://ip-api.com/json/%s' % (ip)
    try:
        response = requests.get(url)
    except Exception as e:
        print("網(wǎng)頁獲取錯誤:", e)
    else:
        # 默認(rèn)返回的是字符串
        """
        {"as":"AS174 Cogent Communications","city":"Beijing","country":"China","countryCode":"CN","isp":"China Unicom Shandong Province network","lat":39.9042,"lon":116.407,"org":"NanJing XinFeng Information Technologies, Inc.","query":"114.114.114.114","region":"BJ","regionName":"Beijing","status":"success","timezone":"Asia/Shanghai","zip":""}
        """
        contentPage = response.text
        #將頁面的json字符串轉(zhuǎn)換成便于處理的字典;
        data_dict = json.loads(contentPage)
        #獲取對應(yīng)的城市和國家
        city = data_dict.get('city', 'null')  # None
        country = data_dict.get('country', 'null')

        print(ip, city, country)
        #存儲到數(shù)據(jù)庫表中ips
        ipObj = IP(ip=ip, city=city, country=country)
        session.add(ipObj)
        session.commit()

if __name__ == '__main__':
    engine = create_engine("mysql+pymysql://root:westos@172.25.254.123/pymysql",
                           encoding='utf8',
                           # echo=True
                           )
    #創(chuàng)建緩存對象
    Session = sessionmaker(bind=engine)
    session = Session()

    #聲明基類
    Base = declarative_base()

    class IP(Base):
        __tablename__ = 'ips'
        id = Column(Integer, primary_key=True, autoincrement=True)
        ip = Column(String(20), nullable=False)
        city = Column(String(30))
        country = Column(String(30))

        def __repr__(self):
            return self.ip

    #創(chuàng)建數(shù)據(jù)表
    Base.metadata.create_all(engine)

    #使用協(xié)程
    gevents = [gevent.spawn(task, '1.1.1.' + str(ip + 1)) for ip in range(10)]
    gevent.joinall(gevents)
    print("執(zhí)行結(jié)束....")

總結(jié)
python學(xué)習(xí)之進(jìn)程與線程

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI