Java源码示例:org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
示例1
/**
 * Reports a fixed sequence of five latency markers to a {@link LatencyStats} instance created
 * with the given granularity, then passes the histograms collected by the test registry to the
 * supplied verifier.
 *
 * @param granularity granularity handed to the {@link LatencyStats} constructor
 * @param verifier receives {@code registry.latencyHistograms} once all markers were reported
 */
private static void testLatencyStats(
        final LatencyStats.Granularity granularity,
        final Consumer<List<Tuple2<String, Histogram>>> verifier) {

    // Metric group hierarchy: unregistered operator group -> generic parent group on the test registry.
    final AbstractMetricGroup<?> unregisteredOperatorGroup =
        UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
    final TestMetricRegistry metricRegistry = new TestMetricRegistry();
    final MetricGroup rootGroup =
        new GenericMetricGroup(metricRegistry, unregisteredOperatorGroup, PARENT_GROUP_NAME);

    final LatencyStats stats = new LatencyStats(
        rootGroup,
        MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
        OPERATOR_SUBTASK_INDEX,
        OPERATOR_ID,
        granularity);

    // First source: subtask 0 is reported twice, subtask 1 once.
    stats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
    stats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
    stats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));

    // Second source: subtasks 2 and 3, once each.
    stats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
    stats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));

    verifier.accept(metricRegistry.latencyHistograms);
}
示例2
/**
 * Registers a recurring callback with the processing time service via
 * {@code scheduleAtFixedRate}. Each invocation emits a new {@link LatencyMarker} carrying the
 * service's current processing time, the operator id and the subtask index.
 *
 * @param processingTimeService service used both for scheduling and for reading the current time
 * @param output output the latency markers are emitted to
 * @param latencyTrackingInterval last timing argument passed to {@code scheduleAtFixedRate}
 *     (the first timing argument is {@code 0L})
 * @param operatorId operator id stored in every emitted marker
 * @param subtaskIndex subtask index stored in every emitted marker
 */
public LatencyMarksEmitter(
        final ProcessingTimeService processingTimeService,
        final Output<StreamRecord<OUT>> output,
        long latencyTrackingInterval,
        final OperatorID operatorId,
        final int subtaskIndex) {

    final ProcessingTimeCallback emitMarkerCallback = new ProcessingTimeCallback() {
        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            try {
                // ProcessingTimeService callbacks are executed under the checkpointing lock
                final long now = processingTimeService.getCurrentProcessingTime();
                output.emitLatencyMarker(new LatencyMarker(now, operatorId, subtaskIndex));
            } catch (Throwable error) {
                // Deliberately catch every Throwable so that the processing timer
                // service's async exception handler is not triggered.
                LOG.warn("Error while emitting latency marker.", error);
            }
        }
    };

    latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
        emitMarkerCallback,
        0L,
        latencyTrackingInterval);
}
示例3
/**
 * Registers a recurring callback with the processing time service via
 * {@code scheduleWithFixedDelay}. On every invocation the callback emits a fresh
 * {@link LatencyMarker} built from the current processing time, the operator id and the
 * subtask index.
 *
 * @param processingTimeService service used for scheduling and as the time source of the markers
 * @param output destination of the emitted latency markers
 * @param latencyTrackingInterval last timing argument passed to {@code scheduleWithFixedDelay}
 *     (the first timing argument is {@code 0L})
 * @param operatorId operator id written into each marker
 * @param subtaskIndex subtask index written into each marker
 */
public LatencyMarksEmitter(
        final ProcessingTimeService processingTimeService,
        final Output<StreamRecord<OUT>> output,
        long latencyTrackingInterval,
        final OperatorID operatorId,
        final int subtaskIndex) {

    final ProcessingTimeCallback markerTask = new ProcessingTimeCallback() {
        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            try {
                // ProcessingTimeService callbacks are executed under the checkpointing lock
                final LatencyMarker marker = new LatencyMarker(
                    processingTimeService.getCurrentProcessingTime(),
                    operatorId,
                    subtaskIndex);
                output.emitLatencyMarker(marker);
            } catch (Throwable failure) {
                // Catching Throwable on purpose: a failure here must not reach the
                // processing timer service's async exception handler.
                LOG.warn("Error while emitting latency marker.", failure);
            }
        }
    };

    latencyMarkTimer = processingTimeService.scheduleWithFixedDelay(
        markerTask,
        0L,
        latencyTrackingInterval);
}
示例4
/**
 * Builds a three-input task harness, processes one {@link LatencyMarker} and asserts that the
 * harness output contains exactly that marker, then ends the input and waits for the task to
 * complete.
 */
