溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

分布式TensorFlow運(yùn)行環(huán)境的示例分析

發(fā)布時(shí)間:2021-08-23 10:30:50 來源:億速云 閱讀:167 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下分布式TensorFlow運(yùn)行環(huán)境的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

1.分布式TensorFlow的角色與原理

在分布式的TensorFlow中的角色分配如下:

PS:作為分布式訓(xùn)練的服務(wù)端,等待各個(gè)終端(supervisors)來連接。

worker:在TensorFlow的代碼注釋中被稱為終端(supervisors),作為分布式訓(xùn)練的計(jì)算資源終端。

chief supervisors:在眾多的運(yùn)算終端中必須選擇一個(gè)作為主要的運(yùn)算終端。該終端在運(yùn)算終端中最先啟動(dòng),它的功能是合并各個(gè)終端運(yùn)算后的學(xué)習(xí)參數(shù),將其保存或者載入。

每個(gè)具體的網(wǎng)絡(luò)標(biāo)識(shí)都是唯一的,即分布在不同IP的機(jī)器上(或者同一個(gè)機(jī)器的不同端口)。在實(shí)際的運(yùn)行中,各個(gè)角色的網(wǎng)絡(luò)構(gòu)建部分代碼必須100%的相同。三者的分工如下:

服務(wù)端作為一個(gè)多方協(xié)調(diào)者,等待各個(gè)運(yùn)算終端來連接。

chief supervisors會(huì)在啟動(dòng)時(shí)同一管理全局的學(xué)習(xí)參數(shù),進(jìn)行初始化或者從模型載入。

其他的運(yùn)算終端只是負(fù)責(zé)得到其對(duì)應(yīng)的任務(wù)并進(jìn)行計(jì)算,并不會(huì)保存檢查點(diǎn),用于TensorBoard可視化中的summary日志等任何參數(shù)信息。

在整個(gè)過程都是通過RPC協(xié)議來進(jìn)行通信的。

2.分布部署TensorFlow的具體方法

配置過程中,首先建立一個(gè)server,在server中會(huì)將ps及所有worker的IP端口準(zhǔn)備好。接著,使用tf.train.Supervisor中的managed_ssion來管理一個(gè)打開的session。session中只是負(fù)責(zé)運(yùn)算,而通信協(xié)調(diào)的事情就都交給supervisor來管理了。

3.部署訓(xùn)練實(shí)例

下面開始實(shí)現(xiàn)一個(gè)分布式訓(xùn)練的網(wǎng)絡(luò)模型,以線性回歸為例,通過3個(gè)端口來建立3個(gè)終端,分別是一個(gè)ps,兩個(gè)worker,實(shí)現(xiàn)TensorFlow的分布式運(yùn)算。

1. 為每個(gè)角色添加IP地址和端口,創(chuàng)建sever,在一臺(tái)機(jī)器上開3個(gè)不同的端口,分別代表PS,chief supervisor和worker。角色的名稱用strjob_name表示,以ps為例,代碼如下:

# 定義IP和端口
strps_hosts = 'localhost:1681'
strworker_hosts = 'localhost:1682,localhost:1683'
# 定義角色名稱
strjob_name = 'ps'
task_index = 0
# 將字符串轉(zhuǎn)數(shù)組
ps_hosts = strps_hosts.split(',')
worker_hosts = strps_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})
# 創(chuàng)建server
server = tf.train.Server({'ps':ps_hosts, 'worker':worker_hosts}, job_name=strjob_name, task_index=task_index)

2為ps角色添加等待函數(shù)

ps角色使用server.join函數(shù)進(jìn)行線程掛起,開始接受連續(xù)消息。

# ps角色使用join進(jìn)行等待
if strjob_name == 'ps':
  print("wait")
  server.join()

3.創(chuàng)建網(wǎng)絡(luò)的結(jié)構(gòu)

與正常的程序不同,在創(chuàng)建網(wǎng)絡(luò)結(jié)構(gòu)時(shí),使用tf.device函數(shù)將全部的節(jié)點(diǎn)都放在當(dāng)前任務(wù)下。在tf.device函數(shù)中的任務(wù)是通過tf.train.replica_device_setter來指定的。在tf.train.replica_device_setter中使用worker_device來定義具體任務(wù)名稱;使用cluster的配置來指定角色及對(duì)應(yīng)的IP地址,從而實(shí)現(xiàn)管理整個(gè)任務(wù)下的圖節(jié)點(diǎn)。代碼如下:

with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d'%task_index,
                       cluster=cluster_spec)):
  X = tf.placeholder('float')
  Y = tf.placeholder('float')
  # 模型參數(shù)
  w = tf.Variable(tf.random_normal([1]), name='weight')
  b = tf.Variable(tf.zeros([1]), name='bias')
  global_step = tf.train.get_or_create_global_step()  # 獲取迭代次數(shù)
  z = tf.multiply(X, w) + b
  tf.summary('z', z)
  cost = tf.reduce_mean(tf.square(Y - z))
  tf.summary.scalar('loss_function', cost)
  learning_rate = 0.001
  optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost, global_step=global_step)
  saver = tf.train.Saver(max_to_keep=1)
  merged_summary_op = tf.summary.merge_all() # 合并所有summary
  init = tf.global_variables_initializer()

4.創(chuàng)建Supercisor,管理session

在tf.train.Supervisor函數(shù)中,is_chief表明為是否為chief Supervisor角色,這里將task_index=0的worker設(shè)置成chief Supervisor。saver需要將保存檢查點(diǎn)的saver對(duì)象傳入。init_op表示使用初始化變量的函數(shù)。

training_epochs = 2000
display_step = 2
sv = tf.train.Supervisor(is_chief=(task_index == 0),# 0號(hào)為chief
             logdir='log/spuer/',
             init_op=init,
             summary_op=None,
             saver=saver,
             global_step=global_step,
             save_model_secs=5)
# 連接目標(biāo)角色創(chuàng)建session
with sv.managed_session(saver.target) as sess:

5迭代訓(xùn)練

session中的內(nèi)容與以前一樣,直接迭代訓(xùn)練即可。由于使用了supervisor管理session,將使用sv.summary_computed函數(shù)來保存summary文件。

print('sess ok')
  print(global_step.eval(session=sess))
  for epoch in range(global_step.eval(session=sess), training_epochs*len(train_x)):
    for (x, y) in zip(train_x, train_y):
      _, epoch = sess.run([optimizer, global_step], feed_dict={X: x, Y: y})
      summary_str = sess.run(merged_summary_op, feed_dict={X: x, Y: y})
      sv.summary_computed(sess, summary_str, global_step=epoch)
      if epoch % display_step == 0:
        loss = sess.run(cost, feed_dict={X:train_x, Y:train_y})
        print("Epoch:", epoch+1, 'loss:', loss, 'W=', sess.run(w), w, 'b=', sess.run(b))
  print(' finished ')
  sv.saver.save(sess, 'log/linear/' + "sv.cpk", global_step=epoch)
sv.stop()

(1)在設(shè)置自動(dòng)保存檢查點(diǎn)文件后,手動(dòng)保存仍然有效,

(2)在運(yùn)行一半后,在運(yùn)行supervisor時(shí)會(huì)自動(dòng)載入模型的參數(shù),不需要手動(dòng)調(diào)用restore。

(3)在session中不需要進(jìn)行初始化的操作。

6.建立worker文件

新建兩個(gè)py文件,設(shè)置task_index分別為0和1,其他的部分和上述的代碼相一致。

strjob_name = 'worker'
task_index = 1
strjob_name = 'worker'
task_index = 0

7.運(yùn)行

我們分別啟動(dòng)寫好的三個(gè)文件,在運(yùn)行結(jié)果中,我們可以看到循環(huán)的次數(shù)不是連續(xù)的,顯示結(jié)果中會(huì)有警告,這是因?yàn)樵跇?gòu)建supervisor時(shí)沒有填寫local_init_op參數(shù),該參數(shù)的含義是在創(chuàng)建worker實(shí)例時(shí),初始化本地變量,上述代碼中沒有設(shè)置,系統(tǒng)會(huì)自動(dòng)初始化,并給出警告提示。

分布運(yùn)算的目的是為了提高整體運(yùn)算速度,如果同步epoch的準(zhǔn)確率需要犧牲總體運(yùn)行速度為代價(jià),自然很不合適。

在ps的文件中,它只是負(fù)責(zé)連接,并不參與運(yùn)算。

以上是“分布式TensorFlow運(yùn)行環(huán)境的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI