Java源码示例:com.datatorrent.stram.plan.logical.LogicalPlanConfiguration

示例1
/**
 * Returns the bean properties of a logical operator as JSON. When no operator with the
 * given name exists in the logical plan, a module of that name is looked up instead.
 *
 * @param operatorName name resolved first via the logical plan's operators, then via modules
 * @param propertyName query parameter handed to {@code getPropertiesAsMap} with the bean map
 * @return JSON object parsed from the serialized property map
 * @throws IOException   declared for the object mapper serialization
 * @throws JSONException declared for the JSON object construction
 */
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorProperties(@PathParam("operatorName") String operatorName, @QueryParam("propertyName") String propertyName) throws IOException, JSONException
{
  init();
  final BeanMap properties;
  OperatorMeta operatorMeta = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
  if (operatorMeta != null) {
    properties = LogicalPlanConfiguration.getObjectProperties(operatorMeta.getOperator());
  } else {
    // no operator by that name: fall back to a module lookup
    ModuleMeta moduleMeta = dagManager.getModuleMeta(operatorName);
    if (moduleMeta == null) {
      throw new NotFoundException();
    }
    properties = LogicalPlanConfiguration.getObjectProperties(moduleMeta.getOperator());
  }

  Map<String, Object> propertyMap = getPropertiesAsMap(propertyName, properties);
  String serialized = objectMapper.writeValueAsString(propertyMap);
  return new JSONObject(serialized);
}
 
示例2
/**
 * Sets one property on the logical operator, then queues a set-property request for every
 * physical operator the plan returns for it, and finally records a set-property event.
 *
 * @param logicalOperator meta object of the operator whose property is changed
 * @param propertyName    key of the property
 * @param propertyValue   new value, as a string
 */
private void setOperatorProperty(OperatorMeta logicalOperator, String propertyName, String propertyValue)
{
  Map<String, String> singleProperty = Collections.singletonMap(propertyName, propertyValue);
  LogicalPlanConfiguration.setOperatorProperties(logicalOperator.getOperator(), singleProperty);

  for (PTOperator physicalOperator : plan.getOperators(logicalOperator)) {
    StramToNodeSetPropertyRequest setRequest = new StramToNodeSetPropertyRequest();
    setRequest.setOperatorId(physicalOperator.getId());
    setRequest.setPropertyKey(propertyName);
    setRequest.setPropertyValue(propertyValue);
    addOperatorRequest(physicalOperator, setRequest);
    // re-apply to checkpointed state on deploy
    SetOperatorPropertyRequestFilter sameProperty = new SetOperatorPropertyRequestFilter(propertyName);
    updateOnDeployRequests(physicalOperator, sameProperty, setRequest);
  }
  // Ideally the event would be recorded only after the operators confirm the change,
  // but operators currently send no confirmation for these requests, so record it here.
  recordEventAsync(new StramEvent.SetOperatorPropertyEvent(logicalOperator.getName(), propertyName, propertyValue));
}
 
示例3
/**
 * Returns the bean properties of a logical operator as JSON. When no operator with the
 * given name exists in the logical plan, a module of that name is looked up instead.
 *
 * @param operatorName name resolved first via the logical plan's operators, then via modules
 * @param propertyName query parameter handed to {@code getPropertiesAsMap} with the bean map
 * @return JSON object parsed from the serialized property map
 * @throws IOException   declared for the object mapper serialization
 * @throws JSONException declared for the JSON object construction
 */
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorProperties(@PathParam("operatorName") String operatorName, @QueryParam("propertyName") String propertyName) throws IOException, JSONException
{
  init();
  final BeanMap properties;
  OperatorMeta operatorMeta = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
  if (operatorMeta != null) {
    properties = LogicalPlanConfiguration.getObjectProperties(operatorMeta.getOperator());
  } else {
    // no operator by that name: fall back to a module lookup
    ModuleMeta moduleMeta = dagManager.getModuleMeta(operatorName);
    if (moduleMeta == null) {
      throw new NotFoundException();
    }
    properties = LogicalPlanConfiguration.getObjectProperties(moduleMeta.getOperator());
  }

  Map<String, Object> propertyMap = getPropertiesAsMap(propertyName, properties);
  String serialized = objectMapper.writeValueAsString(propertyMap);
  return new JSONObject(serialized);
}
 
