Java源码示例:org.apache.flink.metrics.Gauge
示例1
/**
 * Removes a metric from the map that corresponds to its type.
 *
 * <p>The type checks run in the order Counter, Gauge, Histogram, Meter; the first match wins.
 * A metric matching none of these types only produces a warning.
 *
 * @param metric the metric that was removed
 * @param metricName the name of the metric (not used by this method)
 * @param group the group the metric belonged to (not used by this method)
 */
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
    synchronized (this) {
        if (metric instanceof Counter) {
            counters.remove(metric);
            return;
        }
        if (metric instanceof Gauge) {
            gauges.remove(metric);
            return;
        }
        if (metric instanceof Histogram) {
            histograms.remove(metric);
            return;
        }
        if (metric instanceof Meter) {
            meters.remove(metric);
            return;
        }
        // None of the known metric types matched.
        log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例2
/**
 * Checks that a group reports itself as closed after {@code close()} and that metrics can still be
 * requested from it afterwards without the test failing.
 */
@Test
public void closedGroupDoesNotRegisterMetrics() {
    final Gauge<Object> nullGauge = new Gauge<Object>() {
        @Override
        public Object getValue() {
            return null;
        }
    };

    final DummyAbstractMetricGroup parent = new DummyAbstractMetricGroup(exceptionOnRegister);
    final GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister, parent, "testgroup");

    assertFalse(group.isClosed());
    group.close();
    assertTrue(group.isClosed());

    // these will fail if the registration is propagated
    // NOTE(review): presumably exceptionOnRegister throws on any registration - confirm
    group.counter("testcounter");
    group.gauge("testgauge", nullGauge);
}
示例3
/**
 * Records a newly added metric, keyed by the metric instance, together with its identifier.
 *
 * <p>The identifier is resolved through the group before the lock on this reporter is taken.
 * A metric that is neither a Counter, Gauge, Histogram nor Meter is not stored; a warning is
 * logged instead.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String identifier = group.getMetricIdentifier(metricName, this);

    synchronized (this) {
        if (metric instanceof Counter) {
            counters.put((Counter) metric, identifier);
            return;
        }
        if (metric instanceof Gauge) {
            gauges.put((Gauge<?>) metric, identifier);
            return;
        }
        if (metric instanceof Histogram) {
            histograms.put((Histogram) metric, identifier);
            return;
        }
        if (metric instanceof Meter) {
            meters.put((Meter) metric, identifier);
            return;
        }
        log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例4
/**
 * Records a newly added metric, keyed by the metric instance, together with its
 * {@code MetricInfo}.
 *
 * <p>The info object is obtained from {@code metricInfoProvider} before the lock on this
 * reporter is taken. Unsupported metric types are not stored and only trigger a warning.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final MetricInfo info = metricInfoProvider.getMetricInfo(metricName, group);

    synchronized (this) {
        if (metric instanceof Counter) {
            counters.put((Counter) metric, info);
            return;
        }
        if (metric instanceof Gauge) {
            gauges.put((Gauge<?>) metric, info);
            return;
        }
        if (metric instanceof Histogram) {
            histograms.put((Histogram) metric, info);
            return;
        }
        if (metric instanceof Meter) {
            meters.put((Meter) metric, info);
            return;
        }
        log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例5
/**
 * Tests that heap/non-heap metrics do not rely on a static MemoryUsage instance.
 *
 * <p>We can only check this easily for the currently used heap memory, so we use it as a proxy
 * for testing the functionality in general.
 */
@Test
public void testHeapMetrics() throws Exception {
    final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
    MetricUtils.instantiateHeapMemoryMetrics(metricGroup);

    @SuppressWarnings("unchecked")
    final Gauge<Long> usedHeapGauge = (Gauge<Long>) metricGroup.get(MetricNames.MEMORY_USED);
    final long baseline = usedHeapGauge.getValue();

    // check memory usage difference multiple times since other tests may affect memory usage as well
    int attempt = 0;
    while (attempt < 10) {
        // The array is never read; it is allocated between the two gauge readings.
        final byte[] allocation = new byte[1024 * 1024 * 8];
        final long current = usedHeapGauge.getValue();
        if (current != baseline) {
            // The gauge reported a different value, so it is not returning a cached reading.
            return;
        }
        Thread.sleep(50);
        attempt++;
    }

    Assert.fail("Heap usage metric never changed it's value.");
}
示例6
/**
 * Stores an added metric in the map for its type, using the metric as key and its identifier
 * as value.
 *
 * <p>The identifier comes from {@code group.getMetricIdentifier(metricName, this)} and is
 * computed outside the synchronized section. Metrics of an unsupported type are logged and
 * otherwise ignored.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String metricId = group.getMetricIdentifier(metricName, this);

    synchronized (this) {
        if (metric instanceof Counter) {
            final Counter counter = (Counter) metric;
            counters.put(counter, metricId);
        } else if (metric instanceof Gauge) {
            final Gauge<?> gauge = (Gauge<?>) metric;
            gauges.put(gauge, metricId);
        } else if (metric instanceof Histogram) {
            final Histogram histogram = (Histogram) metric;
            histograms.put(histogram, metricId);
        } else if (metric instanceof Meter) {
            final Meter meter = (Meter) metric;
            meters.put(meter, metricId);
        } else {
            log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
                "does not support this metric type.", metric.getClass().getName());
        }
    }
}
示例7
/**
 * Wraps a Flink gauge in a Prometheus gauge child that converts the gauge value to a double on
 * every read.
 *
 * <p>Conversion rules: numbers are converted with {@code doubleValue()}, booleans become 1
 * (true) or 0 (false), and a {@code null} value or any other type is reported as 0 after a
 * debug log entry.
 *
 * @param gauge the Flink gauge to read from
 * @return a Prometheus gauge child backed by {@code gauge}
 */
@VisibleForTesting
io.prometheus.client.Gauge.Child gaugeFrom(Gauge gauge) {
    return new io.prometheus.client.Gauge.Child() {
        @Override
        public double get() {
            final Object current = gauge.getValue();

            if (current instanceof Number) {
                // Covers Double as well as every other numeric wrapper.
                return ((Number) current).doubleValue();
            } else if (current instanceof Boolean) {
                return ((Boolean) current) ? 1 : 0;
            } else if (current == null) {
                log.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
            } else {
                log.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
                    gauge, current.getClass().getName());
            }
            return 0;
        }
    };
}
示例8
/**
 * Removes a metric from the map for its type and, if it had been recorded there, also removes
 * the associated name from {@code registry}.
 *
 * <p>Metrics of an unsupported type, or metrics that were not present in the map, leave the
 * registry untouched.
 *
 * @param metric the metric that was removed
 * @param metricName the name of the metric (not used by this method)
 * @param group the group the metric belonged to (not used by this method)
 */
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
    synchronized (this) {
        final String registeredName;
        if (metric instanceof Counter) {
            registeredName = counters.remove(metric);
        } else if (metric instanceof Gauge) {
            registeredName = gauges.remove(metric);
        } else if (metric instanceof Histogram) {
            registeredName = histograms.remove(metric);
        } else if (metric instanceof Meter) {
            registeredName = meters.remove(metric);
        } else {
            registeredName = null;
        }

        if (registeredName == null) {
            // Nothing was recorded for this metric, so there is nothing to remove.
            return;
        }
        registry.remove(registeredName);
    }
}
示例9
/**
 * Wraps an added Counter, Gauge or Meter in its {@code DCounter}/{@code DGauge}/{@code DMeter}
 * counterpart and stores the wrapper keyed by the original metric.
 *
 * <p>Each wrapper receives the metric identifier, the host and tags derived from the group
 * (tags are appended to a copy of {@code configTags}) and the reporter's {@code clock}.
 * Histograms and unknown metric types are not stored; a warning is logged for them.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String identifier = group.getMetricIdentifier(metricName);

    final List<String> allTags = new ArrayList<>(configTags);
    allTags.addAll(getTagsFromMetricGroup(group));

    final String hostName = getHostFromMetricGroup(group);

    if (metric instanceof Counter) {
        final Counter counter = (Counter) metric;
        counters.put(counter, new DCounter(counter, identifier, hostName, allTags, clock));
        return;
    }
    if (metric instanceof Gauge) {
        final Gauge gauge = (Gauge) metric;
        gauges.put(gauge, new DGauge(gauge, identifier, hostName, allTags, clock));
        return;
    }
    if (metric instanceof Meter) {
        // Only consider rate
        final Meter meter = (Meter) metric;
        meters.put(meter, new DMeter(meter, identifier, hostName, allTags, clock));
        return;
    }
    if (metric instanceof Histogram) {
        LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
        return;
    }
    LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
        "does not support this metric type.", metric.getClass().getName());
}
示例10
/**
 * Wraps an added Counter, Gauge or Meter in its {@code DCounter}/{@code DGauge}/{@code DMeter}
 * counterpart and stores the wrapper keyed by the original metric.
 *
 * <p>Each wrapper receives the metric identifier plus the host and tags derived from the group;
 * the tags are appended to a copy of {@code configTags}. Histograms and unknown metric types are
 * not stored; a warning is logged for them.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String identifier = group.getMetricIdentifier(metricName);

    final List<String> allTags = new ArrayList<>(configTags);
    allTags.addAll(getTagsFromMetricGroup(group));

    final String hostName = getHostFromMetricGroup(group);

    if (metric instanceof Counter) {
        final Counter counter = (Counter) metric;
        counters.put(counter, new DCounter(counter, identifier, hostName, allTags));
        return;
    }
    if (metric instanceof Gauge) {
        final Gauge gauge = (Gauge) metric;
        gauges.put(gauge, new DGauge(gauge, identifier, hostName, allTags));
        return;
    }
    if (metric instanceof Meter) {
        // Only consider rate
        final Meter meter = (Meter) metric;
        meters.put(meter, new DMeter(meter, identifier, hostName, allTags));
        return;
    }
    if (metric instanceof Histogram) {
        LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
        return;
    }
    LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
        "does not support this metric type.", metric.getClass().getName());
}
示例11
/**
 * Verifies the point produced by {@code MetricMapper.map} for gauges returning an integer,
 * {@code null}, a string and a long.
 */
@Test
public void testMapGauge() {
    final Gauge<Number> intGauge = () -> 42;
    final Gauge<Number> nullGauge = () -> null;
    final Gauge<String> stringGauge = () -> "hello";
    final Gauge<Long> longGauge = () -> 42L;

    verifyPoint(MetricMapper.map(INFO, TIMESTAMP, intGauge), "value=42");
    verifyPoint(MetricMapper.map(INFO, TIMESTAMP, nullGauge), "value=null");
    verifyPoint(MetricMapper.map(INFO, TIMESTAMP, stringGauge), "value=hello");
    verifyPoint(MetricMapper.map(INFO, TIMESTAMP, longGauge), "value=42");
}
示例12
/**
 * Tests that heap/non-heap metrics do not rely on a static MemoryUsage instance.
 *
 * <p>We can only check this easily for the currently used heap memory, so we use it as a proxy
 * for testing the functionality in general.
 */
@Test
public void testHeapMetrics() throws Exception {
    final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
    MetricUtils.instantiateHeapMemoryMetrics(metricGroup);

    @SuppressWarnings("unchecked")
    final Gauge<Long> usedHeapGauge = (Gauge<Long>) metricGroup.get(MetricNames.MEMORY_USED);
    final long initialReading = usedHeapGauge.getValue();

    // check memory usage difference multiple times since other tests may affect memory usage as well
    int remainingAttempts = 10;
    while (remainingAttempts > 0) {
        // The array is never read; it is allocated between the two gauge readings.
        final byte[] allocation = new byte[1024 * 1024 * 8];
        final long latestReading = usedHeapGauge.getValue();
        if (latestReading != initialReading) {
            // A changed reading is all this test needs to observe.
            return;
        }
        Thread.sleep(50);
        remainingAttempts--;
    }

    Assert.fail("Heap usage metric never changed it's value.");
}
示例13
/**
 * Registers two gauges under the same name on two different task metric groups and checks that
 * the default collector registry exposes one sample per label set with the respective value.
 */
@Test
public void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
    final Gauge<Integer> threeGauge = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return 3;
        }
    };
    final Gauge<Integer> fourGauge = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return 4;
        }
    };

    taskMetricGroup1.gauge("my_gauge", threeGauge);
    taskMetricGroup2.gauge("my_gauge", fourGauge);

    final Double firstSample = CollectorRegistry.defaultRegistry.getSampleValue(
        "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1);
    assertThat(firstSample, equalTo(3.));

    final Double secondSample = CollectorRegistry.defaultRegistry.getSampleValue(
        "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2);
    assertThat(secondSample, equalTo(4.));
}
示例14
/**
 * Checks that two gauges sharing the name {@code my_gauge} can both be registered when they
 * belong to different task metric groups, and that each is readable through its own label values.
 */
@Test
public void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
    final Gauge<Integer> gaugeForFirstGroup = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return 3;
        }
    };
    taskMetricGroup1.gauge("my_gauge", gaugeForFirstGroup);

    final Gauge<Integer> gaugeForSecondGroup = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return 4;
        }
    };
    taskMetricGroup2.gauge("my_gauge", gaugeForSecondGroup);

    // Both samples are looked up only after both gauges were registered.
    final Double sampleForFirstGroup = CollectorRegistry.defaultRegistry.getSampleValue(
        "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues1);
    assertThat(sampleForFirstGroup, equalTo(3.));

    final Double sampleForSecondGroup = CollectorRegistry.defaultRegistry.getSampleValue(
        "flink_taskmanager_job_task_my_gauge", LABEL_NAMES, labelValues2);
    assertThat(sampleForSecondGroup, equalTo(4.));
}
示例15
/**
 * Creates the reporter-side wrapper for an added metric and stores it keyed by that metric.
 *
 * <p>Counters, gauges and meters are wrapped in {@code DCounter}, {@code DGauge} and
 * {@code DMeter} respectively; each wrapper is given the metric identifier, the host and the
 * combined tags (a copy of {@code configTags} plus the tags derived from the group). Histograms
 * and unknown types are skipped with a warning.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String fullName = group.getMetricIdentifier(metricName);

    final List<String> combinedTags = new ArrayList<>(configTags);
    combinedTags.addAll(getTagsFromMetricGroup(group));

    final String host = getHostFromMetricGroup(group);

    if (metric instanceof Counter) {
        final Counter counter = (Counter) metric;
        final DCounter wrapped = new DCounter(counter, fullName, host, combinedTags);
        counters.put(counter, wrapped);
    } else if (metric instanceof Gauge) {
        final Gauge gauge = (Gauge) metric;
        final DGauge wrapped = new DGauge(gauge, fullName, host, combinedTags);
        gauges.put(gauge, wrapped);
    } else if (metric instanceof Meter) {
        // Only consider rate
        final Meter meter = (Meter) metric;
        final DMeter wrapped = new DMeter(meter, fullName, host, combinedTags);
        meters.put(meter, wrapped);
    } else if (metric instanceof Histogram) {
        LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
    } else {
        LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例16
/**
 * Stores an added metric in the map for its type, using the metric as key and its
 * {@code MetricInfo} as value.
 *
 * <p>The info object is resolved via {@code metricInfoProvider} outside the synchronized
 * section. Metrics of an unsupported type are logged and otherwise ignored.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final MetricInfo resolvedInfo = metricInfoProvider.getMetricInfo(metricName, group);

    synchronized (this) {
        if (metric instanceof Counter) {
            final Counter counter = (Counter) metric;
            counters.put(counter, resolvedInfo);
        } else if (metric instanceof Gauge) {
            final Gauge<?> gauge = (Gauge<?>) metric;
            gauges.put(gauge, resolvedInfo);
        } else if (metric instanceof Histogram) {
            final Histogram histogram = (Histogram) metric;
            histograms.put(histogram, resolvedInfo);
        } else if (metric instanceof Meter) {
            final Meter meter = (Meter) metric;
            meters.put(meter, resolvedInfo);
        } else {
            log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
                "does not support this metric type.", metric.getClass().getName());
        }
    }
}
示例17
/**
 * Adds one metric of each type to a {@code MetricQueryService}, checks that a queried dump
 * contains serialized data for all four types, then removes the metrics and checks that the
 * next dump is empty for all four types.
 */
@Test
public void testCreateDump() throws Exception {
    final MetricQueryService service =
        MetricQueryService.createMetricQueryService(rpcService, ResourceID.generate(), Long.MAX_VALUE);
    service.start();

    final Counter counter = new SimpleCounter();
    final Gauge<String> gauge = () -> "Hello";
    final Histogram histogram = new TestHistogram();
    final Meter meter = new TestMeter();

    final TaskManagerMetricGroup taskManagerGroup =
        UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();

    service.addMetric("counter", counter, taskManagerGroup);
    service.addMetric("gauge", gauge, taskManagerGroup);
    service.addMetric("histogram", histogram, taskManagerGroup);
    service.addMetric("meter", meter, taskManagerGroup);

    // With all four metrics registered, every section of the dump must carry data.
    final MetricDumpSerialization.MetricSerializationResult populated = service.queryMetrics(TIMEOUT).get();
    assertTrue(populated.serializedCounters.length > 0);
    assertTrue(populated.serializedGauges.length > 0);
    assertTrue(populated.serializedHistograms.length > 0);
    assertTrue(populated.serializedMeters.length > 0);

    service.removeMetric(counter);
    service.removeMetric(gauge);
    service.removeMetric(histogram);
    service.removeMetric(meter);

    // After removal, every section of the dump must be empty.
    final MetricDumpSerialization.MetricSerializationResult drained = service.queryMetrics(TIMEOUT).get();
    assertEquals(0, drained.serializedCounters.length);
    assertEquals(0, drained.serializedGauges.length);
    assertEquals(0, drained.serializedHistograms.length);
    assertEquals(0, drained.serializedMeters.length);
}
示例18
/**
 * Records an added metric under its full name and registers a matching object with
 * {@code registry}.
 *
 * <p>Counters and gauges are always wrapped ({@code FlinkCounterWrapper},
 * {@code FlinkGaugeWrapper}). Histograms and meters that are already
 * {@code DropwizardHistogramWrapper}/{@code DropwizardMeterWrapper} instances contribute their
 * underlying Dropwizard object; any other histogram or meter is wrapped in
 * {@code FlinkHistogramWrapper}/{@code FlinkMeterWrapper}. Unsupported metric types are logged
 * and not registered.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric
 * @param group the group the metric was registered on
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String registeredName = group.getMetricIdentifier(metricName, this);

    synchronized (this) {
        if (metric instanceof Counter) {
            final Counter counter = (Counter) metric;
            counters.put(counter, registeredName);
            registry.register(registeredName, new FlinkCounterWrapper(counter));
            return;
        }

        if (metric instanceof Gauge) {
            final Gauge<?> gauge = (Gauge<?>) metric;
            gauges.put(gauge, registeredName);
            registry.register(registeredName, FlinkGaugeWrapper.fromGauge(gauge));
            return;
        }

        if (metric instanceof Histogram) {
            final Histogram flinkHistogram = (Histogram) metric;
            histograms.put(flinkHistogram, registeredName);
            if (flinkHistogram instanceof DropwizardHistogramWrapper) {
                // Already backed by a Dropwizard histogram: register that one directly.
                registry.register(registeredName, ((DropwizardHistogramWrapper) flinkHistogram).getDropwizardHistogram());
            } else {
                registry.register(registeredName, new FlinkHistogramWrapper(flinkHistogram));
            }
            return;
        }

        if (metric instanceof Meter) {
            final Meter flinkMeter = (Meter) metric;
            meters.put(flinkMeter, registeredName);
            if (flinkMeter instanceof DropwizardMeterWrapper) {
                // Already backed by a Dropwizard meter: register that one directly.
                registry.register(registeredName, ((DropwizardMeterWrapper) flinkMeter).getDropwizardMeter());
            } else {
                registry.register(registeredName, new FlinkMeterWrapper(flinkMeter));
            }
            return;
        }

        log.warn("Cannot add metric of type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例19
/**
 * Writes a gauge to the given output as metric info, name and the string form of its value.
 *
 * <p>The value and its string form are both validated before anything is written, so a
 * rejected gauge leaves {@code out} untouched.
 *
 * @param out the output to write to
 * @param info the scope info, written via {@code serializeMetricInfo}
 * @param name the gauge name
 * @param gauge the gauge whose current value is serialized
 * @throws NullPointerException if the gauge value, or its {@code toString()} result, is null
 * @throws IOException if writing to {@code out} fails
 */
private static void serializeGauge(DataOutput out, QueryScopeInfo info, String name, Gauge<?> gauge) throws IOException {
    final Object current = gauge.getValue();
    if (null == current) {
        throw new NullPointerException("Value returned by gauge " + name + " was null.");
    }

    final String text = current.toString();
    if (null == text) {
        throw new NullPointerException("toString() of the value returned by gauge " + name + " returned null.");
    }

    serializeMetricInfo(out, info);
    out.writeUTF(name);
    out.writeUTF(text);
}
示例20
/**
 * Removes the child identified by the given dimension values from the collector that backs
 * the metric.
 *
 * <p>Gauges, counters and meters are all removed through a Prometheus {@code Gauge} collector;
 * histograms are removed through a {@code HistogramSummaryProxy}. Unknown metric types are
 * only logged.
 *
 * @param metric the Flink metric being removed; its type selects the collector cast
 * @param dimensionValues the label values identifying the child to remove
 * @param collector the collector holding the child
 */
private void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
    final boolean backedByGauge =
        metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter;

    if (backedByGauge) {
        ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
        return;
    }
    if (metric instanceof Histogram) {
        ((HistogramSummaryProxy) collector).remove(dimensionValues);
        return;
    }
    log.warn("Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
        metric.getClass().getName());
}
示例21
/**
 * Checks that a gauge returning a {@code Short} is exposed by the reporter as the equivalent
 * double value.
 */
@Test
public void shortGaugeIsConvertedCorrectly() {
    final Gauge<Short> shortGauge = new Gauge<Short>() {
        @Override
        public Short getValue() {
            return 13;
        }
    };

    final double converted = reporter.gaugeFrom(shortGauge).get();
    assertThat(converted, equalTo(13.));
}
示例22
/**
 * Registers, for every garbage collector MXBean of the running JVM, a sub-group named after the
 * collector containing two gauges: {@code "Count"} (collection count) and {@code "Time"}
 * (collection time).
 *
 * @param metrics the parent group the per-collector groups are added to
 */
private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
    for (final GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
        final MetricGroup collectorGroup = metrics.addGroup(collector.getName());

        // Method references keep the gauges live: each read queries the MXBean again.
        collectorGroup.<Long, Gauge<Long>>gauge("Count", collector::getCollectionCount);
        collectorGroup.<Long, Gauge<Long>>gauge("Time", collector::getCollectionTime);
    }
}
示例23
/**
 * Registers the gauges {@code "Load"} (process CPU load) and {@code "Time"} (process CPU time),
 * both read from the {@code com.sun.management.OperatingSystemMXBean}.
 *
 * <p>Any exception raised while obtaining the bean or registering the gauges is logged as a
 * warning and not propagated.
 *
 * @param metrics the group the gauges are registered on
 */
