提问者:小点点

是否可以使用 fork/join 跨线程边界安全地移植非线程安全值?


我有一些不是线程安全的类:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}

(我在这里使用了一个< code>long作为字段,但是我们应该把它的字段看作是某种线程不安全的类型)。

我现在有一个看起来像这样的类

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}

也就是说,线程不安全类是它的最后一个字段。现在我要这样做:

public class JavaMM {
  public static void main(String[] args) {
    final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet();
      return new FC(t);
    });

    assert (work.fork().join().c.i == 1); 
  }
}

也就是说,从线程T(main),我调用了T'(fork-join-pool)上的一些工作,它创建并变异了我的不安全类的一个实例,然后返回包装在Foo中的结果。请注意,我的线程不安全类的所有突变都发生在单个线程上,T'

问题1:我能保证thread-unsafe-class实例的最终状态在< code>T' ~

问题2:如果我使用并行流这样做会怎样?例如:

Map<Long, Foo> results = 
  Stream
    .of(new ThreadUnsafeClass())
    .parallel()
    .map(tuc -> {
      tuc.incrementAndGet();
      return new Foo(tuc);
    })
    .collect(
      Collectors.toConcurrentMap(
        foo -> foo.c.i,
        Function.identity();
      )
    );
assert(results.get(1) != null)

共1个答案

匿名用户

我认为ForkJoinTask. connect()Future.get()具有相同的内存效果(因为它在connect()Javadoc中说基本上是get()具有中断和异常差异)。并且Future.get()指定为:

在通过另一个线程中的Future.get()检索结果之后,由Future happen-before操作表示的异步计算采取的操作。

换句话说,这基本上是通过Future/FJT的“安全出版物”。这意味着,执行器线程所做的任何操作并通过 FJT 结果发布的内容对 FJT.join() 用户都是可见的。由于该示例仅在执行器线程中分配对象并填充其字段,并且对象从执行器返回后没有任何反应,因此我们只允许查看执行器线程生成的值是有道理的。

请注意,通过< code>final完成整个过程并不会给它带来任何额外的好处。即使你只是去普通的商场,你仍然可以得到保证:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    Future<MyObject> f = s.submit(() -> new MyObject(42));
    assert (f.get().x == 42); // guaranteed!
    s.shutdown();
}

public class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}

但请注意,在 Stream 示例中(如果我们假设 Stream.of.parallelExecutor.submit 之间以及 Stream.collectFJT.join/Future.get 之间的对称性),您已经在调用者线程中创建了对象,然后将其传递给执行器以执行某些操作。这是一个微妙的差异,但它仍然没有多大关系,因为我们在提交时也有 HB,这排除了看到对象的旧状态:

public static void main(String... args) throws Exception {
    ExecutorService s = Executors.newCachedThreadPool();
    MyObject o = new MyObject(42);
    Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
    f.get(); // get -->hb--> read o.x
    assert (o.x == 43); // guaranteed
    s.shutdown();
}

public static class MyObject {
    int x;
    public MyObject(int x) { this.x = x; }
}

(正式来说,这是因为read(o. x)的所有HB路径都通过执行store(o.x,43)的执行线程的动作)

相关问题