提问者:小点点

Keras/Tensorflow中的类生成器(继承序列)线程安全吗?


为了加快模型的训练速度,在CPU上填充/生成批处理并在GPU上并行运行模型训练似乎是一种很好的做法。为此,可以用Python编写生成器类,该类继承序列类。

这里是留档的链接:https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence

文件中指出的重要事项是:

序列是进行多重处理的更安全的方法。这种结构保证了网络在每个时代对每个样本只训练一次,而生成器则不是这样。

下面给出了一个简单的代码示例:

from skimage.io import imread
from skimage.transform import resize
import numpy as np
import math

# Here, `x_set` is list of path to the images
# and `y_set` are the associated classes.

class CIFAR10Sequence(Sequence):

    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) *
        self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) *
        self.batch_size]

        return np.array([
            resize(imread(file_name), (200, 200))
               for file_name in batch_x]), np.array(batch_y)

据我所知,理想情况下需要在模型中完成的是创建这个生成器类的实例,并将其交给fit_generator(...)函数。

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=True,
                    workers=6)

以下是来自Keras文档的引用:

keras的使用。乌提尔斯。序列保证了顺序,并保证在使用use\u multiprocessing=True时,每个历元的每个输入都可以单独使用。

在这种情况下,我假设这个设置是线程安全的。问题1)我的假设正确吗?

不过,一个令人困惑的问题是,在视窗10上,参数use_multiprocessing可能不会设置为True。Keras不允许它;似乎它只能在Linux上设置为True。(不知道在其他平台怎么样。)但是Worker参数仍然可以设置为大于0的值。

让我们来看看这两个参数的定义:

workers:Integer。使用基于进程的线程时要加速的最大进程数。如果未指定,工人将默认为1。如果为0,将在主线程上执行生成器。

使用多处理:布尔值。如果为True,则使用基于进程的线程。如果未指定,use_multiprocessing将默认为False。请注意,由于此实现依赖于多处理,因此不应将不可拾取的参数传递给生成器,因为它们无法轻松传递给子进程。

因此,通过使用工人参数,似乎可以创建多个进程来加快训练,而不管use_multiprocessing是否为True。

如果要使用生成器类继承序列(在Windows 10上),则必须将use\u multiprocessing设置为False,如下所示:

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=False,  # CHANGED
                    workers=6)

这里仍然有多个进程在运行,因为workers=6。

问题2)在将use_multiprocessing参数设置为False后,这个设置仍然是线程安全的还是线程安全特性丢失了?基于留档我无法说清楚。

问题3)仍与此主题相关。。。当以这种方式进行训练时,数据由CPU生成并在GPU上进行训练,如果正在训练的模型很浅,GPU的利用率会非常低,CPU的利用率会显著提高,因为GPU会一直等待来自CPU的数据。在这种情况下,有没有办法利用一些GPU资源来生成数据?


共2个答案

匿名用户

在那些看过这篇文章的人中,似乎没有人有最终的答案,所以我想给出适合我的答案。由于该领域缺乏文档,我的答案可能缺少一些相关细节。请随意添加更多信息,我在这里没有提到。

看起来,用Python编写继承序列的生成器类在Windows中是不受支持的。(您似乎可以让它在Linux上工作。)要使其工作,您需要设置参数use\u multiprocessing=True(使用类方法)。但它不能像前面提到的那样在Windows上工作,因此您必须将use\u multiprocessing设置为False(在Windows上)。然而,这并不意味着多处理在Windows上不起作用。即使您设置了use\u multiprocessing=False,当使用以下设置运行代码时,仍然可以支持多处理,您只需将workers参数设置为大于1的任何值。

例子:

history = \
   merged_model.fit_generator(generator=train_generator,
                              steps_per_epoch=trainset_steps_per_epoch,
                              epochs=300,
                              verbose=1,
                              use_multiprocessing=False,
                              workers=3,
                              max_queue_size=4)

现在,让我们再次记住Keras文档:

keras的使用。乌提尔斯。当使用use_multiprocessing=True时,Sequence保证顺序并保证每个历元的每个输入的单一使用。

据我所知,如果使用\u multiprocessing=False,则生成器不再是线程安全的,这使得编写继承序列的生成器类变得困难。

为了解决这个问题,我自己写了一个生成器,手动创建线程安全。下面是一个伪代码示例:

import tensorflow as tf
import threading

class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by
    serializing call to the `next` method of given iterator/generator.
    """
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self): # Py3
        return next(self.it)

    #def next(self):     # Python2 only
    #    with self.lock:
    #        return self.it.next()

def threadsafe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe.
    """
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g


@threadsafe_generator
def generate_data(tfrecord_file_path_list, ...):

    dataset = tf.data.TFRecordDataset(tfrecord_file_path_list)

    # example proto decode
    def _parse_function(example_proto):
      ...
      return batch_data

    # Parse the record into tensors.
    dataset = dataset.map(_parse_function)  

    dataset = dataset.shuffle(buffer_size=100000)

    # Repeat the input indefinitly
    dataset = dataset.repeat()  

    # Generate batches
    dataset = dataset.batch(batch_size)

    # Create an initializable iterator
    iterator = dataset.make_initializable_iterator()

    # Get batch data
    batch_data = iterator.get_next()

    iterator_init_op = iterator.make_initializer(dataset)

    with tf.Session() as sess:

        sess.run(iterator_init_op)

        while True:            
            try:
                batch_data = sess.run(batch_data)
            except tf.errors.OutOfRangeError:
                break
            yield batch_data

嗯,如果这样做真的很优雅,可以讨论一下,但是看起来效果很好。

总结如下:

  • 如果在Windows上编写程序,请将use\u multiprocessing设置为False
  • (据我所知,到今天为止)在Windows上编写代码时,不支持编写继承序列的生成器类。(我想这是一个Tensorflow/Keras问题)
  • 要解决此问题,请编写一个普通生成器,使生成器线程安全,并将workers设置为大于1的数字

重要提示:在此设置中,生成器在CPU上运行,训练在GPU上进行。我可以观察到的一个问题是,如果你正在训练的模型足够浅,GPU的利用率仍然很低,而CPU利用率却很高。如果模型很浅,数据集足够小,那么将所有数据存储在内存中并在GPU上运行所有数据可能是一个不错的选择。它应该会大大加快训练。如果出于任何原因,您希望同时使用CPU和GPU,我的适度建议是尝试使用Tensorflow的tf.dataAPI,这可以显著加快数据预处理和批处理准备。如果生成器只用Python编写,GPU会一直等待数据继续训练。人们可以说Tensorflow/Keras留档的一切,但它确实是高效的代码!

如果有人对API有更全面的了解,并且看到了这篇文章,请随时在这里纠正我,以防我理解错误,或者API被更新以解决Windows上的问题。

匿名用户

我提出了一个可能引起其他人兴趣的“改进”解决方案。请注意,这来自我使用Tensorflow 1.15的经验(我还没有使用版本2)。

在Windows上安装wslVersion2,在Linux环境(如Ubuntu)中安装Tensorflow,然后将use\u multiprocessing设置为True,以使其正常工作。

注意:Windows Linux子shell(WSL)版本2仅在Windows 10、版本1903、版本18362或更高版本中可用。请确保在Windows Update中升级您的Windows版本以使其正常工作。

请参阅在WSL2上安装Tensorflow GPU

对于多任务多线程(即并行并发),我们必须考虑两种操作:

  • forking=父进程创建了一个自身(子进程)的副本,该副本具有它使用的所有内存段的精确副本
  • spawning=父进程创建一个不共享内存的全新子进程,父进程必须等待子进程完成后才能继续

Linux支持分叉,但Windows不支持。Windows仅支持生成

使用use\u multiprocessing=True时Windows挂起的原因是Pythonthreading模块对Windows使用spawn。因此,父进程永远等待子进程完成,因为父进程无法将其内存传输给子进程,因此子进程不知道该做什么。

回答2:它不是线程安全的。在Windows上,如果您曾经尝试使用数据生成器或序列,您可能会看到这样的错误

ValueError: Using a generator with use_multiprocessing=True is not supported on Windows 
(no marshalling of generators across process boundaries). Instead, use single 
thread/process or multithreading.

编组表示将对象的内存表示形式转换为适合传输的数据格式。错误是说与使用fork的Linux不同,use_multiprocessing=True在Windows上不起作用,因为它使用spawn',并且无法将其数据传输到子线程。

此时,您可能会问自己:

“等等...Python全局解释器锁(GIL)怎么样?...如果Python一次只允许一个线程运行,为什么它甚至有线程模块,为什么我们在Tensorflow中关心这个??!"

答案在于CPU绑定任务I/O绑定任务之间的区别:

  • CPU绑定的任务=等待处理数据的任务
  • I/O绑定任务=等待其他进程输入或输出的任务(即数据传输)

在编程中,当我们说两个任务并发时,我们的意思是它们可以在重叠的时间内启动、运行和完成。当我们说它们是并行时,我们的意思是它们实际上是同时运行的。

因此,GIL防止线程并行运行,但不能并发运行。这对Tensorflow很重要的原因是因为并发是关于I/O操作(数据搬迁)的。Tensorflow中一个好的数据流管道应该尝试并发,这样当数据在CPU、GPU和/或RAM之间传输时就不会有滞后时间,训练也会更快完成。我们可以让线程执行图像预处理或其他操作,直到数据返回,而不是让线程坐在那里等待数据返回。)

现在回到线程安全。当运行并发/并行任务时,您必须注意其他事情。两个大问题是:

>

死锁-如果两个线程试图同时访问相同的内存,你会得到一个错误。为了防止这种情况,我们向线程添加一个lock互斥锁(互斥),以防止其他线程在运行时访问相同的内存。然而,如果两个线程需要访问相同的内存,被锁定,并且每个线程依赖于另一个线程来执行,则程序挂起。

我之所以提出这个问题,是因为Tensorflow需要能够picklePython对象,以使代码运行得更快。(pickling是将对象和数据转换为字节码,与整个程序的源代码在Windows上转换为exe的方式非常相似)。Tensorflow迭代器__init_uuz()方法锁定线程并包含线程。Lock()

def __init__(self, n, batch_size, shuffle, seed):
    ...
    self.lock = threading.Lock()
    ...

问题是Python不能pickle线程锁定Windows上的对象(即Windows不能marshall线程锁定到子线程)。

如果您尝试使用生成器并将其传递给fit_generator,您将获得错误(请参阅GitHub问题#10842

TypeError: can't pickle _thread.lock objects

所以,虽然use_multiprocessing=True在Linux上是线程安全的,但在Windows上不是。

解决方案:大约在2020年6月,微软推出了适用于Linux的WindowsSubshell版本2(wsl)。这很重要,因为它支持GPU硬件加速。版本1“只是”Windows NT和Linux之间的一个驱动程序,而wsl现在实际上是一个内核。因此,您现在可以在Windows上安装Linux,从命令提示符打开bash shell,并且(最重要的)可以访问硬件。因此,现在可以在wsl上安装tensorflow gpu。此外,您现在可以使用fork

因此,我建议

  1. 在Windows上安装wslVersion2并添加所需的Linux环境

警告:我还没有测试它是否有效,但就我所知有限,我相信它应该有效。

在这之后,回答问题3应该是一个简单的问题,即调整并发量和并行量,我推荐TensorflowDev 2018峰会视频培训性能:更快收敛的用户指南,以了解如何做到这一点。