您好,登錄后才能下訂單哦!
threading模塊
局域網(wǎng)IP掃描實(shí)例
# 單線(xiàn)程:
import subprocess,time,threading a = time.clock() with open("check_ping.txt",'w') as f: for i in range(1,20): my_ip = ".".join(["192.163.37",str(i)]) try: subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True) except subprocess.CalledProcessError: pass else: f.write("%s 可以ping通\n"%my_ip) b = time.clock() print(b) # 總共花費(fèi)8s多
# 多線(xiàn)程(一)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)函數(shù)
a = time.clock() def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_1.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=check_ping,args=(my_ip,f)) #Thread方法:實(shí)例化一個(gè)線(xiàn)程對(duì)象,把函數(shù)(target)和參數(shù)(args)傳進(jìn)去,然后返回Thread實(shí)例,這里并沒(méi)有執(zhí)行。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() #執(zhí)行線(xiàn)程的start方法,線(xiàn)程開(kāi)始執(zhí)行 for i in num: threads[i].join() #這行線(xiàn)程的join方法,等待線(xiàn)程結(jié)束,如果主進(jìn)程不需要等待線(xiàn)程結(jié)束,可以不需要調(diào)用join方法。 if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費(fèi)0.9s
# 多線(xiàn)程(二)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類(lèi)實(shí)例
a = time.clock() class Thread_func: def __init__(self,func,args): self.func = func self.args = args def __call__(self): #__call__方法:將類(lèi)模擬成函數(shù),實(shí)例化后的類(lèi)再次實(shí)例化相當(dāng)于執(zhí)行了__call__方法。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_2.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=Thread_func(check_ping,(my_ip,f))) #Thread_func實(shí)例化時(shí)已經(jīng)傳入了參數(shù),所以Thread方法中就不用args來(lái)傳參了。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費(fèi)1
# 多線(xiàn)程(三)派生 Thread 的子類(lèi),并創(chuàng)建子類(lèi)的實(shí)例
a = time.clock() class Thread_func(threading.Thread): #繼承Thread,調(diào)用更靈活。 def __init__(self,func,args): threading.Thread.__init__(self) self.func = func self.args = args def run(self): #這里必須使用run方法,當(dāng)線(xiàn)程開(kāi)啟后執(zhí)行。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_3.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = Thread_func(check_ping,(my_ip,f)) threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 花費(fèi)0.7
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。