溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

利用TensorFlow怎么輸入數(shù)據(jù)

發(fā)布時(shí)間:2021-04-08 16:17:49 來源:億速云 閱讀:202 作者:Leah 欄目:開發(fā)技術(shù)

今天就跟大家聊聊有關(guān)利用TensorFlow怎么輸入數(shù)據(jù),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

TensorFlow輸入數(shù)據(jù)的方式有四種:

  1. tf.data API:可以很容易的構(gòu)建一個(gè)復(fù)雜的輸入通道(pipeline)(首選數(shù)據(jù)輸入方式)(Eager模式必須使用該API來構(gòu)建輸入通道)

  2. Feeding:使用Python代碼提供數(shù)據(jù),然后將數(shù)據(jù)feeding到計(jì)算圖中。

  3. QueueRunner:基于隊(duì)列的輸入通道(在計(jì)算圖計(jì)算前從隊(duì)列中讀取數(shù)據(jù))

  4. Preloaded data:用一個(gè)constant常量將數(shù)據(jù)集加載到計(jì)算圖中(主要用于小數(shù)據(jù)集)

1. tf.data API

關(guān)于tf.data.Dataset的更詳盡解釋請看《programmer's guide》。tf.data API能夠從不同的輸入或文件格式中讀取、預(yù)處理數(shù)據(jù),并且對數(shù)據(jù)應(yīng)用一些變換(例如,batching、shuffling、mapping function over the dataset),tf.data API 是舊的 feeding、QueueRunner的升級。

2. Feeding

注意:Feeding是數(shù)據(jù)輸入效率最低的方式,應(yīng)該只用于小數(shù)據(jù)集和調(diào)試(debugging)

TensorFlow的Feeding機(jī)制允許我們將數(shù)據(jù)輸入計(jì)算圖中的任何一個(gè)Tensor。因此可以用Python來處理數(shù)據(jù),然后直接將處理好的數(shù)據(jù)feed到計(jì)算圖中 。

run()eval()中用feed_dict來將數(shù)據(jù)輸入計(jì)算圖:

with tf.Session():
 input = tf.placeholder(tf.float32)
 classifier = ...
 print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

雖然你可以用feed data替換任何Tensor的值(包括variables和constants),但最好的使用方法是使用一個(gè)tf.placeholder節(jié)點(diǎn)(專門用于feed數(shù)據(jù))。它不用初始化,也不包含數(shù)據(jù)。一個(gè)placeholder沒有被feed數(shù)據(jù),則會(huì)報(bào)錯(cuò)。

使用placeholder和feed_dict的一個(gè)實(shí)例(數(shù)據(jù)集使用的是MNIST)見tensorflow/examples/tutorials/mnist/fully_connected_feed.py

3. QueueRunner

注意:這一部分介紹了基于隊(duì)列(Queue)API構(gòu)建輸入通道(pipelines),這一方法完全可以使用 tf.data API來替代。

一個(gè)基于queue的從文件中讀取records的通道(pipline)一般有以下幾個(gè)步驟:

  1. 文件名列表(The list of filenames)

  2. 文件名打亂(可選)(Optional filename shuffling)

  3. epoch限制(可選)(Optional epoch limit)

  4. 文件名隊(duì)列(Filename queue)

  5. 與文件格式匹配的Reader(A Reader for the file format)

  6. decoder(A decoder for a record read by the reader)

  7. 預(yù)處理(可選)(Optional preprocessing)

  8. Example隊(duì)列(Example queue)

3.1 Filenames, shuffling, and epoch limits

對于文件名列表,有很多方法:1. 使用一個(gè)constant string Tensor(比如:["file0", "file1"])或者 [("file%d" %i) for i in range(2)];2. 使用 tf.train.match_filenames_once 函數(shù);3. 使用 tf.gfile.Glob(path_pattern) 。

將文件名列表傳給 tf.train.string_input_producer 函數(shù)。string_input_producer 創(chuàng)建一個(gè) FIFO 隊(duì)列來保存(holding)文件名,以供Reader使用。

string_input_producer 可以對文件名進(jìn)行shuffle(可選)、設(shè)置一個(gè)最大迭代 epochs 數(shù)。在每個(gè)epoch,一個(gè)queue runner將整個(gè)文件名列表添加到queue,如果shuffle=True,則添加時(shí)進(jìn)行shuffle。This procedure provides a uniform sampling of files, so that examples are not under- or over- sampled relative to each other。