@Test
public void testLatencyMarker() throws Exception {
    final Map<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
    final TaskMetricGroup metricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(registeredMetrics);

    try (StreamTaskMailboxTestHarness<String> harness =
            new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                .addInput(BasicTypeInfo.INT_TYPE_INFO)
                .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
                .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
                .setTaskMetricGroup(metricGroup)
                .build()) {

        final ArrayDeque<Object> expected = new ArrayDeque<>();
        final OperatorID markerSource = new OperatorID();
        final LatencyMarker marker = new LatencyMarker(42L, markerSource, 0);

        harness.processElement(marker);
        expected.add(marker);

        // The marker must show up in the task output.
        assertThat(harness.getOutput(), contains(expected.toArray()));

        harness.endInput();
        harness.waitForTaskCompletion();
    }
}
示例5
/**
 * Creates a {@link LatencyStats} with the requested granularity on top of a test metric
 * registry, reports five latency markers from two source ids, and lets the verifier inspect
 * the histograms the registry recorded.
 *
 * @param granularity granularity used when constructing the {@link LatencyStats}
 * @param verifier consumer invoked with {@code registry.latencyHistograms} at the end
 */
private static void testLatencyStats(
        final LatencyStats.Granularity granularity,
        final Consumer<List<Tuple2<String, Histogram>>> verifier) {

    final AbstractMetricGroup<?> operatorGroup =
        UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
    final TestMetricRegistry testRegistry = new TestMetricRegistry();
    final MetricGroup parent =
        new GenericMetricGroup(testRegistry, operatorGroup, PARENT_GROUP_NAME);

    final LatencyStats underTest = new LatencyStats(
        parent,
        MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
        OPERATOR_SUBTASK_INDEX,
        OPERATOR_ID,
        granularity);

    // Markers from SOURCE_ID_1: subtask 0 (twice) and subtask 1.
    underTest.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
    underTest.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
    underTest.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
    // Markers from SOURCE_ID_2: subtasks 2 and 3.
    underTest.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
    underTest.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));

    verifier.accept(testRegistry.latencyHistograms);
}
示例6
/**
 * Records the marker in this operator's latency statistics and then emits it to the output.
 *
 * @param marker latency marker to report and forward
 */
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // Step 1: every operator tracks latencies.
    latencyStats.reportLatency(marker);

    // Step 2: everything except sinks forwards latency markers.
    output.emitLatencyMarker(marker);
}
示例7
/**
 * Places the marker into the serialization delegate and passes the delegate to
 * {@code recordWriter.randomEmit}.
 *
 * @param latencyMarker marker to emit
 * @throws RuntimeException wrapping any {@link Exception} thrown by {@code randomEmit},
 *     with the same message and the original exception as cause
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    // Setting the instance happens outside the try block, so failures here are not wrapped.
    serializationDelegate.setInstance(latencyMarker);

    try {
        recordWriter.randomEmit(serializationDelegate);
    } catch (Exception cause) {
        throw new RuntimeException(cause.getMessage(), cause);
    }
}
示例8
/**
 * Records the marker in the latency statistics without emitting it any further.
 *
 * @param marker latency marker to report
 */
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // Every operator tracks latencies; a sink stops here and does not forward the marker.
    latencyStats.reportLatency(marker);
}
示例9
/**
 * Hands the marker directly to {@code operator.processLatencyMarker}.
 *
 * @param latencyMarker marker to pass to the operator
 * @throws ExceptionInChainedOperatorException wrapping any {@link Exception} the operator throws
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    try {
        operator.processLatencyMarker(latencyMarker);
    } catch (Exception operatorFailure) {
        // Surface operator failures as an unchecked chained-operator exception.
        throw new ExceptionInChainedOperatorException(operatorFailure);
    }
}
示例10
/**
 * Emits the marker through the record writer: the marker becomes the current instance of the
 * serialization delegate, which is then given to {@code recordWriter.randomEmit}.
 *
 * @param latencyMarker marker to emit
 * @throws RuntimeException if {@code randomEmit} throws an {@link Exception}; the message is
 *     copied and the original exception is kept as cause
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    serializationDelegate.setInstance(latencyMarker);
    try {
        recordWriter.randomEmit(serializationDelegate);
    } catch (Exception emitFailure) {
        final String message = emitFailure.getMessage();
        throw new RuntimeException(message, emitFailure);
    }
}
示例11
/**
 * Creates a child group of {@code base} keyed by {@code "source_id"} whose value is the string
 * form of the marker's operator id. {@code operatorId} and {@code operatorSubtaskIndex} are
 * not used by this implementation.
 *
 * @param base group the new group is added to
 * @param marker marker whose operator id names the source
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the group returned by {@code base.addGroup("source_id", ...)}
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    final String sourceId = String.valueOf(marker.getOperatorId());
    return base.addGroup("source_id", sourceId);
}
示例12
/**
 * Creates two nested groups below {@code base}: first {@code "source_id"} with the marker's
 * operator id, then {@code "source_subtask_index"} with the marker's subtask index.
 * {@code operatorId} and {@code operatorSubtaskIndex} are not used by this implementation.
 *
 * @param base group the nested groups are added to
 * @param marker marker providing the source operator id and subtask index
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the innermost ({@code "source_subtask_index"}) group
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    final MetricGroup perSource =
        base.addGroup("source_id", String.valueOf(marker.getOperatorId()));
    return perSource.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
}
示例13
/**
 * Returns {@code base} unchanged; no source-specific sub-groups are created.
 * The {@code marker}, {@code operatorId} and {@code operatorSubtaskIndex} arguments are ignored.
 *
 * @param base group that is returned as is
 * @param marker unused
 * @param operatorId unused
 * @param operatorSubtaskIndex unused
 * @return {@code base}
 */
@Override
MetricGroup createSourceMetricGroups(
MetricGroup base,
LatencyMarker marker,
OperatorID operatorId,
int operatorSubtaskIndex) {
return base;
}
示例14
/**
 * Reports the marker to the latency statistics only; the marker is not emitted downstream.
 *
 * @param marker latency marker to record
 */
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // Latency is tracked by all operators. Sinks do not forward markers, so nothing else happens.
    latencyStats.reportLatency(marker);
}
示例15
/**
 * Routes the marker to the operator method matching this output's input index:
 * {@code processLatencyMarker1} when {@code inputIndex} is 0, otherwise
 * {@code processLatencyMarker2}.
 *
 * @param latencyMarker marker to pass to the operator
 * @throws Exception whatever the invoked operator method throws
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
    if (inputIndex != 0) {
        operator.processLatencyMarker2(latencyMarker);
        return;
    }
    operator.processLatencyMarker1(latencyMarker);
}
示例16
/**
 * Adds a {@code "source_id"} group, valued with the marker's operator id, to {@code base}.
 * Neither {@code operatorId} nor {@code operatorSubtaskIndex} is consulted.
 *
 * @param base parent group
 * @param marker marker whose operator id identifies the source
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the newly added {@code "source_id"} group
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    final String sourceIdValue = String.valueOf(marker.getOperatorId());
    final MetricGroup sourceGroup = base.addGroup("source_id", sourceIdValue);
    return sourceGroup;
}
示例17
/**
 * Emits the marker to exactly one of the wrapped outputs: the only output if there is a single
 * one, or one picked via {@code random.nextInt} if there are several. With no outputs the
 * marker is dropped.
 *
 * @param latencyMarker marker to emit
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    final int outputCount = outputs.length;

    if (outputCount == 1) {
        outputs[0].emitLatencyMarker(latencyMarker);
    } else if (outputCount > 1) {
        // randomly select an output
        outputs[random.nextInt(outputCount)].emitLatencyMarker(latencyMarker);
    }
    // outputCount == 0: nothing to emit to, ignore the marker
}
示例18
/**
 * Feeds the marker into the latency statistics and stops there.
 *
 * @param marker latency marker to record
 */
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // Tracking happens in all operators; forwarding is skipped because sinks don't forward markers.
    latencyStats.reportLatency(marker);
}
示例19
/**
 * Delivers the marker to the operator by calling {@code operator.processLatencyMarker}.
 *
 * @param latencyMarker marker to deliver
 * @throws ExceptionInChainedOperatorException if the operator call throws an {@link Exception};
 *     the original exception is passed to the wrapper's constructor
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    try {
        operator.processLatencyMarker(latencyMarker);
    } catch (Exception ex) {
        throw new ExceptionInChainedOperatorException(ex);
    }
}
示例20
/**
 * Forwards the marker to a single output. Nothing happens when there are no outputs; a lone
 * output always receives the marker; otherwise the target index comes from
 * {@code random.nextInt(outputs.length)}.
 *
 * @param latencyMarker marker to forward
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    if (outputs.length <= 0) {
        // ignore
        return;
    }

    // With exactly one output the random generator is not touched.
    final int targetIndex = (outputs.length == 1) ? 0 : random.nextInt(outputs.length);
    outputs[targetIndex].emitLatencyMarker(latencyMarker);
}
示例21
/**
 * Stores the marker in the serialization delegate and emits the delegate with
 * {@code recordWriter.randomEmit}.
 *
 * @param latencyMarker marker to emit
 * @throws RuntimeException carrying the message and cause of any {@link Exception} raised by
 *     {@code randomEmit}
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    serializationDelegate.setInstance(latencyMarker);

    try {
        recordWriter.randomEmit(serializationDelegate);
    } catch (Exception writeError) {
        // Rethrow unchecked while keeping both message and cause.
        throw new RuntimeException(writeError.getMessage(), writeError);
    }
}
示例22
/**
 * Hands back the given {@code base} group without adding any sub-group.
 * All other arguments are ignored by this implementation.
 *
 * @param base group returned to the caller unchanged
 * @param marker unused
 * @param operatorId unused
 * @param operatorSubtaskIndex unused
 * @return {@code base}
 */
@Override
MetricGroup createSourceMetricGroups(
MetricGroup base,
LatencyMarker marker,
OperatorID operatorId,
int operatorSubtaskIndex) {
return base;
}
示例23
/**
 * Derives a group from {@code base} by adding the key {@code "source_id"} with the marker's
 * operator id (converted via {@link String#valueOf(Object)}) as value. The remaining two
 * arguments are not read.
 *
 * @param base group to extend
 * @param marker marker supplying the source operator id
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the group created by the {@code addGroup} call
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    final String idOfSource = String.valueOf(marker.getOperatorId());
    return base.addGroup("source_id", idOfSource);
}
示例24
/**
 * Builds a two-level group below {@code base}: {@code "source_id"} (marker's operator id)
 * followed by {@code "source_subtask_index"} (marker's subtask index). The
 * {@code operatorId} and {@code operatorSubtaskIndex} arguments are not read.
 *
 * @param base group to extend
 * @param marker marker supplying both group values
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the {@code "source_subtask_index"} group
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    // Level 1: one group per source operator.
    final MetricGroup sourceLevel =
        base.addGroup("source_id", String.valueOf(marker.getOperatorId()));

    // Level 2: one group per source subtask.
    return sourceLevel.addGroup(
        "source_subtask_index",
        String.valueOf(marker.getSubtaskIndex()));
}
示例25
/**
 * Passes the marker to {@code operator.processLatencyMarker}, converting any checked or
 * unchecked {@link Exception} into an {@code ExceptionInChainedOperatorException}.
 *
 * @param latencyMarker marker to process
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    try {
        operator.processLatencyMarker(latencyMarker);
    } catch (Exception chainedFailure) {
        // Keep the original exception as the constructor argument of the wrapper.
        throw new ExceptionInChainedOperatorException(chainedFailure);
    }
}
示例26
/**
 * Reports the marker to {@code latencyStats}; no emission to an output takes place.
 *
 * @param marker latency marker to record
 */
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // All operators track latencies, but sinks don't forward latency markers.
    latencyStats.reportLatency(marker);
}
示例27
/**
 * Nests a {@code "source_subtask_index"} group inside a {@code "source_id"} group under
 * {@code base}; both values are taken from the marker. {@code operatorId} and
 * {@code operatorSubtaskIndex} play no role here.
 *
 * @param base parent group
 * @param marker marker providing operator id and subtask index of the source
 * @param operatorId unused here
 * @param operatorSubtaskIndex unused here
 * @return the nested {@code "source_subtask_index"} group
 */
@Override
MetricGroup createSourceMetricGroups(
        MetricGroup base,
        LatencyMarker marker,
        OperatorID operatorId,
        int operatorSubtaskIndex) {
    final MetricGroup bySourceId =
        base.addGroup("source_id", String.valueOf(marker.getOperatorId()));
    final MetricGroup bySourceSubtask =
        bySourceId.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
    return bySourceSubtask;
}
示例28
/**
 * Reports the marker to {@code latencyStats} and afterwards emits it on {@code output}.
 *
 * @param marker latency marker to report and pass on
 */
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // Tracking: all operators record latencies.
    latencyStats.reportLatency(marker);
    // Forwarding: done by everything except sinks.
    output.emitLatencyMarker(marker);
}
示例29
/**
 * Delegates to {@code output.emitLatencyMarker} with the marker unchanged.
 *
 * @param latencyMarker marker passed through to the wrapped output
 */
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
output.emitLatencyMarker(latencyMarker);
}
示例30
/**
 * Handles an incoming latency marker by delegating to
 * {@code reportOrForwardLatencyMarker}.
 *
 * @param latencyMarker marker to handle
 * @throws Exception declared by this method's signature
 */
public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
reportOrForwardLatencyMarker(latencyMarker);
}