示例4
/**
 * Sets one property on the logical operator, then queues a set-property request for every
 * physical operator the plan returns for it, and finally records a set-property event.
 *
 * @param logicalOperator meta object of the operator whose property is changed
 * @param propertyName    key of the property
 * @param propertyValue   new value, as a string
 */
private void setOperatorProperty(OperatorMeta logicalOperator, String propertyName, String propertyValue)
{
  Map<String, String> singleProperty = Collections.singletonMap(propertyName, propertyValue);
  LogicalPlanConfiguration.setOperatorProperties(logicalOperator.getOperator(), singleProperty);

  for (PTOperator physicalOperator : plan.getOperators(logicalOperator)) {
    StramToNodeSetPropertyRequest setRequest = new StramToNodeSetPropertyRequest();
    setRequest.setOperatorId(physicalOperator.getId());
    setRequest.setPropertyKey(propertyName);
    setRequest.setPropertyValue(propertyValue);
    addOperatorRequest(physicalOperator, setRequest);
    // re-apply to checkpointed state on deploy
    SetOperatorPropertyRequestFilter sameProperty = new SetOperatorPropertyRequestFilter(propertyName);
    updateOnDeployRequests(physicalOperator, sameProperty, setRequest);
  }
  // Ideally the event would be recorded only after the operators confirm the change,
  // but operators currently send no confirmation for these requests, so record it here.
  recordEventAsync(new StramEvent.SetOperatorPropertyEvent(logicalOperator.getName(), propertyName, propertyValue));
}
 
示例5
/**
 * Builds an input -> o1 DAG with a {@code MockAggregator} attached to o1, runs it on a
 * local cluster, waits on the latch handed to the aggregator and checks that the
 * aggregated "progress" value is 1.
 */
@Test
public void testMetrics() throws Exception
{
  CountDownLatch latch = new CountDownLatch(1);

  LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());

  TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);

  OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
  MockAggregator aggregator = new MockAggregator(latch);
  dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);

  dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

  dag.addStream("TestTuples", inputOperator.outport, o1.inport1);

  lpc.prepareDAG(dag, null, "AutoMetricTest");

  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.runAsync();
  try {
    latch.await();

    Assert.assertEquals("progress", 1L, ((Long)aggregator.result.get("progress")).longValue());
  } finally {
    // shut the cluster down even when the assertion fails or the wait is interrupted,
    // so a failing run does not leave the asynchronously started cluster behind
    lc.shutdown();
  }
}
 
示例6
/**
 * Same setup as the single-partition metrics test, but o1 is given a
 * {@code StatelessPartitioner} constructed with 2 and the latch count is 2; the
 * aggregated "progress" value is expected to be 2. Currently ignored.
 */
@Test
@Ignore
public void testMetricsAggregations() throws Exception
{
  CountDownLatch latch = new CountDownLatch(2);

  LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());

  TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);

  OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
  MockAggregator aggregator = new MockAggregator(latch);
  dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);

  dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
  dag.setOperatorAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2));

  dag.addStream("TestTuples", inputOperator.outport, o1.inport1);

  lpc.prepareDAG(dag, null, "AutoMetricTest");
  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.runAsync();
  try {
    latch.await();
    Assert.assertEquals("progress", 2L, ((Long)aggregator.result.get("progress")).longValue());
  } finally {
    // shut the cluster down even when the assertion fails or the wait is interrupted
    lc.shutdown();
  }
}
 
示例7
/**
 * Adds an {@code OperatorAndAggregator} without setting an explicit metrics aggregator,
 * verifies that after {@code prepareDAG} the operator meta exposes a non-null aggregator,
 * then runs the DAG on a local cluster and checks the "progress" entry of the operator's
 * result.
 */
@Test
public void testDefaultMetricsAggregator() throws Exception
{
  LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());

  TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);

  CountDownLatch latch = new CountDownLatch(1);
  OperatorAndAggregator o1 = dag.addOperator("o1", new OperatorAndAggregator(latch));

  dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

  dag.addStream("TestTuples", inputOperator.outport, o1.inport1);

  lpc.prepareDAG(dag, null, "AutoMetricTest");

  LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1");
  Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());

  // NOTE(review): prepareDAG is invoked a second time with the same arguments; kept as-is,
  // confirm whether the repeat is intentional.
  lpc.prepareDAG(dag, null, "AutoMetricTest");
  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.runAsync();
  try {
    latch.await();
    Assert.assertEquals("progress", 1, o1.result.get("progress"));
  } finally {
    // shut the cluster down even when the assertion fails or the wait is interrupted
    lc.shutdown();
  }
}
 
示例8
/**
 * Runs an input -> o1 DAG where o1 is an {@code OperatorWithMetricMethod} with a
 * {@code MockAggregator} attached, waits on the latch handed to the aggregator and checks
 * that the aggregated "myMetric" value is 3.
 */
@Test
public void testMetricsAnnotatedMethod() throws Exception
{
  CountDownLatch latch = new CountDownLatch(1);

  LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());

  TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);

  OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
  MockAggregator aggregator = new MockAggregator(latch);
  dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);

  dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

  dag.addStream("TestTuples", inputOperator.outport, o1.inport1);

  lpc.prepareDAG(dag, null, "AutoMetricTest");

  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.runAsync();
  try {
    latch.await();

    Assert.assertEquals("myMetric", 3, ((Integer)aggregator.result.get("myMetric")).intValue());
  } finally {
    // shut the cluster down even when the assertion fails or the wait is interrupted
    lc.shutdown();
  }
}
 
示例9
/**
 * Sets operator-scoped properties in a configuration (plain string, comma separated
 * array, map entries in both dotted and parenthesized key form), applies them to two
 * modules via {@code setModuleProperties} and checks the values on the module instances.
 */
@Test
public void testModuleProperties()
{
  Configuration conf = new Configuration(false);
  conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
  conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
  conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
  conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
  conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");

  LogicalPlan plan = new LogicalPlan();
  TestModules.GenericModule genericModule = plan.addModule("o1", new TestModules.GenericModule());
  TestModules.ValidationTestModule validationModule = plan.addModule("o2", new TestModules.ValidationTestModule());

  LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
  planConf.setModuleProperties(plan, "testSetOperatorProperties");

  // o1: simple string property
  Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", genericModule.getMyStringProperty());

  // o2: array property split from the comma separated value
  String[] expectedArray = new String[]{"a", "b", "c"};
  Assert.assertArrayEquals("o2.stringArrayField", expectedArray, validationModule.getStringArrayField());

  // o2: map entries, including keys that themselves contain a dot
  Assert.assertEquals("o2.mapProperty.key1", "key1Val", validationModule.getMapProperty().get("key1"));
  Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", validationModule.getMapProperty().get("key1.dot"));
  Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", validationModule.getMapProperty().get("key2.dot"));
}
 
示例10
/**
 * Loads {@code /testModuleTopology.properties} from the classpath, builds a logical plan
 * from it, prepares and validates the DAG and runs the shared validation helpers.
 *
 * @throws IOException if the properties resource cannot be read or closed
 */
@Test
public void testLoadFromPropertiesFile() throws IOException
{
  Properties props = new Properties();
  String resourcePath = "/testModuleTopology.properties";
  InputStream is = this.getClass().getResourceAsStream(resourcePath);
  if (is == null) {
    throw new RuntimeException("Could not load " + resourcePath);
  }
  try {
    props.load(is);
  } finally {
    // Properties.load does not close the stream; release it explicitly
    is.close();
  }
  LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
      .addFromProperties(props, null);

  LogicalPlan dag = new LogicalPlan();
  pb.populateDAG(dag);
  pb.prepareDAG(dag, null, "testApplication");
  dag.validate();
  validateTopLevelOperators(dag);
  validateTopLevelStreams(dag);
  validatePublicMethods(dag);
}
 
示例11
/**
 * Loads {@code /testModuleTopology.json} from the classpath, creates a logical plan from
 * the parsed JSON using a configuration that sets one property for operator3, then
 * prepares and validates the DAG and runs the shared validation helpers.
 */
@Test
public void testLoadFromJson() throws Exception
{
  String resourcePath = "/testModuleTopology.json";
  InputStream is = this.getClass().getResourceAsStream(resourcePath);
  if (is == null) {
    throw new RuntimeException("Could not load " + resourcePath);
  }
  JSONObject json;
  try {
    StringWriter writer = new StringWriter();

    IOUtils.copy(is, writer);
    json = new JSONObject(writer.toString());
  } finally {
    // the resource stream was previously never closed; release it once it has been read
    is.close();
  }

  Configuration conf = new Configuration(false);
  conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");

  LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
  LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
  planConf.prepareDAG(dag, null, "testApplication");
  dag.validate();
  validateTopLevelOperators(dag);
  validateTopLevelStreams(dag);
  validatePublicMethods(dag);
}
 
示例12
/**
 * Launches the given application instance through a {@code StramAppLauncher}.
 * Launch parameters that have an entry in {@code propMapping} are first copied into
 * {@code conf} under the mapped property name.
 *
 * @param app              application instance to launch; its class name is used as the app name
 * @param conf             configuration passed to the launcher (modified by mapped launch parameters)
 * @param launchParameters optional attributes; may be {@code null}
 * @return handle wrapping the id of the launched application
 * @throws LauncherException wrapping any exception raised while launching
 */
@Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
  if (launchParameters != null) {
    for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
      String property = propMapping.get(entry.getKey());
      if (property != null) {
        setConfiguration(conf, property, entry.getValue());
      }
    }
  }
  try {
    String name = app.getClass().getName();
    StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
    appLauncher.loadDependencies();
    ApplicationId appId;
    try {
      StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
      {
        @Override
        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
        {
          // reuse the instance supplied by the caller instead of instantiating the class
          return super.createApp(app, planConfig);
        }
      };
      appId = appLauncher.launchApp(appFactory);
    } finally {
      // Previously the reset was skipped when the launch threw, leaving whatever context
      // class loader loadDependencies() installed on the calling thread.
      // NOTE(review): assumes loadDependencies() changes the context class loader — confirm.
      appLauncher.resetContextClassLoader();
    }
    return new YarnAppHandleImpl(appId, conf);
  } catch (Exception ex) {
    throw new LauncherException(ex);
  }
}
 
示例13
/**
 * Applies this request's single key/value pair to the given operator.
 *
 * @param operator   operator instance whose property is set
 * @param operatorId not used by this implementation
 * @param windowId   not used by this implementation
 * @return always {@code null}
 */
@Override
public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
{
  final Map<String, String> singleProperty = Collections.singletonMap(propertyKey, propertyValue);
  logger.info("Setting property {} on operator {}", singleProperty, operator);
  LogicalPlanConfiguration.setOperatorProperties(operator, singleProperty);
  // no response object is produced for this request
  return null;
}
 
示例14
/**
 * Creates the logical plan from the properties file backing this factory.
 *
 * @param conf plan configuration used to build the plan
 * @return plan created from the properties read from {@code propertyFile}
 * @throws IllegalArgumentException wrapping the {@link IOException} when loading fails
 */
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
  final LogicalPlan plan;
  try {
    plan = conf.createFromProperties(LogicalPlanConfiguration.readProperties(propertyFile.getAbsolutePath()), getName());
  } catch (IOException e) {
    throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
  }
  return plan;
}
 
示例15
/**
 * Creates the logical plan from the JSON definition held by this factory.
 *
 * @param conf plan configuration used to build the plan
 * @return plan created from {@code json} under this factory's name
 * @throws IllegalArgumentException wrapping any exception raised while creating the plan
 */
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
  final LogicalPlan plan;
  try {
    plan = conf.createFromJson(json, getName());
  } catch (Exception e) {
    throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
  }
  return plan;
}
 
示例16
/**
 * Creates a launcher for a local application jar file.
 *
 * @param appJarFile application jar; its file name is passed to {@code init}
 * @param conf       configuration kept by the launcher and used for the plan configuration
 * @throws Exception declared by {@code init}
 */
public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
{
  this.conf = conf;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  this.jarFile = appJarFile;
  init(appJarFile.getName());
}
 
