Java源码示例:com.datatorrent.stram.plan.logical.LogicalPlanConfiguration
示例1
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorProperties(@PathParam("operatorName") String operatorName, @QueryParam("propertyName") String propertyName) throws IOException, JSONException
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
BeanMap operatorProperties = null;
if (logicalOperator == null) {
ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
if (logicalModule == null) {
throw new NotFoundException();
}
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getOperator());
} else {
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
}
Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties);
return new JSONObject(objectMapper.writeValueAsString(m));
}
示例2
private void setOperatorProperty(OperatorMeta logicalOperator, String propertyName, String propertyValue)
{
Map<String, String> properties = Collections.singletonMap(propertyName, propertyValue);
LogicalPlanConfiguration.setOperatorProperties(logicalOperator.getOperator(), properties);
List<PTOperator> operators = plan.getOperators(logicalOperator);
for (PTOperator o : operators) {
StramToNodeSetPropertyRequest request = new StramToNodeSetPropertyRequest();
request.setOperatorId(o.getId());
request.setPropertyKey(propertyName);
request.setPropertyValue(propertyValue);
addOperatorRequest(o, request);
// re-apply to checkpointed state on deploy
updateOnDeployRequests(o, new SetOperatorPropertyRequestFilter(propertyName), request);
}
// should probably not record it here because it's better to get confirmation from the operators first.
// but right now, the operators do not give confirmation for the requests. so record it here for now.
recordEventAsync(new StramEvent.SetOperatorPropertyEvent(logicalOperator.getName(), propertyName, propertyValue));
}
示例3
@GET
@Path(PATH_LOGICAL_PLAN_OPERATORS + "/{operatorName}/properties")
@Produces(MediaType.APPLICATION_JSON)
public JSONObject getOperatorProperties(@PathParam("operatorName") String operatorName, @QueryParam("propertyName") String propertyName) throws IOException, JSONException
{
init();
OperatorMeta logicalOperator = dagManager.getLogicalPlan().getOperatorMeta(operatorName);
BeanMap operatorProperties = null;
if (logicalOperator == null) {
ModuleMeta logicalModule = dagManager.getModuleMeta(operatorName);
if (logicalModule == null) {
throw new NotFoundException();
}
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getOperator());
} else {
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
}
Map<String, Object> m = getPropertiesAsMap(propertyName, operatorProperties);
return new JSONObject(objectMapper.writeValueAsString(m));
}
示例4
private void setOperatorProperty(OperatorMeta logicalOperator, String propertyName, String propertyValue)
{
Map<String, String> properties = Collections.singletonMap(propertyName, propertyValue);
LogicalPlanConfiguration.setOperatorProperties(logicalOperator.getOperator(), properties);
List<PTOperator> operators = plan.getOperators(logicalOperator);
for (PTOperator o : operators) {
StramToNodeSetPropertyRequest request = new StramToNodeSetPropertyRequest();
request.setOperatorId(o.getId());
request.setPropertyKey(propertyName);
request.setPropertyValue(propertyValue);
addOperatorRequest(o, request);
// re-apply to checkpointed state on deploy
updateOnDeployRequests(o, new SetOperatorPropertyRequestFilter(propertyName), request);
}
// should probably not record it here because it's better to get confirmation from the operators first.
// but right now, the operators do not give confirmation for the requests. so record it here for now.
recordEventAsync(new StramEvent.SetOperatorPropertyEvent(logicalOperator.getName(), propertyName, propertyValue));
}
示例5
@Test
public void testMetrics() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 1L, ((Long)aggregator.result.get("progress")).longValue());
lc.shutdown();
}
示例6
@Test
@Ignore
public void testMetricsAggregations() throws Exception
{
CountDownLatch latch = new CountDownLatch(2);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.setOperatorAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2));
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 2L, ((Long)aggregator.result.get("progress")).longValue());
lc.shutdown();
}
示例7
@Test
public void testDefaultMetricsAggregator() throws Exception
{
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
CountDownLatch latch = new CountDownLatch(1);
OperatorAndAggregator o1 = dag.addOperator("o1", new OperatorAndAggregator(latch));
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1");
Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 1, o1.result.get("progress"));
lc.shutdown();
}
示例8
@Test
public void testMetricsAnnotatedMethod() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("myMetric", 3, ((Integer)aggregator.result.get("myMetric")).intValue());
lc.shutdown();
}
示例9
@Test
public void testModuleProperties()
{
Configuration conf = new Configuration(false);
conf.set(StreamingApplication.APEX_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue");
conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c");
conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val");
conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal");
conf.set(StreamingApplication.APEX_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal");
LogicalPlan dag = new LogicalPlan();
TestModules.GenericModule o1 = dag.addModule("o1", new TestModules.GenericModule());
TestModules.ValidationTestModule o2 = dag.addModule("o2", new TestModules.ValidationTestModule());
LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf);
pb.setModuleProperties(dag, "testSetOperatorProperties");
Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty());
Assert.assertArrayEquals("o2.stringArrayField", new String[]{"a", "b", "c"}, o2.getStringArrayField());
Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1"));
Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot"));
Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot"));
}
示例10
@Test
public void testLoadFromPropertiesFile() throws IOException
{
Properties props = new Properties();
String resourcePath = "/testModuleTopology.properties";
InputStream is = this.getClass().getResourceAsStream(resourcePath);
if (is == null) {
throw new RuntimeException("Could not load " + resourcePath);
}
props.load(is);
LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
.addFromProperties(props, null);
LogicalPlan dag = new LogicalPlan();
pb.populateDAG(dag);
pb.prepareDAG(dag, null, "testApplication");
dag.validate();
validateTopLevelOperators(dag);
validateTopLevelStreams(dag);
validatePublicMethods(dag);
}
示例11
@Test
public void testLoadFromJson() throws Exception
{
String resourcePath = "/testModuleTopology.json";
InputStream is = this.getClass().getResourceAsStream(resourcePath);
if (is == null) {
throw new RuntimeException("Could not load " + resourcePath);
}
StringWriter writer = new StringWriter();
IOUtils.copy(is, writer);
JSONObject json = new JSONObject(writer.toString());
Configuration conf = new Configuration(false);
conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
planConf.prepareDAG(dag, null, "testApplication");
dag.validate();
validateTopLevelOperators(dag);
validateTopLevelStreams(dag);
validatePublicMethods(dag);
}
示例12
@Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
if (launchParameters != null) {
for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
String property = propMapping.get(entry.getKey());
if (property != null) {
setConfiguration(conf, property, entry.getValue());
}
}
}
try {
String name = app.getClass().getName();
StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
appLauncher.loadDependencies();
StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
{
@Override
public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
{
return super.createApp(app, planConfig);
}
};
ApplicationId appId = appLauncher.launchApp(appFactory);
appLauncher.resetContextClassLoader();
return new YarnAppHandleImpl(appId, conf);
} catch (Exception ex) {
throw new LauncherException(ex);
}
}
示例13
@Override
public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
{
final Map<String, String> properties = Collections.singletonMap(propertyKey, propertyValue);
logger.info("Setting property {} on operator {}", properties, operator);
LogicalPlanConfiguration.setOperatorProperties(operator, properties);
return null;
}
示例14
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
try {
return conf.createFromProperties(LogicalPlanConfiguration.readProperties(propertyFile.getAbsolutePath()), getName());
} catch (IOException e) {
throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
}
}
示例15
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
try {
return conf.createFromJson(json, getName());
} catch (Exception e) {
throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
}
}
示例16
public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
{
this.jarFile = appJarFile;
this.conf = conf;
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
init(this.jarFile.getName());
}
示例17
public StramAppLauncher(FileSystem fs, Path path, Configuration conf) throws Exception
{
File jarsDir = new File(StramClientUtils.getUserDTDirectory(), "jars");
jarsDir.mkdirs();
File localJarFile = new File(jarsDir, path.getName());
this.fs = fs;
fs.copyToLocalFile(path, new Path(localJarFile.getAbsolutePath()));
this.jarFile = localJarFile;
this.conf = conf;
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
init(this.jarFile.getName());
}
示例18
/**
* This is for recovering an app without specifying apa or appjar file
*
* @throws Exception
*/
public StramAppLauncher(FileSystem fs, Configuration conf) throws Exception
{
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
this.fs = fs;
this.conf = conf;
init();
}
示例19
/**
* Scan the application jar file entries for configuration classes.
* This needs to occur in a class loader with access to the application dependencies.
*/
private void findAppConfigClasses(List<String> classFileNames)
{
URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()]));
for (final String classFileName : classFileNames) {
final String className = classFileName.replace('/', '.').substring(0, classFileName.length() - 6);
try {
final Class<?> clazz = cl.loadClass(className);
if (!Modifier.isAbstract(clazz.getModifiers()) && StreamingApplication.class.isAssignableFrom(clazz)) {
final AppFactory appConfig = new StreamingAppFactory(classFileName, clazz)
{
@Override
public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
{
// load class from current context class loader
Class<? extends StreamingApplication> c = StramUtils.classForName(className, StreamingApplication.class);
StreamingApplication app = StramUtils.newInstance(c);
return super.createApp(app, planConfig);
}
};
appResourceList.add(appConfig);
}
} catch (Throwable e) { // java.lang.NoClassDefFoundError
LOG.error("Unable to load class: " + className + " " + e);
}
}
}
示例20
public static boolean isLocalKey(String key)
{
return key.equals(StramClientUtils.DT_DFS_ROOT_DIR)
|| key.equals(LogicalPlanConfiguration.GATEWAY_LISTEN_ADDRESS)
|| key.equals(StramClientUtils.DT_CONFIG_STATUS)
|| key.equals(StramClientUtils.DT_VERSION)
|| key.equals(StreamingApplication.DT_PREFIX + LogicalPlan.GATEWAY_CONNECT_ADDRESS.getName());
}
示例21
/**
* Set the property on a new operator. Since this is only intended to modify
* previously added operators, no change to the physical plan is required.
*
* @param operatorName
* @param propertyName
* @param propertyValue
*/
public void setOperatorProperty(String operatorName, String propertyName, String propertyValue)
{
OperatorMeta om = assertGetOperator(operatorName);
if (physicalPlan != null) {
for (PTOperator oper : physicalPlan.getOperators(om)) {
if (!physicalPlan.newOpers.containsKey(oper)) {
throw new ValidationException("Properties can only be set on new operators: " + om + " " + propertyName + " " + propertyValue);
}
}
}
Map<String, String> props = Collections.singletonMap(propertyName, propertyValue);
LogicalPlanConfiguration.setOperatorProperties(om.getOperator(), props);
}
示例22
@Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
if (launchParameters != null) {
for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
String property = propMapping.get(entry.getKey());
if (property != null) {
setConfiguration(conf, property, entry.getValue());
}
}
}
try {
String name = app.getClass().getName();
StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
appLauncher.loadDependencies();
StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
{
@Override
public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
{
return super.createApp(app, planConfig);
}
};
ApplicationId appId = appLauncher.launchApp(appFactory);
appLauncher.resetContextClassLoader();
return new YarnAppHandleImpl(appId, conf);
} catch (Exception ex) {
throw new LauncherException(ex);
}
}
示例23
@Override
public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
{
final Map<String, String> properties = Collections.singletonMap(propertyKey, propertyValue);
logger.info("Setting property {} on operator {}", properties, operator);
LogicalPlanConfiguration.setOperatorProperties(operator, properties);
return null;
}
示例24
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
try {
return conf.createFromProperties(LogicalPlanConfiguration.readProperties(propertyFile.getAbsolutePath()), getName());
} catch (IOException e) {
throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
}
}
示例25
@Override
public LogicalPlan createApp(LogicalPlanConfiguration conf)
{
try {
return conf.createFromJson(json, getName());
} catch (Exception e) {
throw new IllegalArgumentException("Failed to load: " + this + "\n" + e.getMessage(), e);
}
}
示例26
public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
{
this.jarFile = appJarFile;
this.conf = conf;
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
init(this.jarFile.getName());
}
示例27
public StramAppLauncher(FileSystem fs, Path path, Configuration conf) throws Exception
{
File jarsDir = new File(StramClientUtils.getUserDTDirectory(), "jars");
jarsDir.mkdirs();
File localJarFile = new File(jarsDir, path.getName());
this.fs = fs;
fs.copyToLocalFile(path, new Path(localJarFile.getAbsolutePath()));
this.jarFile = localJarFile;
this.conf = conf;
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
init(this.jarFile.getName());
}
示例28
/**
* This is for recovering an app without specifying apa or appjar file
*
* @throws Exception
*/
public StramAppLauncher(FileSystem fs, Configuration conf) throws Exception
{
this.propertiesBuilder = new LogicalPlanConfiguration(conf);
this.fs = fs;
this.conf = conf;
init();
}
示例29
/**
* Scan the application jar file entries for configuration classes.
* This needs to occur in a class loader with access to the application dependencies.
*/
private void findAppConfigClasses(List<String> classFileNames)
{
URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()]));
for (final String classFileName : classFileNames) {
final String className = classFileName.replace('/', '.').substring(0, classFileName.length() - 6);
try {
final Class<?> clazz = cl.loadClass(className);
if (!Modifier.isAbstract(clazz.getModifiers()) && StreamingApplication.class.isAssignableFrom(clazz)) {
final AppFactory appConfig = new StreamingAppFactory(classFileName, clazz)
{
@Override
public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
{
// load class from current context class loader
Class<? extends StreamingApplication> c = StramUtils.classForName(className, StreamingApplication.class);
StreamingApplication app = StramUtils.newInstance(c);
return super.createApp(app, planConfig);
}
};
appResourceList.add(appConfig);
}
} catch (Throwable e) { // java.lang.NoClassDefFoundError
LOG.error("Unable to load class: " + className + " " + e);
}
}
}
示例30
public static boolean isLocalKey(String key)
{
return key.equals(StramClientUtils.DT_DFS_ROOT_DIR)
|| key.equals(LogicalPlanConfiguration.GATEWAY_LISTEN_ADDRESS)
|| key.equals(StramClientUtils.DT_CONFIG_STATUS)
|| key.equals(StramClientUtils.DT_VERSION)
|| key.equals(StreamingApplication.DT_PREFIX + LogicalPlan.GATEWAY_CONNECT_ADDRESS.getName());
}