您好,登錄后才能下訂單哦!
小編給大家分享一下分布式TensorFlow運(yùn)行環(huán)境的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
1.分布式TensorFlow的角色與原理
在分布式的TensorFlow中的角色分配如下:
PS:作為分布式訓(xùn)練的服務(wù)端,等待各個(gè)終端(supervisors)來連接。
worker:在TensorFlow的代碼注釋中被稱為終端(supervisors),作為分布式訓(xùn)練的計(jì)算資源終端。
chief supervisors:在眾多的運(yùn)算終端中必須選擇一個(gè)作為主要的運(yùn)算終端。該終端在運(yùn)算終端中最先啟動(dòng),它的功能是合并各個(gè)終端運(yùn)算后的學(xué)習(xí)參數(shù),將其保存或者載入。
每個(gè)具體的網(wǎng)絡(luò)標(biāo)識(shí)都是唯一的,即分布在不同IP的機(jī)器上(或者同一個(gè)機(jī)器的不同端口)。在實(shí)際的運(yùn)行中,各個(gè)角色的網(wǎng)絡(luò)構(gòu)建部分代碼必須100%的相同。三者的分工如下:
服務(wù)端作為一個(gè)多方協(xié)調(diào)者,等待各個(gè)運(yùn)算終端來連接。
chief supervisors會(huì)在啟動(dòng)時(shí)同一管理全局的學(xué)習(xí)參數(shù),進(jìn)行初始化或者從模型載入。
其他的運(yùn)算終端只是負(fù)責(zé)得到其對(duì)應(yīng)的任務(wù)并進(jìn)行計(jì)算,并不會(huì)保存檢查點(diǎn),用于TensorBoard可視化中的summary日志等任何參數(shù)信息。
在整個(gè)過程都是通過RPC協(xié)議來進(jìn)行通信的。
2.分布部署TensorFlow的具體方法
配置過程中,首先建立一個(gè)server,在server中會(huì)將ps及所有worker的IP端口準(zhǔn)備好。接著,使用tf.train.Supervisor中的managed_ssion來管理一個(gè)打開的session。session中只是負(fù)責(zé)運(yùn)算,而通信協(xié)調(diào)的事情就都交給supervisor來管理了。
3.部署訓(xùn)練實(shí)例
下面開始實(shí)現(xiàn)一個(gè)分布式訓(xùn)練的網(wǎng)絡(luò)模型,以線性回歸為例,通過3個(gè)端口來建立3個(gè)終端,分別是一個(gè)ps,兩個(gè)worker,實(shí)現(xiàn)TensorFlow的分布式運(yùn)算。
1. 為每個(gè)角色添加IP地址和端口,創(chuàng)建sever,在一臺(tái)機(jī)器上開3個(gè)不同的端口,分別代表PS,chief supervisor和worker。角色的名稱用strjob_name表示,以ps為例,代碼如下:
# 定義IP和端口 strps_hosts = 'localhost:1681' strworker_hosts = 'localhost:1682,localhost:1683' # 定義角色名稱 strjob_name = 'ps' task_index = 0 # 將字符串轉(zhuǎn)數(shù)組 ps_hosts = strps_hosts.split(',') worker_hosts = strps_hosts.split(',') cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts}) # 創(chuàng)建server server = tf.train.Server({'ps':ps_hosts, 'worker':worker_hosts}, job_name=strjob_name, task_index=task_index)
2為ps角色添加等待函數(shù)
ps角色使用server.join函數(shù)進(jìn)行線程掛起,開始接受連續(xù)消息。
# ps角色使用join進(jìn)行等待 if strjob_name == 'ps': print("wait") server.join()
3.創(chuàng)建網(wǎng)絡(luò)的結(jié)構(gòu)
與正常的程序不同,在創(chuàng)建網(wǎng)絡(luò)結(jié)構(gòu)時(shí),使用tf.device函數(shù)將全部的節(jié)點(diǎn)都放在當(dāng)前任務(wù)下。在tf.device函數(shù)中的任務(wù)是通過tf.train.replica_device_setter來指定的。在tf.train.replica_device_setter中使用worker_device來定義具體任務(wù)名稱;使用cluster的配置來指定角色及對(duì)應(yīng)的IP地址,從而實(shí)現(xiàn)管理整個(gè)任務(wù)下的圖節(jié)點(diǎn)。代碼如下:
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d'%task_index, cluster=cluster_spec)): X = tf.placeholder('float') Y = tf.placeholder('float') # 模型參數(shù) w = tf.Variable(tf.random_normal([1]), name='weight') b = tf.Variable(tf.zeros([1]), name='bias') global_step = tf.train.get_or_create_global_step() # 獲取迭代次數(shù) z = tf.multiply(X, w) + b tf.summary('z', z) cost = tf.reduce_mean(tf.square(Y - z)) tf.summary.scalar('loss_function', cost) learning_rate = 0.001 optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost, global_step=global_step) saver = tf.train.Saver(max_to_keep=1) merged_summary_op = tf.summary.merge_all() # 合并所有summary init = tf.global_variables_initializer()
4.創(chuàng)建Supercisor,管理session
在tf.train.Supervisor函數(shù)中,is_chief表明為是否為chief Supervisor角色,這里將task_index=0的worker設(shè)置成chief Supervisor。saver需要將保存檢查點(diǎn)的saver對(duì)象傳入。init_op表示使用初始化變量的函數(shù)。
training_epochs = 2000 display_step = 2 sv = tf.train.Supervisor(is_chief=(task_index == 0),# 0號(hào)為chief logdir='log/spuer/', init_op=init, summary_op=None, saver=saver, global_step=global_step, save_model_secs=5) # 連接目標(biāo)角色創(chuàng)建session with sv.managed_session(saver.target) as sess:
5迭代訓(xùn)練
session中的內(nèi)容與以前一樣,直接迭代訓(xùn)練即可。由于使用了supervisor管理session,將使用sv.summary_computed函數(shù)來保存summary文件。
print('sess ok') print(global_step.eval(session=sess)) for epoch in range(global_step.eval(session=sess), training_epochs*len(train_x)): for (x, y) in zip(train_x, train_y): _, epoch = sess.run([optimizer, global_step], feed_dict={X: x, Y: y}) summary_str = sess.run(merged_summary_op, feed_dict={X: x, Y: y}) sv.summary_computed(sess, summary_str, global_step=epoch) if epoch % display_step == 0: loss = sess.run(cost, feed_dict={X:train_x, Y:train_y}) print("Epoch:", epoch+1, 'loss:', loss, 'W=', sess.run(w), w, 'b=', sess.run(b)) print(' finished ') sv.saver.save(sess, 'log/linear/' + "sv.cpk", global_step=epoch) sv.stop()
(1)在設(shè)置自動(dòng)保存檢查點(diǎn)文件后,手動(dòng)保存仍然有效,
(2)在運(yùn)行一半后,在運(yùn)行supervisor時(shí)會(huì)自動(dòng)載入模型的參數(shù),不需要手動(dòng)調(diào)用restore。
(3)在session中不需要進(jìn)行初始化的操作。
6.建立worker文件
新建兩個(gè)py文件,設(shè)置task_index分別為0和1,其他的部分和上述的代碼相一致。
strjob_name = 'worker' task_index = 1 strjob_name = 'worker' task_index = 0
7.運(yùn)行
我們分別啟動(dòng)寫好的三個(gè)文件,在運(yùn)行結(jié)果中,我們可以看到循環(huán)的次數(shù)不是連續(xù)的,顯示結(jié)果中會(huì)有警告,這是因?yàn)樵跇?gòu)建supervisor時(shí)沒有填寫local_init_op參數(shù),該參數(shù)的含義是在創(chuàng)建worker實(shí)例時(shí),初始化本地變量,上述代碼中沒有設(shè)置,系統(tǒng)會(huì)自動(dòng)初始化,并給出警告提示。
分布運(yùn)算的目的是為了提高整體運(yùn)算速度,如果同步epoch的準(zhǔn)確率需要犧牲總體運(yùn)行速度為代價(jià),自然很不合適。
在ps的文件中,它只是負(fù)責(zé)連接,并不參與運(yùn)算。
以上是“分布式TensorFlow運(yùn)行環(huán)境的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。