Java源码示例:io.reactivex.rxjava3.functions.Predicate
示例1
/**
 * Builds a stream that emits the record loaded from the cache followed by the
 * records produced from {@code source}.
 *
 * <p>Every item emitted by {@code source} is first written to {@code rxCache}
 * under {@code key} and then wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}. The cache stream and the remote stream are concatenated
 * with errors delayed, and any record whose data is {@code null} is dropped.
 *
 * @param rxCache cache used both for loading and for saving remote items
 * @param key     cache key
 * @param source  upstream providing the remote items
 * @param type    type token handed to the cache loader
 * @return the concatenated, filtered stream of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Flowable<Record<T>> remote = source.map(saveAndWrap);

    Predicate<Record<T>> hasData = candidate -> candidate.getData() != null;
    return Flowable.concatDelayError(Arrays.asList(cache, remote)).filter(hasData);
}
示例2
/**
 * Builds a stream that emits the record loaded from the cache (using the given
 * backpressure strategy) followed by the records produced from {@code source}.
 *
 * <p>Every item emitted by {@code source} is first written to {@code rxCache}
 * under {@code key} and then wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}. The cache stream and the remote stream are concatenated
 * with errors delayed, and any record whose data is {@code null} is dropped.
 *
 * @param rxCache              cache used both for loading and for saving remote items
 * @param key                  cache key
 * @param source               upstream providing the remote items
 * @param type                 type token handed to the cache loader
 * @param backpressureStrategy strategy forwarded to the cache loader
 * @return the concatenated, filtered stream of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Flowable<Record<T>> remote = source.map(saveAndWrap);

    Predicate<Record<T>> hasData = candidate -> candidate.getData() != null;
    return Flowable.concatDelayError(Arrays.asList(cache, remote)).filter(hasData);
}
示例3
/**
 * Returns the first record with non-null data, looking at the cache before the
 * remote {@code source}.
 *
 * <p>The item emitted by {@code source} is written to {@code rxCache} under
 * {@code key} and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 * Cache and remote are concatenated with errors delayed, records with
 * {@code null} data are dropped, and only the first remaining element is kept.
 *
 * @param rxCache cache used both for loading and for saving the remote item
 * @param key     cache key
 * @param source  upstream providing the remote item
 * @param type    type token handed to the cache loader
 * @return a {@code Maybe} with the first record that carries data, if any
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

    // Persist the remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Maybe<Record<T>> remote = source.map(saveAndWrap);

    Predicate<Record<T>> hasData = candidate -> candidate.getData() != null;
    return Maybe.concatDelayError(Arrays.asList(cache, remote))
            .filter(hasData)
            .firstElement();
}
示例4
/**
 * Builds an observable that emits the record loaded from the cache followed by
 * the records produced from {@code source}.
 *
 * <p>Every item emitted by {@code source} is first written to {@code rxCache}
 * under {@code key} and then wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}. The cache and remote observables are concatenated with
 * errors delayed, and any record whose data is {@code null} is dropped.
 *
 * @param rxCache cache used both for loading and for saving remote items
 * @param key     cache key
 * @param source  upstream providing the remote items
 * @param type    type token handed to the cache loader
 * @return the concatenated, filtered observable of records
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Observable<Record<T>> remote = source.map(saveAndWrap);

    Predicate<Record<T>> hasData = candidate -> candidate.getData() != null;
    return Observable.concatDelayError(Arrays.asList(cache, remote)).filter(hasData);
}
示例5
/**
 * Emits the cached record when it is recent enough, otherwise falls back to
 * {@code source}.
 *
 * <p>A cached record is kept only if the time elapsed since its creation time
 * is at most {@code timestamp}, a value defined outside this method
 * (presumably a maximum age in milliseconds; confirm against the enclosing
 * class). When nothing survives that filter the stream switches to
 * {@code source}, whose items are saved to {@code rxCache} under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache used both for loading and for saving remote items
 * @param key     cache key
 * @param source  upstream providing the remote items
 * @param type    type token handed to the cache loader
 * @return the cached record stream, or the remote stream if the cache is empty or stale
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    Predicate<Record<T>> isFresh =
            cached -> System.currentTimeMillis() - cached.getCreateTime() <= timestamp;
    Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type).filter(isFresh);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Flowable<Record<T>> remote = source.map(saveAndWrap);

    return cache.switchIfEmpty(remote);
}
示例6
/**
 * Emits the cached record (loaded with the given backpressure strategy) when
 * it is recent enough, otherwise falls back to {@code source}.
 *
 * <p>A cached record is kept only if the time elapsed since its creation time
 * is at most {@code timestamp}, a value defined outside this method
 * (presumably a maximum age in milliseconds; confirm against the enclosing
 * class). When nothing survives that filter the stream switches to
 * {@code source}, whose items are saved to {@code rxCache} under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache used both for loading and for saving remote items
 * @param key                  cache key
 * @param source               upstream providing the remote items
 * @param type                 type token handed to the cache loader
 * @param backpressureStrategy strategy forwarded to the cache loader
 * @return the cached record stream, or the remote stream if the cache is empty or stale
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    Predicate<Record<T>> isFresh =
            cached -> System.currentTimeMillis() - cached.getCreateTime() <= timestamp;
    Flowable<Record<T>> cache =
            rxCache.<T>load2Flowable(key, type, backpressureStrategy).filter(isFresh);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Flowable<Record<T>> remote = source.map(saveAndWrap);

    return cache.switchIfEmpty(remote);
}
示例7
/**
 * Emits the cached record when it is recent enough, otherwise falls back to
 * {@code source}.
 *
 * <p>The cached record is kept only if the time elapsed since its creation
 * time is at most {@code timestamp}, a value defined outside this method
 * (presumably a maximum age in milliseconds; confirm against the enclosing
 * class). When the filtered cache is empty the result switches to
 * {@code source}, whose item is saved to {@code rxCache} under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache used both for loading and for saving the remote item
 * @param key     cache key
 * @param source  upstream providing the remote item
 * @param type    type token handed to the cache loader
 * @return the cached record, or the remote record if the cache is empty or stale
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    Predicate<Record<T>> isFresh =
            cached -> System.currentTimeMillis() - cached.getCreateTime() <= timestamp;
    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type).filter(isFresh);

    // Persist the remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Maybe<Record<T>> remote = source.map(saveAndWrap);

    return cache.switchIfEmpty(remote);
}
示例8
/**
 * Emits the cached record when it is recent enough, otherwise falls back to
 * {@code source}.
 *
 * <p>A cached record is kept only if the time elapsed since its creation time
 * is at most {@code timestamp}, a value defined outside this method
 * (presumably a maximum age in milliseconds; confirm against the enclosing
 * class). When nothing survives that filter the observable switches to
 * {@code source}, whose items are saved to {@code rxCache} under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache used both for loading and for saving remote items
 * @param key     cache key
 * @param source  upstream providing the remote items
 * @param type    type token handed to the cache loader
 * @return the cached record observable, or the remote one if the cache is empty or stale
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    Predicate<Record<T>> isFresh =
            cached -> System.currentTimeMillis() - cached.getCreateTime() <= timestamp;
    Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type).filter(isFresh);

    // Persist each remote item before exposing it downstream as a CLOUD record.
    Function<T, Record<T>> saveAndWrap = item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    };
    Observable<Record<T>> remote = source.map(saveAndWrap);

    return cache.switchIfEmpty(remote);
}
示例9
/**
 * Enqueues a canned hero-name response and checks that the Rx3Apollo wrapper
 * around the query completes without error and emits a response whose hero
 * name is "R2-D2".
 */
@Test
public void callProducesValue() throws Exception {
    server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

    // The assertion inside does the real checking; returning true just satisfies the predicate contract.
    Predicate<Response<EpisodeHeroNameQuery.Data>> heroIsR2D2 = response -> {
        assertThat(response.data().hero().name()).isEqualTo("R2-D2");
        return true;
    };

    Rx3Apollo
            .from(apolloClient.query(new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE))))
            .test()
            .assertNoErrors()
            .assertComplete()
            .assertValue(heroIsR2D2);
}
示例10
/**
 * Narrows {@code lifecycle} to the emissions that are equal to {@code event}.
 *
 * @param lifecycle observable of lifecycle events
 * @param event     the event to match; each emission is compared to it with {@code equals}
 * @return an observable emitting only the lifecycle events equal to {@code event}
 */
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
    // equals is invoked on the emitted item, not on the target event.
    Predicate<R> matchesEvent = lifecycleEvent -> lifecycleEvent.equals(event);
    return lifecycle.filter(matchesEvent);
}
示例11
/**
 * Binds a stream to a lifecycle, pushes a lifecycle event and then a stream
 * item, and checks that no value is delivered and that the observer ends with
 * a {@link CompositeException} containing an {@link IllegalArgumentException}.
 */
@Test
public void eventThrowsBadException() {
    PublishSubject<String> stream = PublishSubject.create();
    PublishSubject<String> lifecycle = PublishSubject.create();
    TestObserver<String> testObserver = stream
            .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
            .test();

    // This lifecycle event is expected to make the correspondence function fail.
    lifecycle.onNext("ick");
    stream.onNext("1");

    testObserver.assertNoValues();

    // Passes only when the error is a CompositeException that holds at least one
    // IllegalArgumentException; any other error (including a bare
    // IllegalArgumentException) is rejected.
    Predicate<Throwable> wrapsIllegalArgument = throwable -> {
        if (!(throwable instanceof CompositeException)) {
            return false;
        }
        return ((CompositeException) throwable).getExceptions().stream()
                .anyMatch(inner -> inner instanceof IllegalArgumentException);
    };
    testObserver.assertError(wrapsIllegalArgument);
}