Java源码示例:org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper
示例1
/**
 * Records a newly added Flink metric and registers it with the Dropwizard registry.
 *
 * <p>The metric's full identifier is obtained from {@code group} (with this reporter passed
 * along to {@code getMetricIdentifier}). The metric is stored in the map matching its type and
 * then registered under that identifier. Histograms and meters that are already backed by a
 * Dropwizard object are registered through that underlying object; all other supported metrics
 * are wrapped in the corresponding {@code Flink*Wrapper}. Metrics of any other type are only
 * logged at WARN level.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric within its group
 * @param group the group the metric was added to
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String fullName = group.getMetricIdentifier(metricName, this);

    // Bookkeeping maps and the registry are updated together under this reporter's monitor.
    synchronized (this) {
        if (metric instanceof Counter) {
            final Counter counter = (Counter) metric;
            counters.put(counter, fullName);
            registry.register(fullName, new FlinkCounterWrapper(counter));
            return;
        }

        if (metric instanceof Gauge) {
            final Gauge<?> gauge = (Gauge<?>) metric;
            gauges.put(gauge, fullName);
            registry.register(fullName, FlinkGaugeWrapper.fromGauge(gauge));
            return;
        }

        if (metric instanceof Histogram) {
            final Histogram histogram = (Histogram) metric;
            histograms.put(histogram, fullName);
            if (histogram instanceof DropwizardHistogramWrapper) {
                // Already a Dropwizard histogram underneath: register it directly.
                registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
            } else {
                registry.register(fullName, new FlinkHistogramWrapper(histogram));
            }
            return;
        }

        if (metric instanceof Meter) {
            final Meter meter = (Meter) metric;
            meters.put(meter, fullName);
            if (meter instanceof DropwizardMeterWrapper) {
                // Already a Dropwizard meter underneath: register it directly.
                registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
            } else {
                registry.register(fullName, new FlinkMeterWrapper(meter));
            }
            return;
        }

        log.warn("Cannot add metric of type {}. This indicates that the reporter " +
            "does not support this metric type.", metric.getClass().getName());
    }
}
示例2
/**
 * Callback invoked when a metric is added; mirrors the metric into the Dropwizard registry.
 *
 * <p>The identifier used for registration comes from
 * {@code group.getMetricIdentifier(metricName, this)}. Depending on the metric's type it is
 * remembered in {@code counters}, {@code gauges}, {@code histograms} or {@code meters} and then
 * registered. A histogram or meter that is a Dropwizard wrapper contributes its underlying
 * Dropwizard object; every other supported metric is adapted through a {@code Flink*Wrapper}.
 * Unsupported metric types produce a WARN log entry and nothing else.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric within its group
 * @param group the group the metric was added to
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String fullName = group.getMetricIdentifier(metricName, this);

    synchronized (this) {
        if (metric instanceof Counter) {
            final Counter asCounter = (Counter) metric;
            counters.put(asCounter, fullName);
            registry.register(fullName, new FlinkCounterWrapper(asCounter));
        } else if (metric instanceof Gauge) {
            final Gauge<?> asGauge = (Gauge<?>) metric;
            gauges.put(asGauge, fullName);
            registry.register(fullName, FlinkGaugeWrapper.fromGauge(asGauge));
        } else if (metric instanceof Histogram) {
            final Histogram asHistogram = (Histogram) metric;
            histograms.put(asHistogram, fullName);
            if (!(asHistogram instanceof DropwizardHistogramWrapper)) {
                // Plain Flink histogram: adapt it for the Dropwizard registry.
                registry.register(fullName, new FlinkHistogramWrapper(asHistogram));
            } else {
                // Dropwizard-backed histogram: hand over the underlying object.
                final DropwizardHistogramWrapper wrapper = (DropwizardHistogramWrapper) asHistogram;
                registry.register(fullName, wrapper.getDropwizardHistogram());
            }
        } else if (metric instanceof Meter) {
            final Meter asMeter = (Meter) metric;
            meters.put(asMeter, fullName);
            if (!(asMeter instanceof DropwizardMeterWrapper)) {
                // Plain Flink meter: adapt it for the Dropwizard registry.
                registry.register(fullName, new FlinkMeterWrapper(asMeter));
            } else {
                // Dropwizard-backed meter: hand over the underlying object.
                final DropwizardMeterWrapper wrapper = (DropwizardMeterWrapper) asMeter;
                registry.register(fullName, wrapper.getDropwizardMeter());
            }
        } else {
            log.warn("Cannot add metric of type {}. This indicates that the reporter " +
                "does not support this metric type.", metric.getClass().getName());
        }
    }
}
示例3
/**
 * Tracks an added metric by type and registers it with the Dropwizard registry.
 *
 * <p>Supported types are {@code Counter}, {@code Gauge}, {@code Histogram} and {@code Meter},
 * checked in that order. Each supported metric is stored in the map for its type under the
 * identifier returned by {@code group.getMetricIdentifier(metricName, this)} and then
 * registered under the same identifier. Any other type is reported with a WARN log message.
 *
 * @param metric the metric that was added
 * @param metricName the name of the metric within its group
 * @param group the group the metric was added to
 */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    final String fullName = group.getMetricIdentifier(metricName, this);

    synchronized (this) {
        final boolean isCounter = metric instanceof Counter;
        final boolean isGauge = metric instanceof Gauge;
        final boolean isHistogram = metric instanceof Histogram;
        final boolean isMeter = metric instanceof Meter;

        if (!isCounter && !isGauge && !isHistogram && !isMeter) {
            log.warn("Cannot add metric of type {}. This indicates that the reporter " +
                "does not support this metric type.", metric.getClass().getName());
            return;
        }

        // The checks below keep the precedence Counter > Gauge > Histogram > Meter.
        if (isCounter) {
            final Counter c = (Counter) metric;
            counters.put(c, fullName);
            registry.register(fullName, new FlinkCounterWrapper(c));
            return;
        }

        if (isGauge) {
            final Gauge<?> g = (Gauge<?>) metric;
            gauges.put(g, fullName);
            registry.register(fullName, FlinkGaugeWrapper.fromGauge(g));
            return;
        }

        if (isHistogram) {
            final Histogram h = (Histogram) metric;
            histograms.put(h, fullName);
            if (h instanceof DropwizardHistogramWrapper) {
                // Register the underlying Dropwizard histogram rather than re-wrapping it.
                registry.register(fullName, ((DropwizardHistogramWrapper) h).getDropwizardHistogram());
            } else {
                registry.register(fullName, new FlinkHistogramWrapper(h));
            }
            return;
        }

        // Only the Meter case remains at this point.
        final Meter m = (Meter) metric;
        meters.put(m, fullName);
        if (m instanceof DropwizardMeterWrapper) {
            // Register the underlying Dropwizard meter rather than re-wrapping it.
            registry.register(fullName, ((DropwizardMeterWrapper) m).getDropwizardMeter());
        } else {
            registry.register(fullName, new FlinkMeterWrapper(m));
        }
    }
}
示例4
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 *
 * <p>A counter and a Dropwizard-backed meter are registered on a task metric group whose host,
 * task manager id and job name contain special characters. The test then checks that the
 * reporter tracked both metrics and that the counter's identifier equals the filtered scope
 * components and counter name joined by the registry's delimiter.
 *
 * <p>The metric registry is shut down in a {@code finally} block so that a failing assertion
 * does not leave the registry running.
 */
@Test
public void testAddingMetrics() throws Exception {
    Configuration configuration = new Configuration();
    String taskName = "test\"Ta\"..sk";
    String jobName = "testJ\"ob:-!ax..?";
    String hostname = "loc<>al\"::host\".:";
    String taskManagerId = "tas:kMana::ger";
    String counterName = "testCounter";

    // The reporter is instantiated by the registry from its configured class name.
    configuration.setString(
        ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
        "org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
    configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
    configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

    MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
    MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);

    try {
        char delimiter = metricRegistry.getDelimiter();

        TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
        TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
        TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

        SimpleCounter myCounter = new SimpleCounter();
        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
        DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

        taskMetricGroup.counter(counterName, myCounter);
        taskMetricGroup.meter("meter", meterWrapper);

        List<MetricReporter> reporters = metricRegistry.getReporters();

        // assertEquals reports the actual reporter count on failure, unlike assertTrue(size == 1).
        assertEquals(1, reporters.size());

        MetricReporter metricReporter = reporters.get(0);

        assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

        TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

        Map<Counter, String> counters = reporter.getCounters();
        assertTrue(counters.containsKey(myCounter));

        Map<Meter, String> meters = reporter.getMeters();
        assertTrue(meters.containsKey(meterWrapper));

        // The task scope format is <host>.<tm_id>.<job_name>, so the task name is not part of it.
        String expectedCounterName = reporter.filterCharacters(hostname)
            + delimiter
            + reporter.filterCharacters(taskManagerId)
            + delimiter
            + reporter.filterCharacters(jobName)
            + delimiter
            + reporter.filterCharacters(counterName);

        assertEquals(expectedCounterName, counters.get(myCounter));
    } finally {
        // Always release the registry, even when an assertion above fails.
        metricRegistry.shutdown().get();
    }
}
示例5
/**
 * Runs a small streaming job that demonstrates a custom Dropwizard-backed meter metric.
 *
 * <p>The source emits one random number between 0 and 100 (as a string) per second until it is
 * cancelled. The map function marks an event on the meter for every record and prints the
 * subtask index, the meter's rate and its count to standard output before forwarding the
 * record unchanged.
 *
 * @param args command line arguments, exposed to the job as global job parameters
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    // Create the streaming execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
    // env.setParallelism(1);

    env.addSource(new SourceFunction<String>() {
        // volatile: cancel() is expected to be called from a different thread than run().
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<String> out) throws Exception {
            while (isRunning) {
                out.collect(String.valueOf(Math.round(Math.random() * 100)));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }).map(new RichMapFunction<String, String>() {
        // Runtime-only state initialised in open(); kept transient so it is never serialized
        // together with the function.
        private transient Meter meter;
        private transient int index;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

            // One-based subtask index, used only for the printed output.
            index = getRuntimeContext().getIndexOfThisSubtask() + 1;
            meter = getRuntimeContext().getMetricGroup()
                .addGroup("flink-metrics-test")
                .meter("meterTest", new DropwizardMeterWrapper(dropwizardMeter));
        }

        @Override
        public String map(String s) throws Exception {
            meter.markEvent();
            System.out.println("index = " + index + " rate = " + meter.getRate() + " count = " + meter.getCount());
            return s;
        }
    }).print();

    env.execute("Flink custom Meter Metrics");
}
示例6
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 *
 * <p>A counter and a Dropwizard-backed meter are registered on a task metric group whose host,
 * task manager id and job name contain special characters. The test then checks that the
 * reporter tracked both metrics and that the counter's identifier equals the filtered scope
 * components and counter name joined by the registry's delimiter.
 *
 * <p>The metric registry is shut down in a {@code finally} block so that a failing assertion
 * does not leave the registry running.
 */
@Test
public void testAddingMetrics() throws Exception {
    Configuration configuration = new Configuration();
    String taskName = "test\"Ta\"..sk";
    String jobName = "testJ\"ob:-!ax..?";
    String hostname = "loc<>al\"::host\".:";
    String taskManagerId = "tas:kMana::ger";
    String counterName = "testCounter";

    configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
    configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

    MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

    // The reporter instance is handed to the registry directly through a ReporterSetup.
    MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
        metricRegistryConfiguration,
        Collections.singletonList(ReporterSetup.forReporter("test", new TestingScheduledDropwizardReporter())));

    try {
        char delimiter = metricRegistry.getDelimiter();

        TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
        TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
        TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

        SimpleCounter myCounter = new SimpleCounter();
        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
        DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

        taskMetricGroup.counter(counterName, myCounter);
        taskMetricGroup.meter("meter", meterWrapper);

        List<MetricReporter> reporters = metricRegistry.getReporters();

        // assertEquals reports the actual reporter count on failure, unlike assertTrue(size == 1).
        assertEquals(1, reporters.size());

        MetricReporter metricReporter = reporters.get(0);

        assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

        TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

        Map<Counter, String> counters = reporter.getCounters();
        assertTrue(counters.containsKey(myCounter));

        Map<Meter, String> meters = reporter.getMeters();
        assertTrue(meters.containsKey(meterWrapper));

        // The task scope format is <host>.<tm_id>.<job_name>, so the task name is not part of it.
        String expectedCounterName = reporter.filterCharacters(hostname)
            + delimiter
            + reporter.filterCharacters(taskManagerId)
            + delimiter
            + reporter.filterCharacters(jobName)
            + delimiter
            + reporter.filterCharacters(counterName);

        assertEquals(expectedCounterName, counters.get(myCounter));
    } finally {
        // Always release the registry, even when an assertion above fails.
        metricRegistry.shutdown().get();
    }
}
示例7
/**
 * Entry point of a demo job that reports a custom meter backed by a Dropwizard meter.
 *
 * <p>An anonymous source produces a random number between 0 and 100 (as a string) once per
 * second until cancelled. Each record passes through a map function that marks the meter,
 * prints the subtask index together with the meter's current rate and count, and returns the
 * record unchanged; the resulting stream is printed.
 *
 * @param args command line arguments, exposed to the job as global job parameters
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    // Create the streaming execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
    // env.setParallelism(1);

    env.addSource(new SourceFunction<String>() {
        // volatile: cancel() is expected to be called from a different thread than run().
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<String> out) throws Exception {
            while (isRunning) {
                out.collect(String.valueOf(Math.round(Math.random() * 100)));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }).map(new RichMapFunction<String, String>() {
        // Both fields are assigned in open() and are runtime-only, so they are marked
        // transient to keep them out of the serialized function.
        private transient Meter meter;
        private transient int index;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

            // One-based subtask index, used only for the printed output.
            index = getRuntimeContext().getIndexOfThisSubtask() + 1;
            meter = getRuntimeContext().getMetricGroup()
                .addGroup("flink-metrics-test")
                .meter("meterTest", new DropwizardMeterWrapper(dropwizardMeter));
        }

        @Override
        public String map(String s) throws Exception {
            meter.markEvent();
            System.out.println("index = " + index + " rate = " + meter.getRate() + " count = " + meter.getCount());
            return s;
        }
    }).print();

    env.execute("Flink custom Meter Metrics");
}
示例8
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 *
 * <p>The test registers a counter and a Dropwizard-backed meter on a task metric group built
 * from a host name, task manager id and job name that contain special characters. It verifies
 * that the single configured reporter tracked both metrics and that the identifier stored for
 * the counter equals the filtered scope components and counter name joined by the registry's
 * delimiter.
 *
 * <p>Shutting down the registry happens in a {@code finally} block so that it is released even
 * if one of the assertions fails.
 */
@Test
public void testAddingMetrics() throws Exception {
    Configuration configuration = new Configuration();
    String taskName = "test\"Ta\"..sk";
    String jobName = "testJ\"ob:-!ax..?";
    String hostname = "loc<>al\"::host\".:";
    String taskManagerId = "tas:kMana::ger";
    String counterName = "testCounter";

    configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
    configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

    MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

    // The reporter instance is handed to the registry directly through a ReporterSetup.
    MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
        metricRegistryConfiguration,
        Collections.singletonList(ReporterSetup.forReporter("test", new TestingScheduledDropwizardReporter())));

    try {
        char delimiter = metricRegistry.getDelimiter();

        TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
        TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
        TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

        SimpleCounter myCounter = new SimpleCounter();
        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
        DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

        taskMetricGroup.counter(counterName, myCounter);
        taskMetricGroup.meter("meter", meterWrapper);

        List<MetricReporter> reporters = metricRegistry.getReporters();

        // assertEquals reports the actual reporter count on failure, unlike assertTrue(size == 1).
        assertEquals(1, reporters.size());

        MetricReporter metricReporter = reporters.get(0);

        assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

        TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

        Map<Counter, String> counters = reporter.getCounters();
        assertTrue(counters.containsKey(myCounter));

        Map<Meter, String> meters = reporter.getMeters();
        assertTrue(meters.containsKey(meterWrapper));

        // The task scope format is <host>.<tm_id>.<job_name>, so the task name is not part of it.
        String expectedCounterName = reporter.filterCharacters(hostname)
            + delimiter
            + reporter.filterCharacters(taskManagerId)
            + delimiter
            + reporter.filterCharacters(jobName)
            + delimiter
            + reporter.filterCharacters(counterName);

        assertEquals(expectedCounterName, counters.get(myCounter));
    } finally {
        // Always release the registry, even when an assertion above fails.
        metricRegistry.shutdown().get();
    }
}