private static void instantiateCPUMetrics(MetricGroup metrics) {
    try {
        // NOTE(review): the cast presumably fails on JVMs that do not provide the com.sun bean - confirm
        final com.sun.management.OperatingSystemMXBean osBean =
            (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

        metrics.<Double, Gauge<Double>>gauge("Load", osBean::getProcessCpuLoad);
        metrics.<Long, Gauge<Long>>gauge("Time", osBean::getProcessCpuTime);
    } catch (Exception e) {
        LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
            " - CPU load metrics will not be available.", e);
    }
}
示例24
/**
 * Creates a gauge that calls {@code refreshAndGetTotal()} every time its value is read.
 *
 * @return a new gauge instance on each call
 */
private Gauge<Long> getTotalQueueLenGauge() {
    final Gauge<Long> totalQueueLength = new Gauge<Long>() {
        @Override
        public Long getValue() {
            return refreshAndGetTotal();
        }
    };
    return totalQueueLength;
}
示例25
/**
 * Creates a gauge that calls {@code refreshAndGetMin()} every time its value is read.
 *
 * @return a new gauge instance on each call
 */
private Gauge<Integer> getMinQueueLenGauge() {
    final Gauge<Integer> minQueueLength = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return refreshAndGetMin();
        }
    };
    return minQueueLength;
}
示例26
/**
 * Builds a gauge whose value is obtained from {@code refreshAndGetTotal()} on every read.
 *
 * @return a new gauge instance on each call
 */
private Gauge<Long> getTotalQueueLenGauge() {
    final Gauge<Long> gauge = new Gauge<Long>() {
        @Override
        public Long getValue() {
            return refreshAndGetTotal();
        }
    };
    return gauge;
}
示例27
/**
 * Creates a gauge that calls {@code refreshAndGetMax()} every time its value is read.
 *
 * @return a new gauge instance on each call
 */
private Gauge<Integer> getMaxQueueLenGauge() {
    final Gauge<Integer> maxQueueLength = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return refreshAndGetMax();
        }
    };
    return maxQueueLength;
}
示例28
/**
 * Serializes a single gauge whose value is {@code null} and checks that the serialized result,
 * and the list deserialized from it, contain no metrics at all.
 */
@Test
public void testNullGaugeHandling() throws IOException {
    final MetricDumpSerialization.MetricDumpSerializer serializer =
        new MetricDumpSerialization.MetricDumpSerializer();
    final MetricDumpSerialization.MetricDumpDeserializer deserializer =
        new MetricDumpSerialization.MetricDumpDeserializer();

    final Gauge<Object> nullGauge = new Gauge<Object>() {
        @Override
        public Object getValue() {
            return null;
        }
    };
    final Tuple2<QueryScopeInfo, String> gaugeDescription =
        new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g");

    final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    gauges.put(nullGauge, gaugeDescription);

    final MetricDumpSerialization.MetricSerializationResult result = serializer.serialize(
        Collections.<Counter, Tuple2<QueryScopeInfo, String>>emptyMap(),
        gauges,
        Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(),
        Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap());

    // no metrics should be serialized
    Assert.assertEquals(0, result.serializedCounters.length);
    Assert.assertEquals(0, result.serializedGauges.length);
    Assert.assertEquals(0, result.serializedHistograms.length);
    Assert.assertEquals(0, result.serializedMeters.length);

    final List<MetricDump> restored = deserializer.deserialize(result);
    Assert.assertEquals(0, restored.size());
}
示例29
/**
 * Prometheus only supports numbers, so non-numeric gauges are expected to be reported as 0.
 */
@Test
public void stringGaugeCannotBeConverted() {
    final Gauge<String> textGauge = new Gauge<String>() {
        @Override
        public String getValue() {
            return "I am not a number";
        }
    };

    final double reported = reporter.gaugeFrom(textGauge).get();
    assertThat(reported, equalTo(0.));
}
示例30
/**
 * Creates a gauge that calls {@code refreshAndGetAvg()} every time its value is read.
 *
 * @return a new gauge instance on each call
 */
private Gauge<Float> getAvgQueueLenGauge() {
    final Gauge<Float> avgQueueLength = new Gauge<Float>() {
        @Override
        public Float getValue() {
            return refreshAndGetAvg();
        }
    };
    return avgQueueLength;
}