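I am trying to understand how reactive programming really works. To do that, I prepared a simple demo: a reactive WebClient from Spring Framework sends requests to a simple REST API, and the client prints the current thread name in each operation.
The REST API: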
@RestController
@SpringBootApplication
public class RestApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(RestApiApplication.class, args);
    }

    @PostMapping("/resource")
    public void consumeResource(@RequestBody Resource resource) {
        System.out.println(String.format("consumed resource: %s", resource.toString()));
    }
}

@Data
@AllArgsConstructor
class Resource {
    private final Long id;
    private final String name;
}
And, most importantly, the reactive web client:
@SpringBootApplication
public class ReactorWebclientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactorWebclientApplication.class, args);
    }

    private final TcpClient tcpClient = TcpClient.create();

    private final WebClient webClient = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
        .baseUrl("http://localhost:8080")
        .build();

    @PostConstruct
    void doRequests() {
        var longs = LongStream.range(1L, 10_000L)
            .boxed()
            .toArray(Long[]::new);

        var longsStream = Stream.of(longs);

        Flux.fromStream(longsStream)
            .map(l -> {
                System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
                return new Resource(l, String.format("name %s", l));
            })
            .filter(res -> {
                System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
                return !res.getId().equals(11_000L);
            })
            .flatMap(res -> {
                System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
                return webClient.post()
                    .uri("/resource")
                    .syncBody(res)
                    .header("Content-Type", "application/json")
                    .header("Accept", "application/json")
                    .retrieve()
                    .bodyToMono(Resource.class)
                    .doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
                    .doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
            })
            .blockLast();
    }
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
    private final Long id;
    private final String name;

    @JsonCreator
    Resource(@JsonProperty("id") Long id, @JsonProperty("name") String name) {
        this.id = id;
        this.name = name;
    }

    Long getId() {
        return id;
    }

    String getName() {
        return name;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Resource{");
        sb.append("id=").append(id);
        sb.append(", name='").append(name).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
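The problem is that the behavior differs from what I predicted.
I expected every call to .map(), .filter() and .flatMap() to execute on the main thread, and every call to .doOnSuccess() or .doOnError() to execute on a thread from the NIO thread pool. So I expected the log to look like this: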
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)
But the log I actually got is:
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------
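After that, every log line from .map(), .filter() and .flatMap() comes from a reactor-http-nio thread.
The other thing I cannot explain is that the ratio between operations executed on the main thread and on reactor-http-nio threads is different on every run. Sometimes all of the .map(), .filter() and .flatMap() operations execute on the main thread.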
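Reactor, like RxJava, can be considered concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.
Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.
The relevant Project Reactor documentation is the reference guide's section on threading and schedulers.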
In your code, the following snippet:
webClient.post()
    .uri("/resource")
    .syncBody(res)
    .header("Content-Type", "application/json")
    .header("Accept", "application/json")
    .retrieve()
    .bodyToMono(Resource.class)
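causes the thread to switch from main to Netty's worker pool. After that, all subsequent operations are executed by the Netty worker thread. That includes the upstream map and filter steps: once an inner request completes on a Netty thread, flatMap asks the source for the next element from that same thread, and a synchronous source such as Flux.fromStream emits on whichever thread asks. How much work stays on main therefore depends on how quickly the first responses arrive, which is why the ratio changes from run to run.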
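The thread hand-over can be pictured with a toy model in plain Java. This is not Reactor code: the SyncSource class and the fake-nio-1 thread name are made up for illustration, and the sketch only assumes what is true of a synchronous source, namely that it emits on whichever thread requests more items.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.LongStream;

class ThreadAffinitySketch {

    /** Hypothetical synchronous source: emits on whichever thread calls request(). */
    static final class SyncSource {
        private final Iterator<Long> items;
        private final Consumer<Long> downstream;

        SyncSource(Iterator<Long> items, Consumer<Long> downstream) {
            this.items = items;
            this.downstream = downstream;
        }

        synchronized void request(int n) {
            for (int i = 0; i < n && items.hasNext(); i++) {
                downstream.accept(items.next()); // runs on the requesting thread
            }
        }
    }

    /** Returns one entry per item, recording the thread that ran the "map" step. */
    static List<String> run() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        SyncSource source = new SyncSource(
                LongStream.rangeClosed(1, 4).boxed().iterator(),
                item -> seen.add("map(" + item + ") on " + Thread.currentThread().getName()));

        // Initial demand comes from the subscribing (calling) thread.
        source.request(2);

        // Later demand comes from another thread, standing in for a Netty
        // thread that has just finished an HTTP call and asks for more work.
        Thread worker = new Thread(() -> source.request(2), "fake-nio-1");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Run from main, the first two entries are recorded on the main thread and the last two on fake-nio-1, which mirrors the shape of the log in the question.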
If you want to control this behavior, add a publishOn(...) operator to your code, for example:
webClient.post()
    .uri("/resource")
    .syncBody(res)
    .header("Content-Type", "application/json")
    .header("Accept", "application/json")
    .retrieve()
    .bodyToMono(Resource.class)
    .publishOn(Schedulers.elastic())
This way, every subsequent operation is executed on the elastic scheduler's thread pool.
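For readers who want to see the same idea without a Reactor dependency, here is a rough analogy using the JDK's CompletableFuture. It is only an analogy, not how publishOn is implemented: a plain callback runs on the thread that delivers the result, while an explicitly scheduled one hops to a chosen executor. The thread names fake-nio-1 and heavy-1 are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PublishOnAnalogy {

    /** Returns { thread of the plain callback, thread of the hopped callback }. */
    static String[] run() {
        // Dedicated single-thread pool, standing in for a separate scheduler.
        ExecutorService heavyPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "heavy-1"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();

            // Registered before completion: runs on the completing thread.
            CompletableFuture<String> sameThread =
                    response.thenApply(body -> Thread.currentThread().getName());

            // Explicit hop to another pool, similar in spirit to publishOn(scheduler).
            CompletableFuture<String> hopped =
                    response.thenApplyAsync(body -> Thread.currentThread().getName(), heavyPool);

            // A separate thread delivers the "HTTP response".
            Thread io = new Thread(() -> response.complete("ok"), "fake-nio-1");
            io.start();
            try {
                io.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }

            return new String[] { sameThread.join(), hopped.join() };
        } finally {
            heavyPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("plain callback ran on:  " + threads[0]);
        System.out.println("hopped callback ran on: " + threads[1]);
    }
}
```

The plain callback reports fake-nio-1 and the hopped one reports heavy-1. In the Reactor pipeline the corresponding effect is that operators placed after publishOn stop occupying the Netty event-loop threads.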
Another example uses a dedicated scheduler for the heavy tasks that follow the execution of an HTTP request.
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;
@ExtendWith({
    WiremockResolver.class,
    WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {

    // Maximum number of inner publishers flatMap subscribes to at once
    private static int concurrency = 1;

    private final WebClient webClient = WebClient.create();

    @Test
    public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {
        String requestUri = "/slow-response";

        // Stub an endpoint that answers after a fixed 2-second delay
        server.stubFor(get(urlEqualTo(requestUri))
            .willReturn(aResponse().withStatus(200)
                .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
        );

        Flux
            .generate(() -> Integer.valueOf(1), (i, sink) -> {
                System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
                sink.next(i);
                return i + 1;
            })
            // Subscribe to the source on the single-threaded scheduler
            .subscribeOn(Schedulers.single())
            .flatMap(i ->
                executeGet(uri + requestUri)
                    // Move the work after the HTTP response onto the elastic scheduler
                    .publishOn(Schedulers.elastic())
                    .map(response -> {
                        heavyTask();
                        return true;
                    })
            , concurrency)
            .subscribe();

        blockForever();
    }

    // Keeps the test thread alive so the asynchronous pipeline can keep running
    private void blockForever() {
        Object monitor = new Object();

        synchronized (monitor) {
            try {
                monitor.wait();
            } catch (InterruptedException ex) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
    }

    private Mono<ClientResponse> executeGet(String path) {
        System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
        return webClient
            .get()
            .uri(path)
            .exchange();
    }

    // Simulates 20 seconds of blocking work
    private void heavyTask() {
        try {
            System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
            Thread.sleep(TimeUnit.SECONDS.toMillis(20));
        } catch (InterruptedException ex) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }
}