Java源码示例:org.apache.storm.metric.api.IMetricsConsumer
示例1
@Test
@Ignore // TODO: Fix for failover
public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
verify(timelineMetricsCache);
}
示例2
@Test
@Ignore // TODO: Fix for failover
public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
verify(timelineMetricsCache);
}
示例3
@Test
@Ignore // TODO: Fix for failover
public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
verify(timelineMetricsCache);
}
示例4
@Test
@Ignore // TODO: Fix for failover
public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
.andReturn(new TimelineMetric()).once();
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
Map<String, Object> valueMap = getTestValueMap();
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
verify(timelineMetricsCache);
}
示例5
@Test
@Ignore // TODO: Fix for failover
public void testWorkerLevelAggregatedNumericMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint("key1", Lists.newArrayList(42.3, 42.3))));
verify(timelineMetricsCache);
}
示例6
@Test
@Ignore // TODO: Fix for failover
public void testWorkerLevelAggregatedMapMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1.field1"))
.andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
List<Map<String, Object>> valueMapList = new ArrayList<>();
valueMapList.add(getTestValueMap());
valueMapList.add(getTestValueMap());
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMapList)));
verify(timelineMetricsCache);
}
示例7
/**
* Goes over the specified metrics and if both execute-count and execute-latency are present,
* computes the capacity metric according to the formula capacity = execute-count * execute-latency / time-window-ms
*
* @param component2metrics metrics keyed by component name.
* @param taskInfo additional task information pertaining to the reporting task.
* @return The capacity metrics that were calculated based on the specified input metrics.
*/
public static ImmutableList<Metric> calculateCapacityMetrics(final Map<String, List<Metric>> component2metrics,
final IMetricsConsumer.TaskInfo taskInfo) {
final Function<Map.Entry<String, List<Metric>>, Optional<Metric>> toCapacityMetric =
new Function<Map.Entry<String, List<Metric>>, Optional<Metric>>() {
@Override
public Optional<Metric> apply(final Map.Entry<String, List<Metric>> componentMetrics) {
final String component = componentMetrics.getKey();
final FluentIterable<Metric> metrics = FluentIterable.from(componentMetrics.getValue());
final Optional<Metric> count = metrics.firstMatch(isExecuteCountMetric);
final Optional<Metric> latency = metrics.firstMatch(isExecuteLatencyMetric);
return calculateCapacityMetric(component, count, latency, taskInfo.updateIntervalSecs);
}
};
return FluentIterable
.from(component2metrics.entrySet())
.transform(toCapacityMetric)
.filter(Metric.Option.isPresent)
.transform(Metric.Option.getValue)
.toList();
}
示例8
@Test
public void testValidJMXObjectName() throws Exception {
final String topologyName = "someTopology";
Map config = new HashMap();
config.put(Config.TOPOLOGY_NAME, topologyName);
processor = new SimpleJMXStormMetricProcessor(config);
Metric metric = new Metric("component", "kafkaPartition{host=kafka_9092, partition=0}", 1.9);
IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo("localhost", 1010, "emitBot", 2, System.currentTimeMillis(), 100);
String name = processor.mBeanName(metric, taskInfo);
ObjectName objName = new ObjectName(name);
assertThat(objName.getCanonicalName(), is("storm:component=component,host-port-task=localhost-1010-2,operation=\"kafkaPartition{host=kafka_9092, partition=0}\",topology=someTopology"));
}
示例9
@Test
public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
stormTimelineMetricsSink.setTopologyName("topology1");
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
replay(timelineMetricsCache);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
verify(timelineMetricsCache);
}
示例10
/**
* Creates an instance of a custom {@link IMetricsConsumer} to add for custom metrics logging for the topology. It
* then calls a static method with the signature register({@link Config}, {@link BulletStormConfig}) to let that
* class define any custom metrics and parallelisms etc using the {@link BulletStormConfig} into the Storm
* {@link Config}.
*
* @param className The name of the custom {@link IMetricsConsumer}.
* @param stormConfig The Storm configuration that would be used by the register method.
* @param bulletConfig The Bullet configuration to pass to the register method.
*/
public static void registerMetricsConsumer(String className, Config stormConfig, BulletStormConfig bulletConfig) {
try {
Class<? extends IMetricsConsumer> consumer = (Class<? extends IMetricsConsumer>) Class.forName(className);
Method method = consumer.getMethod(REGISTER_METHOD, Config.class, BulletStormConfig.class);
log.info("Calling the IMetricsConsumer register method for class {} using method {}", className, method.toGenericString());
method.invoke(null, stormConfig, bulletConfig);
log.info("Registered the IMetricsConsumer class {}", className);
} catch (Exception e) {
log.info("Could not call the register method for " + className, e);
}
}
示例11
/**
* Checks to see if the given class name is an instance of a custom {@link IMetricsConsumer} that can be used in
* the topology. That class must define a static method with the signature
* register({@link Config}, {@link BulletStormConfig}) to let that class define any custom metrics and
* parallelisms etc using the {@link BulletStormConfig} into the Storm {@link Config}.
*
* @param className The name of the custom {@link IMetricsConsumer}.
* @return A boolean denoting whether the class represented by the name is a valid custom metrics consumer.
*/
public static boolean isIMetricsConsumer(String className) {
try {
Class<? extends IMetricsConsumer> consumer = (Class<? extends IMetricsConsumer>) Class.forName(className);
consumer.getMethod(REGISTER_METHOD, Config.class, BulletStormConfig.class);
} catch (Exception e) {
log.warn("The given class: {} was not a proper IMetricsConsumer with a {} method", className, REGISTER_METHOD);
log.warn("Exception: {}", e);
return false;
}
return true;
}
示例12
@Override
public MetricName name(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo) {
return new MetricName(Metric.joinNameFragments(taskInfo.srcWorkerHost,
taskInfo.srcWorkerPort,
metric.getComponent()),
Integer.toString(taskInfo.srcTaskId),
metric.getOperation());
}
示例13
@Override
public void process(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo) {
final MetricName metricName = name(metric, taskInfo);
try {
createOrUpdateGauge(metric, metricName);
} catch (final Exception e) {
LOG.error(String.format("Unable to process metric %s", metricName.toString()), e);
}
}
示例14
String mBeanName(Metric metric, IMetricsConsumer.TaskInfo taskInfo) {
return "storm"
+ ":topology=" + topologyName
+ ",component=" + metric.getComponent()
+ ",operation=" + ObjectName.quote(metric.getOperation())
+ ",host-port-task=" + String.format("%s-%s-%s", taskInfo.srcWorkerHost
,taskInfo.srcWorkerPort
,taskInfo.srcTaskId);
}
示例15
@Override
public MetricName name(Metric metric, IMetricsConsumer.TaskInfo taskInfo) {
return new MetricName("storm",
topologyName,
metric.getComponent(),
metric.getOperation(),
mBeanName(metric, taskInfo));
}
示例16
/**
* Returns the metric name for the storm metric produced by a task.
*
* @param metric storm metric object
* @param taskInfo information about the task that generates the metric
* @return the name for the yammer metric
*/
MetricName name(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo);
示例17
/**
* Processes the storm metric
*
* @param metric storm metric object
* @param taskInfo information about the task that generates the metric
*/
void process(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo);