Java源码示例:org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper

示例1
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
	final String fullName = group.getMetricIdentifier(metricName, this);

	synchronized (this) {
		if (metric instanceof Counter) {
			counters.put((Counter) metric, fullName);
			registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
		}
		else if (metric instanceof Gauge) {
			gauges.put((Gauge<?>) metric, fullName);
			registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
		} else if (metric instanceof Histogram) {
			Histogram histogram = (Histogram) metric;
			histograms.put(histogram, fullName);

			if (histogram instanceof DropwizardHistogramWrapper) {
				registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
			} else {
				registry.register(fullName, new FlinkHistogramWrapper(histogram));
			}
		} else if (metric instanceof Meter) {
			Meter meter = (Meter) metric;
			meters.put(meter, fullName);

			if (meter instanceof DropwizardMeterWrapper) {
				registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
			} else {
				registry.register(fullName, new FlinkMeterWrapper(meter));
			}
		} else {
			log.warn("Cannot add metric of type {}. This indicates that the reporter " +
				"does not support this metric type.", metric.getClass().getName());
		}
	}
}
 
示例2
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
	final String fullName = group.getMetricIdentifier(metricName, this);

	synchronized (this) {
		if (metric instanceof Counter) {
			counters.put((Counter) metric, fullName);
			registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
		}
		else if (metric instanceof Gauge) {
			gauges.put((Gauge<?>) metric, fullName);
			registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
		} else if (metric instanceof Histogram) {
			Histogram histogram = (Histogram) metric;
			histograms.put(histogram, fullName);

			if (histogram instanceof DropwizardHistogramWrapper) {
				registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
			} else {
				registry.register(fullName, new FlinkHistogramWrapper(histogram));
			}
		} else if (metric instanceof Meter) {
			Meter meter = (Meter) metric;
			meters.put(meter, fullName);

			if (meter instanceof DropwizardMeterWrapper) {
				registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
			} else {
				registry.register(fullName, new FlinkMeterWrapper(meter));
			}
		} else {
			log.warn("Cannot add metric of type {}. This indicates that the reporter " +
				"does not support this metric type.", metric.getClass().getName());
		}
	}
}
 
示例3
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
	final String fullName = group.getMetricIdentifier(metricName, this);

	synchronized (this) {
		if (metric instanceof Counter) {
			counters.put((Counter) metric, fullName);
			registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
		}
		else if (metric instanceof Gauge) {
			gauges.put((Gauge<?>) metric, fullName);
			registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
		} else if (metric instanceof Histogram) {
			Histogram histogram = (Histogram) metric;
			histograms.put(histogram, fullName);

			if (histogram instanceof DropwizardHistogramWrapper) {
				registry.register(fullName, ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
			} else {
				registry.register(fullName, new FlinkHistogramWrapper(histogram));
			}
		} else if (metric instanceof Meter) {
			Meter meter = (Meter) metric;
			meters.put(meter, fullName);

			if (meter instanceof DropwizardMeterWrapper) {
				registry.register(fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
			} else {
				registry.register(fullName, new FlinkMeterWrapper(meter));
			}
		} else {
			log.warn("Cannot add metric of type {}. This indicates that the reporter " +
				"does not support this metric type.", metric.getClass().getName());
		}
	}
}
 
示例4
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 */
@Test
public void testAddingMetrics() throws Exception {
	Configuration configuration = new Configuration();
	String taskName = "test\"Ta\"..sk";
	String jobName = "testJ\"ob:-!ax..?";
	String hostname = "loc<>al\"::host\".:";
	String taskManagerId = "tas:kMana::ger";
	String counterName = "testCounter";

	configuration.setString(
			ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
			"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");

	configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
	configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

	MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

	MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);

	char delimiter = metricRegistry.getDelimiter();

	TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
	TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
	TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

	SimpleCounter myCounter = new SimpleCounter();
	com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
	DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

	taskMetricGroup.counter(counterName, myCounter);
	taskMetricGroup.meter("meter", meterWrapper);

	List<MetricReporter> reporters = metricRegistry.getReporters();

	assertTrue(reporters.size() == 1);
	MetricReporter metricReporter = reporters.get(0);

	assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

	TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

	Map<Counter, String> counters = reporter.getCounters();
	assertTrue(counters.containsKey(myCounter));

	Map<Meter, String> meters = reporter.getMeters();
	assertTrue(meters.containsKey(meterWrapper));

	String expectedCounterName = reporter.filterCharacters(hostname)
		+ delimiter
		+ reporter.filterCharacters(taskManagerId)
		+ delimiter
		+ reporter.filterCharacters(jobName)
		+ delimiter
		+ reporter.filterCharacters(counterName);

	assertEquals(expectedCounterName, counters.get(myCounter));

	metricRegistry.shutdown().get();
}
 
示例5
public static void main(String[] args) throws Exception {
        //创建流运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
//        env.setParallelism(1);
        env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> out) throws Exception {
                while (isRunning) {
                    out.collect(String.valueOf(Math.round(Math.random() * 100)));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).map(new RichMapFunction<String, String>() {
            Meter meter;
            int index;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
                index = getRuntimeContext().getIndexOfThisSubtask() + 1;
                meter = getRuntimeContext().getMetricGroup()
                        .addGroup("flink-metrics-test")
                        .meter("meterTest", new DropwizardMeterWrapper(dropwizardMeter));
            }

            @Override
            public String map(String s) throws Exception {
                meter.markEvent();
                System.out.println("index = " + index + " rate = " + meter.getRate() + " count = " + meter.getCount());
                return s;
            }
        }).print();

        env.execute("Flink custom Meter Metrics");
    }
 
示例6
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 */
@Test
public void testAddingMetrics() throws Exception {
	Configuration configuration = new Configuration();
	String taskName = "test\"Ta\"..sk";
	String jobName = "testJ\"ob:-!ax..?";
	String hostname = "loc<>al\"::host\".:";
	String taskManagerId = "tas:kMana::ger";
	String counterName = "testCounter";

	configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
	configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

	MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

	MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
		metricRegistryConfiguration,
		Collections.singletonList(ReporterSetup.forReporter("test", new TestingScheduledDropwizardReporter())));

	char delimiter = metricRegistry.getDelimiter();

	TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
	TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
	TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

	SimpleCounter myCounter = new SimpleCounter();
	com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
	DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

	taskMetricGroup.counter(counterName, myCounter);
	taskMetricGroup.meter("meter", meterWrapper);

	List<MetricReporter> reporters = metricRegistry.getReporters();

	assertTrue(reporters.size() == 1);
	MetricReporter metricReporter = reporters.get(0);

	assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

	TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

	Map<Counter, String> counters = reporter.getCounters();
	assertTrue(counters.containsKey(myCounter));

	Map<Meter, String> meters = reporter.getMeters();
	assertTrue(meters.containsKey(meterWrapper));

	String expectedCounterName = reporter.filterCharacters(hostname)
		+ delimiter
		+ reporter.filterCharacters(taskManagerId)
		+ delimiter
		+ reporter.filterCharacters(jobName)
		+ delimiter
		+ reporter.filterCharacters(counterName);

	assertEquals(expectedCounterName, counters.get(myCounter));

	metricRegistry.shutdown().get();
}
 
示例7
public static void main(String[] args) throws Exception {
        //创建流运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
//        env.setParallelism(1);
        env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> out) throws Exception {
                while (isRunning) {
                    out.collect(String.valueOf(Math.round(Math.random() * 100)));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).map(new RichMapFunction<String, String>() {
            Meter meter;
            int index;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
                index = getRuntimeContext().getIndexOfThisSubtask() + 1;
                meter = getRuntimeContext().getMetricGroup()
                        .addGroup("flink-metrics-test")
                        .meter("meterTest", new DropwizardMeterWrapper(dropwizardMeter));
            }

            @Override
            public String map(String s) throws Exception {
                meter.markEvent();
                System.out.println("index = " + index + " rate = " + meter.getRate() + " count = " + meter.getCount());
                return s;
            }
        }).print();

        env.execute("Flink custom Meter Metrics");
    }
 
示例8
/**
 * Tests that the registered metrics' names don't contain invalid characters.
 */
@Test
public void testAddingMetrics() throws Exception {
	Configuration configuration = new Configuration();
	String taskName = "test\"Ta\"..sk";
	String jobName = "testJ\"ob:-!ax..?";
	String hostname = "loc<>al\"::host\".:";
	String taskManagerId = "tas:kMana::ger";
	String counterName = "testCounter";

	configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
	configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

	MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

	MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
		metricRegistryConfiguration,
		Collections.singletonList(ReporterSetup.forReporter("test", new TestingScheduledDropwizardReporter())));

	char delimiter = metricRegistry.getDelimiter();

	TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
	TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
	TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

	SimpleCounter myCounter = new SimpleCounter();
	com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
	DropwizardMeterWrapper meterWrapper = new DropwizardMeterWrapper(dropwizardMeter);

	taskMetricGroup.counter(counterName, myCounter);
	taskMetricGroup.meter("meter", meterWrapper);

	List<MetricReporter> reporters = metricRegistry.getReporters();

	assertTrue(reporters.size() == 1);
	MetricReporter metricReporter = reporters.get(0);

	assertTrue("Reporter should be of type ScheduledDropwizardReporter", metricReporter instanceof ScheduledDropwizardReporter);

	TestingScheduledDropwizardReporter reporter = (TestingScheduledDropwizardReporter) metricReporter;

	Map<Counter, String> counters = reporter.getCounters();
	assertTrue(counters.containsKey(myCounter));

	Map<Meter, String> meters = reporter.getMeters();
	assertTrue(meters.containsKey(meterWrapper));

	String expectedCounterName = reporter.filterCharacters(hostname)
		+ delimiter
		+ reporter.filterCharacters(taskManagerId)
		+ delimiter
		+ reporter.filterCharacters(jobName)
		+ delimiter
		+ reporter.filterCharacters(counterName);

	assertEquals(expectedCounterName, counters.get(myCounter));

	metricRegistry.shutdown().get();
}