queue runner線程獨(dú)立于reader線程,所以enqueuing和shuffle不會(huì)阻礙reader。

3.2 File formats

要選擇與輸入文件的格式匹配的reader,并且要將文件名隊(duì)列傳遞給reader的 read 方法。read 方法輸出一個(gè) key identifying the file and record(在調(diào)試過程中非常有用,如果你有一些奇怪的 record)

3.2.1 CSV file

為了讀取逗號分隔符分割的text文件(csv),要使用一個(gè) tf.TextLineReader 和一個(gè) tf.decode_csv。例如:

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
  value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
 # Start populating the filename queue.
 coord = tf.train.Coordinator()
 threads = tf.train.start_queue_runners(coord=coord)

 for i in range(1200):
  # Retrieve a single instance:
  example, label = sess.run([features, col5])

 coord.request_stop()
 coord.join(threads)

read 方法每執(zhí)行一次,會(huì)從文件中讀取一行。然后 decode_csv 將讀取的內(nèi)容解析成一個(gè)Tensor列表。參數(shù) record_defaults 決定解析產(chǎn)生的Tensor的類型,另外,如果輸入中有缺失值,則用record_defaults 指定的默認(rèn)值來填充。

在使用run或者eval 執(zhí)行 read 方法前,你必須調(diào)用 tf.train.start_queue_runners 去填充 queue。否則,read 方法將會(huì)堵塞(等待 filenames queue 中 enqueue 文件名)。

3.2.2 Fixed length records

為了讀取二進(jìn)制文件(二進(jìn)制文件中,每一個(gè)record都占固定bytes),需要使用一個(gè) tf.FixedLengthRecordReader 和 tf.decode_raw。decode_raw 將 reader 讀取的 string 解析成一個(gè)uint8 tensor。

例如,二進(jìn)制格式的CIFAR-10數(shù)據(jù)集中的每一個(gè)record都占固定bytes:label占1 bytes,然后后面的image數(shù)據(jù)占3072 bytes。當(dāng)你有一個(gè)unit8 tensor時(shí),通過切片便可以得到各部分并reformat成需要的格式。對于CIFAR-10數(shù)據(jù)集的reading和decoding,可以參照:tensorflow_models/tutorials/image/cifar10/cifar10_input.py或這個(gè)教程。

3.2.3 Standard TensorFlow format

另一個(gè)方法是將數(shù)據(jù)集轉(zhuǎn)換為一個(gè)支持的格式。這個(gè)方法使得數(shù)據(jù)集和網(wǎng)絡(luò)的混合和匹配變得簡單(make it easier to mix and match data sets and network architectures)。TensorFlow中推薦的格式是 TFRecords文件,TFRecords中包含 tf.train.Example protocol buffers (在這個(gè)協(xié)議下,特征是一個(gè)字段).

你寫一小段程序來獲取數(shù)據(jù),然后將數(shù)據(jù)填入一個(gè)Example protocol buffer,并將這個(gè) protocol buffer 序列化(serializes)為一個(gè)string,然后用 tf.python_io.TFRcordWriter 將這個(gè)string寫入到一個(gè)TFRecords文件中。例如,tensorflow/examples/how_tos/reading_data/convert_to_records.py 將MNIST數(shù)據(jù)集轉(zhuǎn)化為TFRecord格式。

讀取TFRecord文件的推薦方式是使用 tf.data.TFRecordDataset,像這個(gè)例子一樣:

dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.repeat(num_epochs)

# map takes a python function and applies it to every sample
dataset = dataset.map(decode)

為了完成相同的任務(wù),基于queue的輸入通道需要下面的代碼(使用的decode和上一段代碼一樣):

filename_queue = tf.train.string_input_producer([filename], num_epochs=num_epochs)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
image,label = decode(serialized_example)

3.3 Preprocessing

然后你可以對examples進(jìn)行你想要的預(yù)處理(preprocessing)。預(yù)處理是獨(dú)立的(不依賴于模型參數(shù))。常見的預(yù)處理有:數(shù)據(jù)的標(biāo)準(zhǔn)化(normalization of your data)、挑選一個(gè)隨機(jī)的切片,添加噪聲(noise)或者畸變(distortions)等。具體的例子見:tensorflow_models/tutorials/image/cifar10/cifar10_input.py

