溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Tensorflow中多線程與多進(jìn)程數(shù)據(jù)加載的示例分析

發(fā)布時(shí)間:2021-08-23 10:30:14 來源:億速云 閱讀:120 作者:小新 欄目:開發(fā)技術(shù)

這篇文章給大家分享的是有關(guān)Tensorflow中多線程與多進(jìn)程數(shù)據(jù)加載的示例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

1. 多線程數(shù)據(jù)讀取

第一種方法是可以直接從csv里讀取數(shù)據(jù),但返回值是tensor,需要在sess里run一下才能返回真實(shí)值,無法實(shí)現(xiàn)真正的并行處理,但如果直接用csv文件或其他什么文件存了特征值,可以直接讀取后進(jìn)行訓(xùn)練,可使用這種方法.

import tensorflow as tf

#這里是返回的數(shù)據(jù)類型,具體內(nèi)容無所謂,類型對(duì)應(yīng)就好了,比如我這個(gè),就是一個(gè)四維的向量,前三維是字符串類型 最后一維是int類型
record_defaults = [[""], [""], [""], [0]]


def decode_csv(line):
 parsed_line = tf.decode_csv(line, record_defaults)
 label = parsed_line[-1]  # label 
 del parsed_line[-1]   # delete the last element from the list
 features = tf.stack(parsed_line) # Stack features so that you can later vectorize forward prop., etc.
 #label = tf.stack(label)   #NOT needed. Only if more than 1 column makes the label...
 batch_to_return = features, label
 return batch_to_return

filenames = tf.placeholder(tf.string, shape=[None])
dataset5 = tf.data.Dataset.from_tensor_slices(filenames)
#在這里設(shè)置線程數(shù)目
dataset5 = dataset5.flat_map(lambda filename: tf.data.TextLineDataset(filename).skip(1).map(decode_csv,num_parallel_calls=15)) 
dataset5 = dataset5.shuffle(buffer_size=1000)
dataset5 = dataset5.batch(32) #batch_size
iterator5 = dataset5.make_initializable_iterator()
next_element5 = iterator5.get_next()

#這里是需要加載的文件名
training_filenames = ["train.csv"]
validation_filenames = ["vali.csv"]

with tf.Session() as sess:

 for _ in range(2):  
 	#通過文件名初始化迭代器
  sess.run(iterator5.initializer, feed_dict={filenames: training_filenames})
  while True:
   try:
   #這里獲得真實(shí)值
    features, labels = sess.run(next_element5)
    # Train...
   # print("(train) features: ")
   # print(features)
   # print("(train) labels: ")
   # print(labels) 
   except tf.errors.OutOfRangeError:
    print("Out of range error triggered (looped through training set 1 time)")
    break

 # Validate (cost, accuracy) on train set
 print("\nDone with the first iterator\n")

 sess.run(iterator5.initializer, feed_dict={filenames: validation_filenames})
 while True:
  try:
   features, labels = sess.run(next_element5)
   # Validate (cost, accuracy) on dev set
  # print("(dev) features: ")
  # print(features)
  # print("(dev) labels: ")
  # print(labels)
  except tf.errors.OutOfRangeError:
   print("Out of range error triggered (looped through dev set 1 time only)")
   break

第二種方法,基于生成器,可以進(jìn)行預(yù)處理操作了,sess里run出來的結(jié)果可以直接進(jìn)行輸入訓(xùn)練,但需要自己寫一個(gè)生成器,我使用的測(cè)試代碼如下:

import tensorflow as tf
import random
import threading
import numpy as np
from data import load_image,load_wave

class SequenceData():
 def __init__(self, path, batch_size=32):
  self.path = path
  self.batch_size = batch_size
  f = open(path)
  self.datas = f.readlines()
  self.L = len(self.datas)
  self.index = random.sample(range(self.L), self.L)
  
 def __len__(self):
  return self.L - self.batch_size
  
 def __getitem__(self, idx):
  batch_indexs = self.index[idx:(idx+self.batch_size)]
  batch_datas = [self.datas[k] for k in batch_indexs]
  img1s,img2s,audios,labels = self.data_generation(batch_datas)
  return img1s,img2s,audios,labels

 def gen(self):
  for i in range(100000):
   t = self.__getitem__(i)
   yield t

 def data_generation(self, batch_datas):
 	#預(yù)處理操作,數(shù)據(jù)在參數(shù)里
  return img1s,img2s,audios,labels

#這里的type要和實(shí)際返回的數(shù)據(jù)類型對(duì)應(yīng),如果在自己的處理代碼里已經(jīng)考慮的batchszie,那這里的batch設(shè)為1即可
dataset = tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen,
           output_types= (tf.float32,tf.float32,tf.float32,tf.int64))
dataset = dataset.map(lambda x,y,z,w : (x,y,z,w), num_parallel_calls=32).prefetch(buffer_size=1000)
X, y,z,w = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
 for _ in range(100000):
  a,b,c,d = sess.run([X,y,z,w])
  print(a.shape)

不過python的多線程并不是真正的多線程,雖然看起來我是啟動(dòng)了32線程,但運(yùn)行時(shí)的CPU占用如下所示:

