您好,登錄后才能下訂單哦!
本文小編為大家詳細(xì)介紹“Python Paramiko上傳下載sftp文件及遠(yuǎn)程執(zhí)行命令是什么”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Python Paramiko上傳下載sftp文件及遠(yuǎn)程執(zhí)行命令是什么”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。
Paramiko模塊是基于Python實(shí)現(xiàn)的SSH遠(yuǎn)程安全連接,用于SSH遠(yuǎn)程執(zhí)行命令、文件傳輸?shù)裙δ堋?/p>
默認(rèn)Python沒(méi)有自帶,需要手動(dòng)安裝:
pip3 install paramiko
#!/usr/bin/env python3 # coding: utf-8 import paramiko def sftp_upload_file(host,user,password,server_path, local_path,timeout=10): """ 上傳文件,注意:不支持文件夾 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param server_path: 遠(yuǎn)程路徑,比如:/home/sdn/tmp.txt :param local_path: 本地路徑,比如:D:/text.txt :param timeout: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: bool """ try: t = paramiko.Transport((host, 22)) t.banner_timeout = timeout t.connect(username=user, password=password) sftp = paramiko.SFTPClient.from_transport(t) sftp.put(local_path, server_path) t.close() return True except Exception as e: print(e) return False
測(cè)試一下上傳,完整代碼如下:
#!/usr/bin/env python3 # coding: utf-8 import paramiko def sftp_upload_file(host, user, password, server_path, local_path, timeout=10): """ 上傳文件,注意:不支持文件夾 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param server_path: 遠(yuǎn)程路徑,比如:/home/sdn/tmp.txt :param local_path: 本地路徑,比如:D:/text.txt :param timeout: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: bool """ try: t = paramiko.Transport((host, 22)) t.banner_timeout = timeout t.connect(username=user, password=password) sftp = paramiko.SFTPClient.from_transport(t) sftp.put(local_path, server_path) t.close() return True except Exception as e: print(e) return False if __name__ == '__main__': host = '192.168.10.1' user = 'xiao' password = 'xiao@1234' server_path = '/tmp/tmp.txt' local_path = 'D:/text.txt' res = sftp_upload_file(host, user, password, server_path, local_path) if not res: print("上傳文件: %s 失敗"%local_path) else: print("上傳文件: %s 成功" % local_path)
執(zhí)行輸出:
上傳文件: D:/text.txt 成功
def sftp_down_file(host,user,password,server_path, local_path,timeout=10): """ 下載文件,注意:不支持文件夾 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param server_path: 遠(yuǎn)程路徑,比如:/home/sdn/tmp.txt :param local_path: 本地路徑,比如:D:/text.txt :param timeout: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: bool """ try: t = paramiko.Transport((host,22)) t.banner_timeout = timeout t.connect(username=user,password=password) sftp = paramiko.SFTPClient.from_transport(t) sftp.get(server_path, local_path) t.close() return True except Exception as e: print(e) return False
測(cè)試一下,下載文件功能,完整代碼如下:
#!/usr/bin/env python3 # coding: utf-8 import paramiko def sftp_down_file(host,user,password,server_path, local_path,timeout=10): """ 下載文件,注意:不支持文件夾 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param server_path: 遠(yuǎn)程路徑,比如:/home/sdn/tmp.txt :param local_path: 本地路徑,比如:D:/text.txt :param timeout: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: bool """ try: t = paramiko.Transport((host,22)) t.banner_timeout = timeout t.connect(username=user,password=password) sftp = paramiko.SFTPClient.from_transport(t) sftp.get(server_path, local_path) t.close() return True except Exception as e: print(e) return False if __name__ == '__main__': host = '192.168.10.1' user = 'xiao' password = 'xiao@1234' server_path = '/tmp/tmp.txt' local_path = 'D:/text.txt' res = sftp_down_file(host, user, password, server_path, local_path) if not res: print("下載文件: %s 失敗"%server_path) else: print("下載文件: %s 成功" % server_path)
執(zhí)行輸出:
下載文件: /tmp/tmp.txt 成功
def ssh_exec_command(host,user,password, cmd,timeout=10): """ 使用ssh連接遠(yuǎn)程服務(wù)器執(zhí)行命令 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param cmd: 執(zhí)行的命令 :param seconds: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: dict """ result = {'status': 1, 'data': None} # 返回結(jié)果 try: ssh = paramiko.SSHClient() # 創(chuàng)建一個(gè)新的SSHClient實(shí)例 ssh.banner_timeout = timeout # 設(shè)置host key,如果在"known_hosts"中沒(méi)有保存相關(guān)的信息, SSHClient 默認(rèn)行為是拒絕連接, 會(huì)提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host, 22, user, password, timeout=timeout) # 連接遠(yuǎn)程服務(wù)器,超時(shí)時(shí)間1秒 stdin, stdout, stderr = ssh.exec_command(cmd,get_pty=True,timeout=timeout) # 執(zhí)行命令 out = stdout.readlines() # 執(zhí)行結(jié)果,readlines會(huì)返回列表 # 執(zhí)行狀態(tài),0表示成功,1表示失敗 channel = stdout.channel status = channel.recv_exit_status() ssh.close() # 關(guān)閉ssh連接 # 修改返回結(jié)果 result['status'] = status result['data'] = out return result except Exception as e: print(e) print("錯(cuò)誤, 登錄服務(wù)器或者執(zhí)行命令超時(shí)!!! ip: {} 命令: {}".format(ip,cmd))return False
測(cè)試一下,遠(yuǎn)程執(zhí)行命令功能,完整代碼如下:
#!/usr/bin/env python3 # coding: utf-8 import paramiko def ssh_exec_command(host,user,password, cmd,timeout=10): """ 使用ssh連接遠(yuǎn)程服務(wù)器執(zhí)行命令 :param host: 主機(jī)名 :param user: 用戶(hù)名 :param password: 密碼 :param cmd: 執(zhí)行的命令 :param seconds: 超時(shí)時(shí)間(默認(rèn)),必須是int類(lèi)型 :return: dict """ result = {'status': 1, 'data': None} # 返回結(jié)果 try: ssh = paramiko.SSHClient() # 創(chuàng)建一個(gè)新的SSHClient實(shí)例 ssh.banner_timeout = timeout # 設(shè)置host key,如果在"known_hosts"中沒(méi)有保存相關(guān)的信息, SSHClient 默認(rèn)行為是拒絕連接, 會(huì)提示yes/no ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host, 22, user, password, timeout=timeout) # 連接遠(yuǎn)程服務(wù)器,超時(shí)時(shí)間1秒 stdin, stdout, stderr = ssh.exec_command(cmd,get_pty=True,timeout=timeout) # 執(zhí)行命令 out = stdout.readlines() # 執(zhí)行結(jié)果,readlines會(huì)返回列表 # 執(zhí)行狀態(tài),0表示成功,1表示失敗 channel = stdout.channel status = channel.recv_exit_status() ssh.close() # 關(guān)閉ssh連接 # 修改返回結(jié)果 result['status'] = status result['data'] = out return result except Exception as e: print(e) print("錯(cuò)誤, 登錄服務(wù)器或者執(zhí)行命令超時(shí)!!! ip: {} 命令: {}".format(ip,cmd)) return False if __name__ == '__main__': host = '192.168.10.1' user = 'xiao' password = 'xiao@1234' cmd = "cat /etc/issue | awk '{print $1,$2,$3}'" res = ssh_exec_command(host, user, password, cmd) # print(res) if not res or not res['data'] or res['status'] != 0: print("錯(cuò)誤, ip: {} 執(zhí)行命令: {} 失敗".format(host, cmd), "red") exit() value = res['data'][0].strip() # 獲取實(shí)際值 print("操作系統(tǒng)為: %s"%value)
執(zhí)行輸出:
操作系統(tǒng)為: Ubuntu 16.04.2 LTS
1. EllipticCurvePublicKey.public_bytes
Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.
paramiko 2.4.2 依賴(lài) cryptography,而最新的cryptography==2.5里有一些棄用的API。
刪掉cryptography,安裝2.4.2,就不會(huì)報(bào)錯(cuò)了。
pip uninstall cryptography pip install cryptography==2.4.2
2. Error reading SSH protocol banner
Traceback (most recent call last):
File "/python3/lib/python3.5/site-packages/paramiko/transport.py", line 1966, in run
self._check_banner()
File "/python3/lib/python3.5/site-packages/paramiko/transport.py", line 2143, in _check_banner
"Error reading SSH protocol banner" + str(e)
paramiko.ssh_exception.SSHException: Error reading SSH protocol banner
Error reading SSH protocol banner
要解決這個(gè)問(wèn)題, 需要將paramiko的響應(yīng)等待時(shí)間調(diào)長(zhǎng)。 修改paramiko/transport.py文件中的
self.banner_timeout
值, 將其設(shè)為300或者其他較長(zhǎng)的值即可解決這個(gè)問(wèn)題。
讀到這里,這篇“Python Paramiko上傳下載sftp文件及遠(yuǎn)程執(zhí)行命令是什么”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。