3.4 Batching

在pipeline的末端,我們通過調(diào)用tf.train.shuffle_batch 來創(chuàng)建兩個(gè)queue,一個(gè)將example batch起來 for training、evaluation、inference;另一個(gè)來shuffle examples的順序。

例子:

def read_my_file_format(filename_queue):
 reader = tf.SomeReader()
 key, record_string = reader.read(filename_queue)
 example, label = tf.some_decoder(record_string)
 processed_example = some_processing(example)
 return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
 filename_queue = tf.train.string_input_producer(
   filenames, num_epochs=num_epochs, shuffle=True)
 example, label = read_my_file_format(filename_queue)
 # min_after_dequeue defines how big a buffer we will randomly sample
 #  from -- bigger means better shuffling but slower start up and more
 #  memory used.
 # capacity must be larger than min_after_dequeue and the amount larger
 #  determines the maximum we will prefetch. Recommendation:
 #  min_after_dequeue + (num_threads + a small safety margin) * batch_size
 min_after_dequeue = 10000
 capacity = min_after_dequeue + 3 * batch_size
 example_batch, label_batch = tf.train.shuffle_batch(
   [example, label], batch_size=batch_size, capacity=capacity,
   min_after_dequeue=min_after_dequeue)
 return example_batch, label_batch

如果你需要更多的并行或者打亂不同文件中example,使用多個(gè)reader,然后使用 tf.train.shuffle_batch_join將多個(gè)reader讀取的內(nèi)容整合到一起。(If you need more parallelism or shuffling of examples between files, use multiple reader instances using the tf.train.shuffle_batch_join)

例子:

def read_my_file_format(filename_queue):
 reader = tf.SomeReader()
 key, record_string = reader.read(filename_queue)
 example, label = tf.some_decoder(record_string)
 processed_example = some_processing(example)
 return processed_example, label

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
 filename_queue = tf.train.string_input_producer(
   filenames, num_epochs=num_epochs, shuffle=True)
 example_list = [read_my_file_format(filename_queue)
         for _ in range(read_threads)]
 min_after_dequeue = 10000
 capacity = min_after_dequeue + 3 * batch_size
 example_batch, label_batch = tf.train.shuffle_batch_join(
   example_list, batch_size=batch_size, capacity=capacity,
   min_after_dequeue=min_after_dequeue)
 return example_batch, label_batch

所有的reader共享一個(gè)filename queue。這種方式保證了不同的reader在同一個(gè)epoch,讀取不同的文件,直到所有的文件的已經(jīng)讀取完,然后在下一個(gè)epoch,重新從所有的文件讀?。╕ou still only use a single filename queue that is shared by all the readers. That way we ensure that the different readers use different files from the same epoch until all the files from the epoch have been started. (It is also usually sufficient to have a single thread filling the filename queue.))。

另一個(gè)可選的方法是去通過調(diào)用 tf.train.shuffle_batch 使用單個(gè)的reader,但是將參數(shù) num_threads 參數(shù)設(shè)置為大于1的值。這將使得在同一時(shí)間只能從一個(gè)文件讀取內(nèi)容(但是比 1 線程快),而不是同時(shí)從N個(gè)文件中讀取。這可能很重要:

  1. 如果你的num_threads參數(shù)值比文件的數(shù)量多,那么很有可能:有兩個(gè)threads會(huì)一前一后從同一個(gè)文件中讀取相同的example。這是不好的,應(yīng)該避免。

  2. 或者,如果并行地讀取N個(gè)文件,可能或?qū)е麓罅康拇疟P搜索(意思是,多個(gè)文件存在于磁盤的不同位置,而磁頭只能有一個(gè)位置,所以會(huì)增加磁盤負(fù)擔(dān))

那么需要多少個(gè)線程呢?tf.train.shuffle_batch*函數(shù)會(huì)給計(jì)算圖添加一個(gè)summary來記錄 example queue 的使用情況。如果你有足夠的reading threads,這個(gè)summary將會(huì)總大于0。你可以用TensorBoard來查看訓(xùn)練過程中的summaries

3.5 Creating threads to prefetch using QueueRunner objects

使用QueueRunner對象來創(chuàng)建threads來prefetch數(shù)據(jù)