Tensorflow中多線程與多進(jìn)程數(shù)據(jù)加載的示例分析

還剩這么多核心空著,然后就是第三個(gè)版本了,使用了queue來緩存數(shù)據(jù),訓(xùn)練需要數(shù)據(jù)時(shí)直接從queue中進(jìn)行讀取,是一個(gè)到多進(jìn)程的過度版本(vscode沒法debug多進(jìn)程,坑啊,還以為代碼寫錯(cuò)了,在vscode里多進(jìn)程直接就沒法運(yùn)行),在初始化時(shí)啟動(dòng)多個(gè)線程進(jìn)行數(shù)據(jù)的預(yù)處理:

import tensorflow as tf
import random
import threading
import numpy as np
from data import load_image,load_wave
from queue import Queue

class SequenceData():
 def __init__(self, path, batch_size=32):
  self.path = path
  self.batch_size = batch_size
  f = open(path)
  self.datas = f.readlines()
  self.L = len(self.datas)
  self.index = random.sample(range(self.L), self.L)
  self.queue = Queue(maxsize=20)

  for i in range(32):
   threading.Thread(target=self.f).start()
 def __len__(self):
  return self.L - self.batch_size
 def __getitem__(self, idx):
  batch_indexs = self.index[idx:(idx+self.batch_size)]
  batch_datas = [self.datas[k] for k in batch_indexs]
  img1s,img2s,audios,labels = self.data_generation(batch_datas)
  return img1s,img2s,audios,labels
 
 def f(self):
  for i in range(int(self.__len__()/self.batch_size)):
   t = self.__getitem__(i)
   self.queue.put(t)

 def gen(self):
  while 1:
   yield self.queue.get()

 def data_generation(self, batch_datas):
  #數(shù)據(jù)預(yù)處理操作
  return img1s,img2s,audios,labels

#這里的type要和實(shí)際返回的數(shù)據(jù)類型對(duì)應(yīng),如果在自己的處理代碼里已經(jīng)考慮的batchszie,那這里的batch設(shè)為1即可
dataset = tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen,
           output_types= (tf.float32,tf.float32,tf.float32,tf.int64))
dataset = dataset.map(lambda x,y,z,w : (x,y,z,w), num_parallel_calls=1).prefetch(buffer_size=1000)
X, y,z,w = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
 for _ in range(100000):
  a,b,c,d = sess.run([X,y,z,w])
  print(a.shape)

2. 多進(jìn)程數(shù)據(jù)讀取

這里的代碼和多線程的第三個(gè)版本非常類似,修改為啟動(dòng)進(jìn)程和進(jìn)程類里的Queue即可,但千萬不要在vscode里直接debug!在vscode里直接f5運(yùn)行進(jìn)程并不能啟動(dòng).

from __future__ import unicode_literals
from functools import reduce
import tensorflow as tf
import numpy as np
import warnings
import argparse
import skimage.io
import skimage.transform
import skimage
import scipy.io.wavfile
from multiprocessing import Process,Queue

class SequenceData():
 def __init__(self, path, batch_size=32):
  self.path = path
  self.batch_size = batch_size
  f = open(path)
  self.datas = f.readlines()
  self.L = len(self.datas) 
  self.index = random.sample(range(self.L), self.L)
  self.queue = Queue(maxsize=30)
  
  self.Process_num=32
  for i in range(self.Process_num):
   print(i,'start')
   ii = int(self.__len__()/self.Process_num)
   t = Process(target=self.f,args=(i*ii,(i+1)*ii))
   t.start()
 def __len__(self):
  return self.L - self.batch_size
 def __getitem__(self, idx):
  batch_indexs = self.index[idx:(idx+self.batch_size)]
  batch_datas = [self.datas[k] for k in batch_indexs]
  img1s,img2s,audios,labels = self.data_generation(batch_datas)
  return img1s,img2s,audios,labels
 
 def f(self,i_l,i_h):
  for i in range(i_l,i_h):
   t = self.__getitem__(i)
   self.queue.put(t)

 def gen(self):
  while 1:
   t = self.queue.get()
   yield t[0],t[1],t[2],t[3]

 def data_generation(self, batch_datas):
  #數(shù)據(jù)預(yù)處理操作
  return img1s,img2s,audios,labels

epochs = 2

data_g = SequenceData('train_1.csv',batch_size=48)
dataset = tf.data.Dataset().batch(1).from_generator(data_g.gen,
           output_types= (tf.float32,tf.float32,tf.float32,tf.float32))
X, y,z,w = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:

 tf.global_variables_initializer().run()
 for i in range(epochs):
  for j in range(int(len(data_g)/(data_g.batch_size))):
   face1,face2,voice, labels = sess.run([X,y,z,w])
   print(face1.shape)

然后,最后實(shí)現(xiàn)的效果

Tensorflow中多線程與多進(jìn)程數(shù)據(jù)加載的示例分析

感謝各位的閱讀!關(guān)于“Tensorflow中多線程與多進(jìn)程數(shù)據(jù)加載的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI