Java源码示例:org.apache.storm.metric.api.IMetricsConsumer

示例1
@Test
@Ignore // TODO: Fix for failover
public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);
  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
  verify(timelineMetricsCache);
}
 
示例2
@Test
@Ignore // TODO: Fix for failover
public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);
  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
  verify(timelineMetricsCache);
}
 
示例3
@Test
@Ignore // TODO: Fix for failover
public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);
  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
  verify(timelineMetricsCache);
}
 
示例4
@Test
@Ignore // TODO: Fix for failover
public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
      .andReturn(new TimelineMetric()).once();
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);

  Map<String, Object> valueMap = getTestValueMap();
  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
  verify(timelineMetricsCache);
}
 
示例5
@Test
@Ignore // TODO: Fix for failover
public void testWorkerLevelAggregatedNumericMetricMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);

  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint("key1", Lists.newArrayList(42.3, 42.3))));
  verify(timelineMetricsCache);
}
 
示例6
@Test
@Ignore // TODO: Fix for failover
public void testWorkerLevelAggregatedMapMetricMetricSubmission() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1.field1"))
      .andReturn(new TimelineMetric()).once();
  timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
  expectLastCall().once();
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);

  List<Map<String, Object>> valueMapList = new ArrayList<>();
  valueMapList.add(getTestValueMap());
  valueMapList.add(getTestValueMap());

  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMapList)));
  verify(timelineMetricsCache);
}
 
示例7
/**
 * Goes over the specified metrics and if both execute-count and execute-latency are present,
 * computes the capacity metric according to the formula capacity = execute-count * execute-latency / time-window-ms
 *
 * @param component2metrics metrics keyed by component name.
 * @param taskInfo          additional task information pertaining to the reporting task.
 * @return The capacity metrics that were calculated based on the specified input metrics.
 */
public static ImmutableList<Metric> calculateCapacityMetrics(final Map<String, List<Metric>> component2metrics,
                                                             final IMetricsConsumer.TaskInfo taskInfo) {

  final Function<Map.Entry<String, List<Metric>>, Optional<Metric>> toCapacityMetric =
          new Function<Map.Entry<String, List<Metric>>, Optional<Metric>>() {
            @Override
            public Optional<Metric> apply(final Map.Entry<String, List<Metric>> componentMetrics) {

              final String component = componentMetrics.getKey();
              final FluentIterable<Metric> metrics = FluentIterable.from(componentMetrics.getValue());
              final Optional<Metric> count = metrics.firstMatch(isExecuteCountMetric);
              final Optional<Metric> latency = metrics.firstMatch(isExecuteLatencyMetric);

              return calculateCapacityMetric(component, count, latency, taskInfo.updateIntervalSecs);
            }
          };

  return FluentIterable
          .from(component2metrics.entrySet())
          .transform(toCapacityMetric)
          .filter(Metric.Option.isPresent)
          .transform(Metric.Option.getValue)
          .toList();
}
 
示例8
@Test
public void testValidJMXObjectName() throws Exception {

  final String topologyName = "someTopology";

  Map config = new HashMap();
  config.put(Config.TOPOLOGY_NAME, topologyName);
  processor = new SimpleJMXStormMetricProcessor(config);

  Metric metric = new Metric("component", "kafkaPartition{host=kafka_9092, partition=0}", 1.9);
  IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo("localhost", 1010, "emitBot", 2, System.currentTimeMillis(), 100);

  String name = processor.mBeanName(metric, taskInfo);
  ObjectName objName = new ObjectName(name);

  assertThat(objName.getCanonicalName(), is("storm:component=component,host-port-task=localhost-1010-2,operation=\"kafkaPartition{host=kafka_9092, partition=0}\",topology=someTopology"));
}
 
示例9
@Test
public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
  StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
  stormTimelineMetricsSink.setTopologyName("topology1");
  TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
  stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
  replay(timelineMetricsCache);
  stormTimelineMetricsSink.handleDataPoints(
      new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
      Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
  verify(timelineMetricsCache);
}
 