示例17
/**
 * Creates a launcher for an application jar stored on the given file system. The jar is
 * first copied to a "jars" directory below the user DT directory and the local copy is
 * used from then on.
 *
 * @param fs   file system holding the jar
 * @param path location of the jar on {@code fs}
 * @param conf configuration kept by the launcher and used for the plan configuration
 * @throws Exception if the copy or {@code init} fails
 */
public StramAppLauncher(FileSystem fs, Path path, Configuration conf) throws Exception
{
  this.fs = fs;
  this.conf = conf;

  File localJarsDir = new File(StramClientUtils.getUserDTDirectory(), "jars");
  // result intentionally not checked here, matching existing behavior
  localJarsDir.mkdirs();
  File localCopy = new File(localJarsDir, path.getName());
  Path localCopyPath = new Path(localCopy.getAbsolutePath());
  fs.copyToLocalFile(path, localCopyPath);

  this.jarFile = localCopy;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  init(localCopy.getName());
}
 
示例18
/**
 * Creates a launcher that is not bound to an application package or application jar
 * file; intended for recovering an app.
 *
 * @param fs   file system kept by the launcher
 * @param conf configuration kept by the launcher and used for the plan configuration
 * @throws Exception declared by {@code init}
 */
public StramAppLauncher(FileSystem fs, Configuration conf) throws Exception
{
  this.fs = fs;
  this.conf = conf;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  init();
}
 
示例19
/**
 * Scans the given application jar entries for concrete {@code StreamingApplication}
 * implementations and registers an app factory for each one found.
 * Classes are loaded through a class loader built from the launch dependencies, since
 * the scan needs access to the application's dependencies.
 *
 * @param classFileNames jar entry names of class files, using '/' as package separator
 */
private void findAppConfigClasses(List<String> classFileNames)
{
  URL[] dependencyUrls = launchDependencies.toArray(new URL[launchDependencies.size()]);
  URLClassLoader dependencyLoader = URLClassLoader.newInstance(dependencyUrls);
  for (final String classFileName : classFileNames) {
    // drops the trailing 6 characters — presumably the ".class" suffix; confirm against the caller
    final String className = classFileName.replace('/', '.').substring(0, classFileName.length() - 6);
    try {
      final Class<?> candidate = dependencyLoader.loadClass(className);
      if (Modifier.isAbstract(candidate.getModifiers()) || !StreamingApplication.class.isAssignableFrom(candidate)) {
        // abstract, or not a streaming application: nothing to register
        continue;
      }
      final AppFactory factory = new StreamingAppFactory(classFileName, candidate)
      {
        @Override
        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
        {
          // load class from current context class loader
          Class<? extends StreamingApplication> appClass = StramUtils.classForName(className, StreamingApplication.class);
          StreamingApplication instance = StramUtils.newInstance(appClass);
          return super.createApp(instance, planConfig);
        }
      };
      appResourceList.add(factory);
    } catch (Throwable e) { // java.lang.NoClassDefFoundError
      LOG.error("Unable to load class: " + className + " " + e);
    }
  }
}
 
示例20
/**
 * Tells whether the given configuration key equals one of a fixed set of keys.
 *
 * @param key configuration key to test; must not be {@code null}
 * @return {@code true} when the key matches one of the compared constants
 */
public static boolean isLocalKey(String key)
{
  if (key.equals(StramClientUtils.DT_DFS_ROOT_DIR)) {
    return true;
  }
  if (key.equals(LogicalPlanConfiguration.GATEWAY_LISTEN_ADDRESS)) {
    return true;
  }
  if (key.equals(StramClientUtils.DT_CONFIG_STATUS)) {
    return true;
  }
  if (key.equals(StramClientUtils.DT_VERSION)) {
    return true;
  }
  final String gatewayConnectKey = StreamingApplication.DT_PREFIX + LogicalPlan.GATEWAY_CONNECT_ADDRESS.getName();
  return key.equals(gatewayConnectKey);
}
 
示例21
/**
 * Sets a property on an operator that is new to the plan. Because only newly added
 * operators may be modified this way, the physical plan itself is left untouched.
 *
 * @param operatorName  name of the operator, resolved via {@code assertGetOperator}
 * @param propertyName  key of the property
 * @param propertyValue new value, as a string
 * @throws ValidationException if a physical plan exists and any physical operator of the
 *                             logical operator is not among its new operators
 */