說明:tf.train里的很多函數(shù)會(huì)添加tf.train.QueueRunner對象到你的graph。這些對象需要你在訓(xùn)練或者推理前,調(diào)用tf.train.start_queue_runners,否則數(shù)據(jù)無法讀取到圖中。調(diào)用tf.train.start_queue_runners會(huì)運(yùn)行輸入pipeline需要的線程,這些線程將example enqueue到隊(duì)列中,然后dequeue操作才能成功。這最好和tf.train.Coordinator配合著用,當(dāng)有錯(cuò)誤時(shí),它會(huì)完全關(guān)閉掉開啟的threads。如果你在創(chuàng)建pipline時(shí)設(shè)置了迭代epoch數(shù)限制,將會(huì)創(chuàng)建一個(gè)epoch counter的局部變量(需要初始化)。下面是推薦的代碼使用模板:

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
  while not coord.should_stop():
    # Run training steps or whatever
    sess.run(train_op)

except tf.errors.OutOfRangeError:
  print('Done training -- epoch limit reached')
finally:
  # When done, ask the threads to stop.
  coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

這里的代碼是怎么工作的?

首先,我們創(chuàng)建整個(gè)圖。它的input pipeline將有幾個(gè)階段,這些階段通過Queue連在一起。第一個(gè)階段將會(huì)產(chǎn)生要讀取的文件的文件名,并將文件名enqueue到filename queue。第二個(gè)階段使用一個(gè)Reader來dequeue文件名并讀取,產(chǎn)生example,并將example enqueue到一個(gè)example queue。根據(jù)你的設(shè)置,你可能有很多第二階段(并行),所以你可以從并行地讀取多個(gè)文件。最后一個(gè)階段是一個(gè)enqueue操作,將example enqueue成一個(gè)queue,然后等待下一步操作。我們想要開啟多個(gè)線程運(yùn)行著些enqueue操作,所以我們的訓(xùn)練loop能夠從example queue中dequeue examples。

利用TensorFlow怎么輸入數(shù)據(jù)

tf.train里的輔助函數(shù)(創(chuàng)建了這些queue、enqueuing操作)會(huì)調(diào)用tf.train.add_queue——runner添加一個(gè)tf.train.QueueRunner到圖中。每一個(gè)QueueRunner負(fù)責(zé)一個(gè)階段。一旦圖構(gòu)建好,tf.train.start_queue_runners函數(shù)會(huì)開始圖中每一個(gè)QueueRunner的入隊(duì)操作。

如果一切進(jìn)行順利,你現(xiàn)在可以運(yùn)行訓(xùn)練step(后臺線程會(huì)填滿queue)。如果你設(shè)置了epoch限制,在達(dá)到固定的epoch時(shí),在進(jìn)行dequeuing會(huì)得到tf.errors.OutOfRangeError。這個(gè)錯(cuò)誤等價(jià)于EOF(end of file),意味著已經(jīng)達(dá)到了固定的epochs。

最后一部分是tf.train.Coordinator。它主要負(fù)責(zé)通知所有的線程是否應(yīng)該停止。在大多數(shù)情況下,這通常是因?yàn)橛龅搅艘粋€(gè)異常(exception)。例如,某一個(gè)線程在運(yùn)行某些操作時(shí)出錯(cuò)了(或者python的異常)。

關(guān)于threading、queues、QueueRunners、Coordinators的更多細(xì)節(jié)見這里

3.6 Filtering records or producing multiple examples per record

一個(gè)example的shape是 [x,y,z],一個(gè)batch的example的shape為 [batch, x, y, z]。如果你想去過濾掉這個(gè)record,你可以把 batch size 設(shè)置為 0;如果你想讓每一個(gè)record產(chǎn)生多個(gè)example,你可以把batch size設(shè)置為大于1。然后,在調(diào)用調(diào)用batching函數(shù)(shuffle_batchshuffle_batch_join)時(shí),設(shè)置enqueue_many=True。

3.7 Sparse input data

queues在SparseTensors的情況下不能很好的工作。如果你使用SparseTensors,你必須在batching后用tf.sparse_example來decode string records(而不是在batching前使用tf.parse_single_example來decode)

4. Preloaded data

這僅僅適用于小數(shù)據(jù)集,小數(shù)據(jù)集可以被整體加載到內(nèi)存。預(yù)加載數(shù)據(jù)集主要有兩種方法:

  1. 將數(shù)據(jù)集存儲(chǔ)成一個(gè)constant

  2. 將數(shù)據(jù)集存儲(chǔ)在一個(gè)variable中,一旦初始化或者assign to后,便不再改變。

使用一個(gè)constant更簡單,但是需要更多的內(nèi)存(因?yàn)樗械某A慷純?chǔ)存在計(jì)算圖中,而計(jì)算圖可能需要進(jìn)行多次復(fù)制)。

training_data = ...
training_labels = ...
with tf.Session():
 input_data = tf.constant(training_data)
 input_labels = tf.constant(training_labels)
 ...

為了使用一個(gè)varibale,在圖構(gòu)建好后,你需要去初始化它。

training_data = ...
training_labels = ...
with tf.Session() as sess:
 data_initializer = tf.placeholder(dtype=training_data.dtype,
                  shape=training_data.shape)
 label_initializer = tf.placeholder(dtype=training_labels.dtype,
                   shape=training_labels.shape)
 input_data = tf.Variable(data_initializer, trainable=False, collections=[])
 input_labels = tf.Variable(label_initializer, trainable=False, collections=[])
 ...
 sess.run(input_data.initializer,
      feed_dict={data_initializer: training_data})
 sess.run(input_labels.initializer,
      feed_dict={label_initializer: training_labels})

設(shè)置trainable=False將使variable不加入GraphKeys.TRAINABLE_VARIABLES容器,所以我們不用在訓(xùn)練過程中更新它。設(shè)置collections=[]將會(huì)使variable不加入GraphKeys.GLOBAL_VARIABLES容器(這個(gè)容器主要用于保存和恢復(fù)checkpoints)。

無論哪種方式,tf.train.slice_input_producer都能夠用來產(chǎn)生一個(gè)slice。這在整個(gè)epoch上shuffle了example,所以batching時(shí),進(jìn)一步的shuffling不再需要。所以不再使用shuffle_batch函數(shù),而使用tf.train.batch函數(shù)。為了使用多個(gè)預(yù)處理線程,設(shè)置num_threads參數(shù)大于1。

MNIST數(shù)據(jù)集上使用constant來preload數(shù)據(jù)的實(shí)例見tensorflow/examples/how_tos/reading_data/fully_connected_preloaded.py;使用variable來preload數(shù)據(jù)的例子見tensorflow/examples/how_tos/reading_data/fully_connected_preloaded_var.py,你可以通過 fully_connected_feed和 fully_connected_feed版本來對比兩種方式。

4. Multiple input pipelines

一般,你想要去在一個(gè)數(shù)據(jù)集上訓(xùn)練,而在另一個(gè)數(shù)據(jù)集上評估模型。實(shí)現(xiàn)這個(gè)想法的一種方式是:以兩個(gè)進(jìn)程,建兩個(gè)獨(dú)立的圖和session:

  1. 訓(xùn)練進(jìn)程讀取訓(xùn)練數(shù)據(jù),并且周期性地將模型的所有訓(xùn)練好的變量保存到checkpoint文件中。

  2. 評估進(jìn)程從checkpoint文件中恢復(fù)得到一個(gè)inference模型,這個(gè)模型讀取評估數(shù)據(jù)。

在estimators里和CIFAR-10模型示例里,采用就是上面的方法。該方法主要有兩個(gè)好處:

  1. 你的評估是在一個(gè)訓(xùn)練好的模型的快照上進(jìn)行的。

  2. 在訓(xùn)練完成或中斷后,你也可以進(jìn)行評估。

你可以在同一個(gè)進(jìn)程中同一個(gè)圖中進(jìn)行訓(xùn)練和評估,并且訓(xùn)練和評估共享訓(xùn)練好的參數(shù)和層。關(guān)于共享變量,詳見這里。

為了支持單個(gè)圖方法(single-graph approach),tf.data也提供了高級的iterator類型,它將允許用戶去在不重新構(gòu)建graph和session的情況下,改變輸入pipeline。

注意:盡管上面的實(shí)現(xiàn)很好,但很多op(比如tf.layers.batch_normalization和tf.layers.dropout)與模型模式有關(guān)(訓(xùn)練和評估時(shí),計(jì)算不一致),你必須很小心地去設(shè)置這些,如果你更改數(shù)據(jù)源。

看完上述內(nèi)容,你們對利用TensorFlow怎么輸入數(shù)據(jù)有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI