Java源码示例:com.evanlennick.retry4j.CallExecutor
示例1
@Override
public void sendBatch(TransportBuffer buffer) throws TransportException {
Buffer internalBuffer = ((TcpTransportBuffer) buffer).getInternalBuffer();
Callable<Boolean> write = () -> {
try {
sink.write(internalBuffer, internalBuffer.size());
return true;
} catch (IOException ex) {
throw new TransportException("Error while sending in tcp transport", ex);
}
};
try {
new CallExecutor(retryConfig).execute(write);
} catch (RetriesExhaustedException | UnexpectedException ue) {
throw new TransportException(ue);
}
}
示例2
protected final <T> T retry(Callable<T> callable) {
final Callable<T> wrapper = () -> {
try {
return callable.call();
} catch (final Exception e) {
System.out.println("retry[] exception: " + e.getMessage());
e.printStackTrace();
throw e;
}
};
final RetryConfig config = new RetryConfigBuilder()
.retryOnAnyException()
.withMaxNumberOfTries(30)
.withDelayBetweenTries(10, ChronoUnit.SECONDS)
.withFixedBackoff()
.build();
final CallResults<Object> results = new CallExecutor(config).execute(wrapper);
return (T) results.getResult();
}
示例3
protected final <T> T retry(Callable<T> callable) {
final Callable<T> wrapper = () -> {
try {
return callable.call();
} catch (final Exception e) {
System.out.println("retry[] exception: " + e.getMessage());
e.printStackTrace();
throw e;
}
};
final RetryConfig config = new RetryConfigBuilder()
.retryOnAnyException()
.withMaxNumberOfTries(30)
.withDelayBetweenTries(10, ChronoUnit.SECONDS)
.withFixedBackoff()
.build();
final CallResults<Object> results = new CallExecutor(config).execute(wrapper);
return (T) results.getResult();
}
示例4
protected final <T> T retry(Callable<T> callable) {
final Callable<T> wrapper = () -> {
try {
return callable.call();
} catch (final Exception e) {
System.out.println("retry[] exception: " + e.getMessage());
e.printStackTrace();
throw e;
}
};
final RetryConfig config = new RetryConfigBuilder()
.retryOnAnyException()
.withMaxNumberOfTries(30)
.withDelayBetweenTries(10, ChronoUnit.SECONDS)
.withFixedBackoff()
.build();
final CallResults<Object> results = new CallExecutor(config).execute(wrapper);
return (T) results.getResult();
}
示例5
@Override
public InternalEvent next() {
updateCursor();
/*
* Wrap reading next row in retry logic. This is because there is intermittent socket timeouts
* when reading from S3 that cause the function to hang/fail.
*/
Callable<String> callable = () -> {
return this.lineIterator.next();
};
String nextRow;
try {
CallResults<Object> results = new CallExecutor(this.config).execute(callable);
nextRow = (String) results.getResult();
} catch (RetriesExhaustedException ree) {
throw new RuntimeException(ree.getCallResults().getLastExceptionThatCausedRetry());
} catch (UnexpectedException ue) {
throw ue;
}
/*
* Construct the internal event
*/
return new S3InternalEvent(nextRow, this.context, this.arrivalTime,
currentS3Entity.getObject().getKey(), currentS3Entity.getBucket().getName(),
currentS3Entity.getObject().getVersionId());
}
示例6
@Test
public void verifySimpleExponentialProfile() {
Callable<Boolean> callable = () -> true;
RetryConfig retryConfig = retryConfigBuilder
.exponentialBackoff5Tries5Sec()
.build();
CallExecutor callExecutor = new CallExecutorBuilder().config(retryConfig).build();
Status results = callExecutor.execute(callable);
assertThat(results.wasSuccessful());
}
示例7
@Test
public void verifySimpleFibonacciProfile() {
Callable<Boolean> callable = () -> true;
RetryConfig retryConfig = retryConfigBuilder
.fiboBackoff7Tries5Sec()
.build();
CallExecutor callExecutor = new CallExecutorBuilder().config(retryConfig).build();
Status results = callExecutor.execute(callable);
assertThat(results.wasSuccessful());
}
示例8
@Test
public void verifySimpleRandomExponentialProfile() {
Callable<Boolean> callable = () -> true;
RetryConfig retryConfig = retryConfigBuilder
.randomExpBackoff10Tries60Sec()
.build();
CallExecutor callExecutor = new CallExecutorBuilder().config(retryConfig).build();
Status results = callExecutor.execute(callable);
assertThat(results.wasSuccessful());
}
示例9
@Test
public void verifySimpleFixedProfile() {
Callable<Boolean> callable = () -> true;
RetryConfig retryConfig = retryConfigBuilder
.fixedBackoff5Tries10Sec()
.build();
CallExecutor callExecutor = new CallExecutorBuilder().config(retryConfig).build();
Status results = callExecutor.execute(callable);
assertThat(results.wasSuccessful());
}
示例10
public void sendBatch(byte[] raw) throws TransportException {
/*
* Wrap the call with retry logic to avoid intermittent ES issues.
*/
Callable<HttpResponse> callable = () -> {
HttpResponse resp;
String responseString = null;
HttpPost httpPost = new HttpPost(this.url);
/*
* Do the call, read response, release connection so it is available for use again, and
* finally check the response.
*/
try {
if (this.useGzip) {
resp = sendBatchCompressed(httpPost, raw);
} else {
resp = sendBatchUncompressed(httpPost, raw);
}
try {
responseString = EntityUtils.toString(resp.getEntity());
} catch (ParseException | IOException e) {
throw new TransportException(
"http transport call failed because " + resp.getStatusLine().getReasonPhrase());
}
} finally {
/*
* Always release connection otherwise it blocks future requests.
*/
httpPost.releaseConnection();
}
checkResponse(resp, responseString);
return resp;
};
RetryConfig config = new RetryConfigBuilder()
.retryOnSpecificExceptions(TransportException.class).withMaxNumberOfTries(this.retries + 1)
.withDelayBetweenTries(this.retryDelayMs, ChronoUnit.MILLIS).withExponentialBackoff()
.build();
try {
new CallExecutor(config).execute(callable);
} catch (RetriesExhaustedException ree) {
logger.warn("transport failed after " + ree.getCallResults().getTotalTries() + " tries.");
throw new TransportException(ree.getCallResults().getLastExceptionThatCausedRetry());
} catch (UnexpectedException ue) {
throw new TransportException(ue);
}
}