Java源码示例:org.eclipse.jetty.reactive.client.ContentChunk
示例1
private DataBuffer toDataBuffer(ContentChunk chunk) {
// We must copy until this is resolved:
// https://github.com/eclipse/jetty.project/issues/2429
// Use copy instead of buffer wrapping because Callback#succeeded() is
// used not only to release the buffer but also to request more data
// which is a problem for codecs that buffer data.
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
chunk.callback.succeeded();
return buffer;
}
示例2
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
示例3
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
示例4
private ContentChunk toContentChunk(DataBuffer buffer) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
}
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
}
});
}
示例5
private DataBuffer toDataBuffer(ContentChunk chunk) {
// We must copy until this is resolved:
// https://github.com/eclipse/jetty.project/issues/2429
// Use copy instead of buffer wrapping because Callback#succeeded() is
// used not only to release the buffer but also to request more data
// which is a problem for codecs that buffer data.
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
chunk.callback.succeeded();
return buffer;
}
示例6
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
示例7
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
示例8
private ContentChunk toContentChunk(DataBuffer buffer) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
}
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
}
});
}
示例9
protected ReactiveRequest.Content provideBody(ReactiveHttpRequest request) {
Publisher<ContentChunk> bodyPublisher;
String contentType;
if(request.body() instanceof Mono){
if(bodyActualClass == ByteBuffer.class){
bodyPublisher = ((Mono)request.body()).map(this::toByteBufferChunk);
contentType = APPLICATION_OCTET_STREAM;
}
else if(bodyActualClass == byte[].class){
bodyPublisher = Flux.from(request.body()).map(this::toByteArrayChunk);
contentType = APPLICATION_OCTET_STREAM;
}
else if (CharSequence.class.isAssignableFrom(bodyActualClass)){
bodyPublisher = Flux.from(request.body()).map(this::toCharSequenceChunk);
contentType = TEXT_UTF_8;
}
else {
bodyPublisher = Flux.from(request.body()).map(data -> toJsonChunk(data, false));
contentType = APPLICATION_JSON_UTF_8;
}
} else {
if(bodyActualClass == ByteBuffer.class){
bodyPublisher = Flux.from(request.body()).map(this::toByteBufferChunk);
contentType = APPLICATION_OCTET_STREAM;
}
else if(bodyActualClass == byte[].class){
bodyPublisher = Flux.from(request.body()).map(this::toByteArrayChunk);
contentType = APPLICATION_OCTET_STREAM;
}
else {
bodyPublisher = Flux.from(request.body()).map(data -> toJsonChunk(data, true));
contentType = APPLICATION_STREAM_JSON_UTF_8;
}
}
return ReactiveRequest.Content.fromPublisher(bodyPublisher, contentType);
}
示例10
protected ContentChunk toJsonChunk(Object data, boolean stream){
try {
ByteArrayBuilder byteArrayBuilder = new ByteArrayBuilder();
bodyWriter.writeValue(byteArrayBuilder, data);
if(stream) {
byteArrayBuilder.write(NEWLINE_SEPARATOR);
}
ByteBuffer buffer = ByteBuffer.wrap(byteArrayBuilder.toByteArray());
return new ContentChunk(buffer);
} catch (java.io.IOException e) {
throw new UncheckedIOException(e);
}
}
示例11
JettyReactiveHttpResponse(Response clientResponse, Publisher<ContentChunk> contentChunks,
Class returnPublisherType, Class returnActualClass,
JsonFactory jsonFactory, ObjectReader objectReader) {
this.clientResponse = clientResponse;
this.contentChunks = contentChunks;
this.returnPublisherType = returnPublisherType;
this.returnActualClass = returnActualClass;
this.objectReader = objectReader;
this.jsonFactory = jsonFactory;
}
示例12
protected ContentChunk toByteBufferChunk(Object data){
return new ContentChunk((ByteBuffer)data);
}
示例13
protected ContentChunk toByteArrayChunk(Object data){
return new ContentChunk(ByteBuffer.wrap((byte[])data));
}
示例14
protected ContentChunk toCharSequenceChunk(Object data){
CharBuffer charBuffer = CharBuffer.wrap((CharSequence) data);
ByteBuffer byteBuffer = UTF_8.encode(charBuffer);
return new ContentChunk(byteBuffer);
}