示例10
/**
 * Creates an instance of a custom {@link IMetricsConsumer} to add for custom metrics logging for the topology. It
 * then calls a static method with the signature register({@link Config}, {@link BulletStormConfig}) to let that
 * class define any custom metrics and parallelisms etc using the {@link BulletStormConfig} into the Storm
 * {@link Config}.
 *
 * @param className The name of the custom {@link IMetricsConsumer}.
 * @param stormConfig The Storm configuration that would be used by the register method.
 * @param bulletConfig The Bullet configuration to pass to the register method.
 */
public static void registerMetricsConsumer(String className, Config stormConfig, BulletStormConfig bulletConfig) {
    try {
        Class<? extends IMetricsConsumer> consumer = (Class<? extends IMetricsConsumer>) Class.forName(className);
        Method method = consumer.getMethod(REGISTER_METHOD, Config.class, BulletStormConfig.class);
        log.info("Calling the IMetricsConsumer register method for class {} using method {}", className, method.toGenericString());
        method.invoke(null, stormConfig, bulletConfig);
        log.info("Registered the IMetricsConsumer class {}", className);
    } catch (Exception e) {
        log.info("Could not call the register method for " + className, e);
    }
}
 
示例11
/**
 * Checks to see if the given class name is an instance of a custom {@link IMetricsConsumer} that can be used in
 * the topology. That class must define a static method with the signature
 * register({@link Config}, {@link BulletStormConfig}) to let that class define any custom metrics and
 * parallelisms etc using the {@link BulletStormConfig} into the Storm {@link Config}.
 *
 * @param className The name of the custom {@link IMetricsConsumer}.
 * @return A boolean denoting whether the class represented by the name is a valid custom metrics consumer.
 */
public static boolean isIMetricsConsumer(String className) {
    try {
        Class<? extends IMetricsConsumer> consumer = (Class<? extends IMetricsConsumer>) Class.forName(className);
        consumer.getMethod(REGISTER_METHOD, Config.class, BulletStormConfig.class);
    } catch (Exception e) {
        log.warn("The given class: {} was not a proper IMetricsConsumer with a {} method", className, REGISTER_METHOD);
        log.warn("Exception: {}", e);
        return false;
    }
    return true;
}
 
示例12
@Override
public MetricName name(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo) {
    return new MetricName(Metric.joinNameFragments(taskInfo.srcWorkerHost,
                    taskInfo.srcWorkerPort,
                    metric.getComponent()),
                    Integer.toString(taskInfo.srcTaskId),
                    metric.getOperation());
}
 
示例13
@Override
public void process(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo) {

    final MetricName metricName = name(metric, taskInfo);
    try {
        createOrUpdateGauge(metric, metricName);
    } catch (final Exception e) {
        LOG.error(String.format("Unable to process metric %s", metricName.toString()), e);
    }
}
 
示例14
String mBeanName(Metric metric, IMetricsConsumer.TaskInfo taskInfo) {
    return "storm"
            + ":topology=" + topologyName
            + ",component=" + metric.getComponent()
            + ",operation=" + ObjectName.quote(metric.getOperation())
            + ",host-port-task=" + String.format("%s-%s-%s", taskInfo.srcWorkerHost
                ,taskInfo.srcWorkerPort
                ,taskInfo.srcTaskId);
}
 
示例15
@Override
public MetricName name(Metric metric, IMetricsConsumer.TaskInfo taskInfo) {
    return new MetricName("storm",
            topologyName,
            metric.getComponent(),
            metric.getOperation(),
            mBeanName(metric, taskInfo));
}
 
示例16
/**
 * Returns the metric name for the storm metric produced by a task.
 *
 * @param metric storm metric object
 * @param taskInfo information about the task that generates the metric
 * @return the name for the yammer metric
 */
MetricName name(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo);
 
示例17
/**
 * Processes the storm metric
 *
 * @param metric storm metric object
 * @param taskInfo information about the task that generates the metric
 */
void process(final Metric metric, final IMetricsConsumer.TaskInfo taskInfo);