public void setOperatorProperty(String operatorName, String propertyName, String propertyValue)
{
  OperatorMeta operatorMeta = assertGetOperator(operatorName);
  if (physicalPlan != null) {
    for (PTOperator physicalOperator : physicalPlan.getOperators(operatorMeta)) {
      if (physicalPlan.newOpers.containsKey(physicalOperator)) {
        continue;
      }
      throw new ValidationException("Properties can only be set on new operators: " + operatorMeta + " " + propertyName + " " + propertyValue);
    }
  }
  Map<String, String> singleProperty = Collections.singletonMap(propertyName, propertyValue);
  LogicalPlanConfiguration.setOperatorProperties(operatorMeta.getOperator(), singleProperty);
}
 
示例22
/**
 * Launches the given application instance through a {@code StramAppLauncher}.
 * Launch parameters that have an entry in {@code propMapping} are first copied into
 * {@code conf} under the mapped property name.
 *
 * @param app              application instance to launch; its class name is used as the app name
 * @param conf             configuration passed to the launcher (modified by mapped launch parameters)
 * @param launchParameters optional attributes; may be {@code null}
 * @return handle wrapping the id of the launched application
 * @throws LauncherException wrapping any exception raised while launching
 */
@Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
  if (launchParameters != null) {
    for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
      String property = propMapping.get(entry.getKey());
      if (property != null) {
        setConfiguration(conf, property, entry.getValue());
      }
    }
  }
  try {
    String name = app.getClass().getName();
    StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
    appLauncher.loadDependencies();
    ApplicationId appId;
    try {
      StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
      {
        @Override
        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
        {
          // reuse the instance supplied by the caller instead of instantiating the class
          return super.createApp(app, planConfig);
        }
      };
      appId = appLauncher.launchApp(appFactory);
    } finally {
      // Previously the reset was skipped when the launch threw, leaving whatever context
      // class loader loadDependencies() installed on the calling thread.
      // NOTE(review): assumes loadDependencies() changes the context class loader — confirm.
      appLauncher.resetContextClassLoader();
    }
    return new YarnAppHandleImpl(appId, conf);
  } catch (Exception ex) {
    throw new LauncherException(ex);
  }
}
 
示例23
/**
 * Applies this request's single key/value pair to the given operator.
 *
 * @param operator   operator instance whose property is set
 * @param operatorId not used by this implementation
 * @param windowId   not used by this implementation
 * @return always {@code null}
 */
@Override
public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
{
  final Map<String, String> singleProperty = Collections.singletonMap(propertyKey, propertyValue);
  logger.info("Setting property {} on operator {}", singleProperty, operator);
  LogicalPlanConfiguration.setOperatorProperties(operator, singleProperty);
  // no response object is produced for this request
  return null;
}
 
示例24
/**
 * Creates the logical plan from the properties file backing this factory.
 *
 * @param conf plan configuration used to build the plan
 * @return plan created from the properties read from {@code propertyFile}
 * @throws IllegalArgumentException wrapping the {@link IOException} when loading fails
 */
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
  final LogicalPlan plan;
  try {
    plan = conf.createFromProperties(LogicalPlanConfiguration.readProperties(propertyFile.getAbsolutePath()), getName());
  } catch (IOException e) {
    throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
  }
  return plan;
}
 
示例25
/**
 * Creates the logical plan from the JSON definition held by this factory.
 *
 * @param conf plan configuration used to build the plan
 * @return plan created from {@code json} under this factory's name
 * @throws IllegalArgumentException wrapping any exception raised while creating the plan
 */
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
  final LogicalPlan plan;
  try {
    plan = conf.createFromJson(json, getName());
  } catch (Exception e) {
    throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
  }
  return plan;
}
 
示例26
/**
 * Creates a launcher for a local application jar file.
 *
 * @param appJarFile application jar; its file name is passed to {@code init}
 * @param conf       configuration kept by the launcher and used for the plan configuration
 * @throws Exception declared by {@code init}
 */
public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
{
  this.conf = conf;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  this.jarFile = appJarFile;
  init(appJarFile.getName());
}
 
示例27
/**
 * Creates a launcher for an application jar stored on the given file system. The jar is
 * first copied to a "jars" directory below the user DT directory and the local copy is
 * used from then on.
 *
 * @param fs   file system holding the jar
 * @param path location of the jar on {@code fs}
 * @param conf configuration kept by the launcher and used for the plan configuration
 * @throws Exception if the copy or {@code init} fails
 */
public StramAppLauncher(FileSystem fs, Path path, Configuration conf) throws Exception
{
  this.fs = fs;
  this.conf = conf;

  File localJarsDir = new File(StramClientUtils.getUserDTDirectory(), "jars");
  // result intentionally not checked here, matching existing behavior
  localJarsDir.mkdirs();
  File localCopy = new File(localJarsDir, path.getName());
  Path localCopyPath = new Path(localCopy.getAbsolutePath());
  fs.copyToLocalFile(path, localCopyPath);

  this.jarFile = localCopy;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  init(localCopy.getName());
}
 
示例28
/**
 * Creates a launcher that is not bound to an application package or application jar
 * file; intended for recovering an app.
 *
 * @param fs   file system kept by the launcher
 * @param conf configuration kept by the launcher and used for the plan configuration
 * @throws Exception declared by {@code init}
 */
public StramAppLauncher(FileSystem fs, Configuration conf) throws Exception
{
  this.fs = fs;
  this.conf = conf;
  this.propertiesBuilder = new LogicalPlanConfiguration(conf);
  init();
}
 
示例29
/**
 * Scans the given application jar entries for concrete {@code StreamingApplication}
 * implementations and registers an app factory for each one found.
 * Classes are loaded through a class loader built from the launch dependencies, since
 * the scan needs access to the application's dependencies.
 *
 * @param classFileNames jar entry names of class files, using '/' as package separator
 */
private void findAppConfigClasses(List<String> classFileNames)
{
  URL[] dependencyUrls = launchDependencies.toArray(new URL[launchDependencies.size()]);
  URLClassLoader dependencyLoader = URLClassLoader.newInstance(dependencyUrls);
  for (final String classFileName : classFileNames) {
    // drops the trailing 6 characters — presumably the ".class" suffix; confirm against the caller
    final String className = classFileName.replace('/', '.').substring(0, classFileName.length() - 6);
    try {
      final Class<?> candidate = dependencyLoader.loadClass(className);
      if (Modifier.isAbstract(candidate.getModifiers()) || !StreamingApplication.class.isAssignableFrom(candidate)) {
        // abstract, or not a streaming application: nothing to register
        continue;
      }
      final AppFactory factory = new StreamingAppFactory(classFileName, candidate)
      {
        @Override
        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
        {
          // load class from current context class loader
          Class<? extends StreamingApplication> appClass = StramUtils.classForName(className, StreamingApplication.class);
          StreamingApplication instance = StramUtils.newInstance(appClass);
          return super.createApp(instance, planConfig);
        }
      };
      appResourceList.add(factory);
    } catch (Throwable e) { // java.lang.NoClassDefFoundError
      LOG.error("Unable to load class: " + className + " " + e);
    }
  }
}
 
示例30
/**
 * Tells whether the given configuration key equals one of a fixed set of keys.
 *
 * @param key configuration key to test; must not be {@code null}
 * @return {@code true} when the key matches one of the compared constants
 */
public static boolean isLocalKey(String key)
{
  if (key.equals(StramClientUtils.DT_DFS_ROOT_DIR)) {
    return true;
  }
  if (key.equals(LogicalPlanConfiguration.GATEWAY_LISTEN_ADDRESS)) {
    return true;
  }
  if (key.equals(StramClientUtils.DT_CONFIG_STATUS)) {
    return true;
  }
  if (key.equals(StramClientUtils.DT_VERSION)) {
    return true;
  }
  final String gatewayConnectKey = StreamingApplication.DT_PREFIX + LogicalPlan.GATEWAY_CONNECT_ADDRESS.getName();
  return key.equals(gatewayConnectKey);
}