Java源码示例:org.apache.flink.streaming.runtime.streamrecord.LatencyMarker

示例1
private static void testLatencyStats(
	final LatencyStats.Granularity granularity,
	final Consumer<List<Tuple2<String, Histogram>>> verifier) {

	final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
	final TestMetricRegistry registry = new TestMetricRegistry();
	final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);

	final LatencyStats latencyStats = new LatencyStats(
		parentGroup,
		MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
		OPERATOR_SUBTASK_INDEX,
		OPERATOR_ID,
		granularity);

	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));

	verifier.accept(registry.latencyHistograms);
}
 
示例2
public LatencyMarksEmitter(
		final ProcessingTimeService processingTimeService,
		final Output<StreamRecord<OUT>> output,
		long latencyTrackingInterval,
		final OperatorID operatorId,
		final int subtaskIndex) {

	latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
		new ProcessingTimeCallback() {
			@Override
			public void onProcessingTime(long timestamp) throws Exception {
				try {
					// ProcessingTimeService callbacks are executed under the checkpointing lock
					output.emitLatencyMarker(new LatencyMarker(processingTimeService.getCurrentProcessingTime(), operatorId, subtaskIndex));
				} catch (Throwable t) {
					// we catch the Throwables here so that we don't trigger the processing
					// timer services async exception handler
					LOG.warn("Error while emitting latency marker.", t);
				}
			}
		},
		0L,
		latencyTrackingInterval);
}
 
示例3
public LatencyMarksEmitter(
		final ProcessingTimeService processingTimeService,
		final Output<StreamRecord<OUT>> output,
		long latencyTrackingInterval,
		final OperatorID operatorId,
		final int subtaskIndex) {

	latencyMarkTimer = processingTimeService.scheduleWithFixedDelay(
		new ProcessingTimeCallback() {
			@Override
			public void onProcessingTime(long timestamp) throws Exception {
				try {
					// ProcessingTimeService callbacks are executed under the checkpointing lock
					output.emitLatencyMarker(new LatencyMarker(processingTimeService.getCurrentProcessingTime(), operatorId, subtaskIndex));
				} catch (Throwable t) {
					// we catch the Throwables here so that we don't trigger the processing
					// timer services async exception handler
					LOG.warn("Error while emitting latency marker.", t);
				}
			}
		},
		0L,
		latencyTrackingInterval);
}
 
示例4
@Test
public void testLatencyMarker() throws Exception {
	final Map<String, Metric> metrics = new ConcurrentHashMap<>();
	final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);

	try (StreamTaskMailboxTestHarness<String> testHarness =
			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
				.addInput(BasicTypeInfo.INT_TYPE_INFO)
				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
				.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
				.setTaskMetricGroup(taskMetricGroup)
				.build()) {
		ArrayDeque<Object> expectedOutput = new ArrayDeque<>();

		OperatorID sourceId = new OperatorID();
		LatencyMarker latencyMarker = new LatencyMarker(42L, sourceId, 0);
		testHarness.processElement(latencyMarker);
		expectedOutput.add(latencyMarker);

		assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));

		testHarness.endInput();
		testHarness.waitForTaskCompletion();
	}
}
 
示例5
private static void testLatencyStats(
	final LatencyStats.Granularity granularity,
	final Consumer<List<Tuple2<String, Histogram>>> verifier) {

	final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
	final TestMetricRegistry registry = new TestMetricRegistry();
	final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);

	final LatencyStats latencyStats = new LatencyStats(
		parentGroup,
		MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
		OPERATOR_SUBTASK_INDEX,
		OPERATOR_ID,
		granularity);

	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
	latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));

	verifier.accept(registry.latencyHistograms);
}
 
示例6
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// everything except sinks forwards latency markers
	this.output.emitLatencyMarker(marker);
}
 
示例7
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	serializationDelegate.setInstance(latencyMarker);

	try {
		recordWriter.randomEmit(serializationDelegate);
	}
	catch (Exception e) {
		throw new RuntimeException(e.getMessage(), e);
	}
}
 
示例8
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// sinks don't forward latency markers
}
 
示例9
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	try {
		operator.processLatencyMarker(latencyMarker);
	}
	catch (Exception e) {
		throw new ExceptionInChainedOperatorException(e);
	}
}
 
示例10
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	serializationDelegate.setInstance(latencyMarker);

	try {
		recordWriter.randomEmit(serializationDelegate);
	}
	catch (Exception e) {
		throw new RuntimeException(e.getMessage(), e);
	}
}
 
示例11
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()));
}
 
示例12
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()))
		.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
}
 
示例13
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base;
}
 
示例14
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// sinks don't forward latency markers
}
 
示例15
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
	if (inputIndex == 0) {
		operator.processLatencyMarker1(latencyMarker);
	} else {
		operator.processLatencyMarker2(latencyMarker);
	}
}
 
示例16
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()));
}
 
示例17
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	if (outputs.length <= 0) {
		// ignore
	} else if (outputs.length == 1) {
		outputs[0].emitLatencyMarker(latencyMarker);
	} else {
		// randomly select an output
		outputs[random.nextInt(outputs.length)].emitLatencyMarker(latencyMarker);
	}
}
 
示例18
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// sinks don't forward latency markers
}
 
示例19
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	try {
		operator.processLatencyMarker(latencyMarker);
	}
	catch (Exception e) {
		throw new ExceptionInChainedOperatorException(e);
	}
}
 
示例20
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	if (outputs.length <= 0) {
		// ignore
	} else if (outputs.length == 1) {
		outputs[0].emitLatencyMarker(latencyMarker);
	} else {
		// randomly select an output
		outputs[random.nextInt(outputs.length)].emitLatencyMarker(latencyMarker);
	}
}
 
示例21
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	serializationDelegate.setInstance(latencyMarker);

	try {
		recordWriter.randomEmit(serializationDelegate);
	}
	catch (Exception e) {
		throw new RuntimeException(e.getMessage(), e);
	}
}
 
示例22
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base;
}
 
示例23
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()));
}
 
示例24
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()))
		.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
}
 
示例25
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	try {
		operator.processLatencyMarker(latencyMarker);
	}
	catch (Exception e) {
		throw new ExceptionInChainedOperatorException(e);
	}
}
 
示例26
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// sinks don't forward latency markers
}
 
示例27
@Override
MetricGroup createSourceMetricGroups(
		MetricGroup base,
		LatencyMarker marker,
		OperatorID operatorId,
		int operatorSubtaskIndex) {
	return base
		.addGroup("source_id", String.valueOf(marker.getOperatorId()))
		.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
}
 
示例28
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
	// all operators are tracking latencies
	this.latencyStats.reportLatency(marker);

	// everything except sinks forwards latency markers
	this.output.emitLatencyMarker(marker);
}
 
示例29
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
	output.emitLatencyMarker(latencyMarker);
}
 
示例30
public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
	reportOrForwardLatencyMarker(latencyMarker);
}