Java源码示例:io.cloudevents.json.Json
示例1
@Override
public @NotNull Mono<Void> metadataPush(@NotNull Payload payload) {
try {
if (payload.metadata().readableBytes() > 0) {
CloudEventImpl<?> cloudEvent = Json.decodeValue(payload.getMetadataUtf8(), CLOUD_EVENT_TYPE_REFERENCE);
//todo
String type = cloudEvent.getAttributes().getType();
if (UpstreamClusterChangedEvent.class.getCanonicalName().equalsIgnoreCase(type)) {
handleUpstreamClusterChangedEvent(cloudEvent);
}
}
} catch (Exception e) {
log.error(RsocketErrorCode.message(RsocketErrorCode.message("RST-610500", e.getMessage())), e);
} finally {
ReferenceCountUtil.safeRelease(payload);
}
return Mono.empty();
}
示例2
@Test
public void testJson() throws Exception {
// given
final String eventId = UUID.randomUUID().toString();
final URI src = URI.create("/trigger");
final String eventType = "My.Cloud.Event.Type";
// passing in the given attributes
final CloudEventImpl<String> cloudEvent = CloudEventBuilder.<String>builder()
.withType(eventType)
.withId(eventId)
.withTime(ZonedDateTime.now())
.withDataschema(URI.create("demo:demo"))
.withDataContentType("text/plain")
.withSource(src)
.withData("欢迎")
.build();
String text = Json.encode(cloudEvent);
System.out.println(text);
text = text.replace("欢迎", "leijuan");
Json.decodeValue(text, new TypeReference<CloudEventImpl<String>>() {
});
System.out.println(cloudEvent.getData().get());
}
示例3
@Test
public void testCloudEvent() throws Exception {
UpstreamClusterChangedEvent upstreamClusterChangedEvent = new UpstreamClusterChangedEvent();
upstreamClusterChangedEvent.setGroup("demo");
upstreamClusterChangedEvent.setInterfaceName("com.alibaba.account.AccountService");
upstreamClusterChangedEvent.setVersion("1.0.0");
upstreamClusterChangedEvent.setUris(Arrays.asList("demo1", "demo2"));
// passing in the given attributes
final CloudEventImpl<UpstreamClusterChangedEvent> cloudEvent = CloudEventBuilder.<UpstreamClusterChangedEvent>builder()
.withType("com.alibaba.rsocket.upstream.UpstreamClusterChangedEvent")
.withId("xxxxx")
.withTime(ZonedDateTime.now())
.withDataschema(URI.create("demo:demo"))
.withDataContentType("application/json")
.withSource(new URI("demo"))
.withData(upstreamClusterChangedEvent)
.build();
String text = Json.encode(cloudEvent);
CloudEventImpl<UpstreamClusterChangedEvent> event2 = Json.decodeValue(text, new TypeReference<CloudEventImpl<UpstreamClusterChangedEvent>>() {
});
UpstreamClusterChangedEvent upstreamClusterChangedEvent1 = CloudEventSupport.unwrapData(event2, UpstreamClusterChangedEvent.class);
System.out.println(Json.encode(upstreamClusterChangedEvent1));
Assertions.assertEquals(upstreamClusterChangedEvent.getInterfaceName(), upstreamClusterChangedEvent1.getInterfaceName());
}
示例4
private static TraceOutputValue traceOutputFrom(EvaluateDecisionResult decisionResult, DMNModel model) {
TraceType type = Optional.ofNullable(model)
.map(m -> m.getDecisionById(decisionResult.getDecisionId()))
.map(DecisionNode::getResultType)
.map(TraceType::from)
.orElse(null);
JsonNode value = Optional.ofNullable(decisionResult.getResult())
.<JsonNode>map(Json.MAPPER::valueToTree)
.orElse(null);
return new TraceOutputValue(
decisionResult.getDecisionId(),
decisionResult.getDecisionName(),
decisionResult.getEvaluationStatus().name(),
type,
value,
decisionResult.getMessages()
);
}
示例5
@Test
default void testPublish() throws Exception {
var envelope = createEnvelope("key".getBytes());
var offsetInfo = publish(envelope);
assertThat(offsetInfo)
.satisfies(info -> {
assertThat(info.getTopic()).as("topic").isEqualTo(getTopic());
assertThat(info.getOffset()).as("offset").isNotNegative();
});
var receivedRecord = subscribeToPartition(offsetInfo.getPartition())
.flatMap(RecordsStorage.PartitionSource::getPublisher)
.blockFirst(Duration.ofSeconds(10));
assertThat(receivedRecord.getEnvelope()).as("envelope")
.usingComparatorForType(Comparator.comparing(Json::encode), CloudEvent.class)
.isEqualToIgnoringGivenFields(envelope, "keyEncoder", "valueEncoder")
.satisfies(it -> {
assertThat(it.getRawValue()).isInstanceOf(CloudEvent.class);
});
assertThat(receivedRecord.getPartition()).as("partition").isEqualTo(offsetInfo.getPartition());
assertThat(receivedRecord.getOffset()).as("offset").isEqualTo(offsetInfo.getOffset());
}
示例6
@Override
@NotNull
public Mono<Void> metadataPush(@NotNull Payload payload) {
try {
if (payload.metadata().readableBytes() > 0) {
CloudEventImpl<ObjectNode> cloudEvent = Json.decodeValue(payload.getMetadataUtf8(), CLOUD_EVENT_TYPE_REFERENCE);
return fireCloudEvent(cloudEvent);
}
} catch (Exception e) {
log.error(RsocketErrorCode.message("RST-610500", e.getMessage()), e);
} finally {
ReferenceCountUtil.safeRelease(payload);
}
return Mono.empty();
}
示例7
/**
* receive event from peer
*
* @param payload payload with metadata only
* @return mono empty
*/
@Override
@NotNull
public Mono<Void> metadataPush(@NotNull Payload payload) {
try {
if (payload.metadata().readableBytes() > 0) {
return fireCloudEvent(Json.decodeValue(payload.getMetadataUtf8(), CLOUD_EVENT_TYPE_REFERENCE));
}
} catch (Exception e) {
log.error(RsocketErrorCode.message(RsocketErrorCode.message("RST-610500", e.getMessage())), e);
} finally {
ReferenceCountUtil.safeRelease(payload);
}
return Mono.empty();
}
示例8
@Test
public void testConvert() throws Exception {
ConfigEvent configurationEvent = new ConfigEvent("app1", "text/plain", "Hello");
CloudEventImpl<ConfigEvent> cloudEvent = configurationEvent.toCloudEvent(URI.create("demo"));
String jsonText = Json.encode(cloudEvent);
System.out.println(jsonText);
CloudEventImpl<ConfigEvent> configurationEvent2 = Json.decodeValue(jsonText, new TypeReference<CloudEventImpl<ConfigEvent>>() {
});
System.out.println(Json.encode(configurationEvent2));
}
示例9
private static TraceInputValue traceInputFrom(InputDataNode node, Map<String, Object> context) {
JsonNode value = Optional.ofNullable(context.get(node.getName()))
.<JsonNode>map(Json.MAPPER::valueToTree)
.orElse(null);
return new TraceInputValue(
node.getId(),
node.getName(),
TraceType.from(node.getType()),
value,
Collections.emptyList()
);
}
示例10
private static TraceInputValue traceInputFrom(Map.Entry<String, Object> contextEntry) {
return new TraceInputValue(
null,
contextEntry.getKey(),
null,
Json.MAPPER.valueToTree(contextEntry.getValue()),
Collections.emptyList()
);
}
示例11
private void testInterleavedEvaluations(Supplier<TerminationDetector> terminationDetectorSupplier) {
MockDefaultAggregator aggregator = new MockDefaultAggregator();
Consumer<String> payloadConsumer = mock(Consumer.class);
DecisionTracingCollector collector = new DecisionTracingCollector(
aggregator,
payloadConsumer,
(namespace, name) -> model,
terminationDetectorSupplier
);
List<EvaluateEvent> evaluateAllEvents = readEvaluateEventsFromJsonResource(EVALUATE_ALL_JSON_RESOURCE);
List<EvaluateEvent> evaluateDecisionServiceEvents = readEvaluateEventsFromJsonResource(EVALUATE_DECISION_SERVICE_JSON_RESOURCE);
for (int i = 0; i < Math.max(evaluateAllEvents.size(), evaluateDecisionServiceEvents.size()); i++) {
if (i < evaluateAllEvents.size()) {
collector.addEvent(evaluateAllEvents.get(i));
}
if (i < evaluateDecisionServiceEvents.size()) {
collector.addEvent(evaluateDecisionServiceEvents.get(i));
}
}
Map<String, Pair<List<EvaluateEvent>, CloudEventImpl<TraceEvent>>> aggregatorCalls = aggregator.getCalls();
assertEquals(2, aggregatorCalls.size());
assertTrue(aggregatorCalls.containsKey(EVALUATE_ALL_EXECUTION_ID));
assertEquals(evaluateAllEvents.size(), aggregatorCalls.get(EVALUATE_ALL_EXECUTION_ID).getLeft().size());
assertTrue(aggregatorCalls.containsKey(EVALUATE_DECISION_SERVICE_EXECUTION_ID));
assertEquals(evaluateDecisionServiceEvents.size(), aggregatorCalls.get(EVALUATE_DECISION_SERVICE_EXECUTION_ID).getLeft().size());
ArgumentCaptor<String> payloadCaptor = ArgumentCaptor.forClass(String.class);
verify(payloadConsumer, times(2)).accept(payloadCaptor.capture());
int evaluateAllIndex = evaluateAllEvents.size() > evaluateDecisionServiceEvents.size() ? 1 : 0;
int evaluateDecisionServiceIndex = evaluateAllIndex == 1 ? 0 : 1;
List<String> payloads = payloadCaptor.getAllValues();
assertEquals(Json.encode(aggregatorCalls.get(EVALUATE_ALL_EXECUTION_ID).getRight()), payloads.get(evaluateAllIndex));
assertEquals(Json.encode(aggregatorCalls.get(EVALUATE_DECISION_SERVICE_EXECUTION_ID).getRight()), payloads.get(evaluateDecisionServiceIndex));
}
示例12
@Test
default void testSubscribeWithLatest() throws Exception {
var key = UUID.randomUUID().toString().getBytes();
var offsetInfos = publishMany(key, 5);
var partition = offsetInfos.get(0).getPartition();
var disposeAll = DirectProcessor.<Boolean>create();
try {
var recordsSoFar = new ArrayList<RecordsStorage.Record>();
var assigned = new AtomicBoolean(false);
subscribeToPartition(partition, "latest")
.doOnNext(__ -> assigned.set(true))
.flatMap(RecordsStorage.PartitionSource::getPublisher)
.takeUntilOther(disposeAll)
.subscribe(recordsSoFar::add);
await.untilTrue(assigned);
var envelope = createEnvelope(key);
var offsetInfo = publish(envelope);
await.untilAsserted(() -> {
assertThat(recordsSoFar)
.hasSize(1)
.allSatisfy(it -> {
assertThat(it.getEnvelope()).as("envelope")
.usingComparatorForType(Comparator.comparing(Json::encode), CloudEvent.class)
.isEqualToIgnoringGivenFields(envelope, "keyEncoder", "valueEncoder");
assertThat(it.getPartition()).as("partition").isEqualTo(offsetInfo.getPartition());
assertThat(it.getOffset()).as("offset").isEqualTo(offsetInfo.getOffset());
});
});
} finally {
disposeAll.onNext(true);
}
}
示例13
@SneakyThrows
private void preProcess(JsonSchemaPreProcessor processor, CloudEvent<?, ?> cloudEvent) {
try {
processor.preProcess(new Envelope(
"topic",
null,
__ -> ByteBuffer.wrap("key".getBytes()),
cloudEvent,
it -> ByteBuffer.wrap(Json.binaryEncode(it)).asReadOnlyBuffer()
)).toCompletableFuture().get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}
示例14
public static Payload cloudEventToPayload(CloudEventImpl<?> cloudEvent) {
RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(new MessageMimeTypeMetadata(RSocketMimeType.CloudEventsJson));
return ByteBufPayload.create(Unpooled.wrappedBuffer(Json.binaryEncode(cloudEvent)), compositeMetadata.getContent());
}
示例15
@Test
public void test_ListenerAndCollector_UseRealEvents_Working() {
final String modelResource = "/Traffic Violation.dmn";
final String modelNamespace = "https://github.com/kiegroup/drools/kie-dmn/_A4BCA8B8-CF08-433F-93B2-A2598F19ECFF";
final String modelName = "Traffic Violation";
final DMNRuntime runtime = DMNKogito.createGenericDMNRuntime(new java.io.InputStreamReader(
SpringBootDecisionTracingTest.class.getResourceAsStream(modelResource)
));
ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class);
SpringBootDecisionTracingListener listener = new SpringBootDecisionTracingListener(eventPublisher);
runtime.addListener(listener);
final Map<String, Object> driver = new HashMap<>();
driver.put("Age", 25);
driver.put("Points", 10);
final Map<String, Object> violation = new HashMap<>();
violation.put("Type", "speed");
violation.put("Actual Speed", 105);
violation.put("Speed Limit", 100);
final Map<String, Object> contextVariables = new HashMap<>();
contextVariables.put("Driver", driver);
contextVariables.put("Violation", violation);
final DecisionModel model = new DmnDecisionModel(runtime, modelNamespace, modelName, () -> TEST_EXECUTION_ID);
final DMNContext context = model.newContext(contextVariables);
model.evaluateAll(context);
ArgumentCaptor<EvaluateEvent> eventCaptor = ArgumentCaptor.forClass(EvaluateEvent.class);
verify(eventPublisher, times(14)).publishEvent(eventCaptor.capture());
final DecisionModels mockedDecisionModels = mock(DecisionModels.class);
when(mockedDecisionModels.getDecisionModel(modelNamespace, modelName)).thenReturn(model);
final Application mockedApplication = mock(Application.class);
when(mockedApplication.decisionModels()).thenReturn(mockedDecisionModels);
KafkaTemplate<String, String> template = mock(KafkaTemplate.class);
SpringBootDecisionTracingCollector collector = new SpringBootDecisionTracingCollector(mockedApplication, template, TEST_TOPIC);
eventCaptor.getAllValues().forEach(collector::onApplicationEvent);
ArgumentCaptor<String> topicCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> payloadCaptor = ArgumentCaptor.forClass(String.class);
verify(template).send(topicCaptor.capture(), payloadCaptor.capture());
assertEquals(TEST_TOPIC, topicCaptor.getValue());
CloudEventImpl<JsonNode> cloudEvent = Json.decodeValue(payloadCaptor.getValue(), CloudEventImpl.class, JsonNode.class);
assertEquals(TEST_EXECUTION_ID, cloudEvent.getAttributes().getId());
}
示例16
@Test
public void test_ListenerAndCollector_UseRealEvents_Working() {
final String modelResource = "/Traffic Violation.dmn";
final String modelNamespace = "https://github.com/kiegroup/drools/kie-dmn/_A4BCA8B8-CF08-433F-93B2-A2598F19ECFF";
final String modelName = "Traffic Violation";
final DMNRuntime runtime = DMNKogito.createGenericDMNRuntime(new java.io.InputStreamReader(
QuarkusDecisionTracingTest.class.getResourceAsStream(modelResource)
));
EventBus eventBus = mock(EventBus.class);
QuarkusDecisionTracingListener listener = new QuarkusDecisionTracingListener(eventBus);
runtime.addListener(listener);
final Map<String, Object> driver = new HashMap<>();
driver.put("Age", 25);
driver.put("Points", 10);
final Map<String, Object> violation = new HashMap<>();
violation.put("Type", "speed");
violation.put("Actual Speed", 105);
violation.put("Speed Limit", 100);
final Map<String, Object> contextVariables = new HashMap<>();
contextVariables.put("Driver", driver);
contextVariables.put("Violation", violation);
final DecisionModel model = new DmnDecisionModel(runtime, modelNamespace, modelName, () -> TEST_EXECUTION_ID);
final DMNContext context = model.newContext(contextVariables);
model.evaluateAll(context);
ArgumentCaptor<EvaluateEvent> eventCaptor = ArgumentCaptor.forClass(EvaluateEvent.class);
verify(eventBus, times(14)).send(eq("kogito-tracing-decision_EvaluateEvent"), eventCaptor.capture());
TestSubscriber<String> subscriber = new TestSubscriber<>();
final DecisionModels mockedDecisionModels = mock(DecisionModels.class);
when(mockedDecisionModels.getDecisionModel(modelNamespace, modelName)).thenReturn(model);
final Application mockedApplication = mock(Application.class);
when(mockedApplication.decisionModels()).thenReturn(mockedDecisionModels);
QuarkusDecisionTracingCollector collector = new QuarkusDecisionTracingCollector(mockedApplication);
collector.getEventPublisher().subscribe(subscriber);
eventCaptor.getAllValues().forEach(collector::onEvent);
subscriber.assertValueCount(1);
CloudEventImpl<JsonNode> cloudEvent = Json.decodeValue(subscriber.values().get(0), CloudEventImpl.class, JsonNode.class);
assertEquals(TEST_EXECUTION_ID, cloudEvent.getAttributes().getId());
}
示例17
private String aggregate(DMNModel model, String executionId, List<EvaluateEvent> events) {
return Json.encode(aggregator.aggregate(model, executionId, events));
}
示例18
public static List<EvaluateEvent> readEvaluateEventsFromJsonResource(String resourceName) {
return Json.fromInputStream(
DecisionTestUtils.class.getResourceAsStream(resourceName),
EVALUATE_EVENT_LIST_TYPE
);
}
示例19
@Override
public Uni<Buffer> convert(CloudEventMessage payload) {
return Uni.createFrom().item(Buffer.buffer(Json.encode(payload)));
}
示例20
public ByteBuffer asJson() {
// TODO
return ByteBuffer.wrap(Json.binaryEncode(toV1())).asReadOnlyBuffer();
}