Java源码示例:com.datatorrent.api.DefaultInputPort
示例1
/**
* This is a utility method which is used to attach the output port of an {@link EmbeddableQueryInfoProvider} to the input port
* of the encapsulating {@link AppData.Store}.
* @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the
* {@link AppData.Store}'s input port.
* @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
* @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}.
*/
public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort)
{
outputPort.setSink(new Sink<Object>()
{
@Override
@SuppressWarnings("unchecked")
public void put(Object tuple)
{
LOG.debug("processing tuple");
inputPort.process((T)tuple);
}
@Override
public int getCount(boolean reset)
{
return 0;
}
});
}
示例2
protected void populateCdrGeoDAG(DAG dag, Configuration conf,
List<DefaultInputPort<? super EnrichedCDR>> enrichedStreamSinks)
{
// dimension
DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("TagNetworkGeoLocations",
DimensionsComputationFlexibleSingleSchemaPOJO.class);
dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
enrichedStreamSinks.add(dimensions.input);
// Set operator properties
// key expression: Point( Lat, Lon )
{
Map<String, String> keyToExpression = Maps.newHashMap();
keyToExpression.put("zipcode", "getZipCode()");
keyToExpression.put("region", "getRegionZip2()");
keyToExpression.put("time", "getTime()");
dimensions.setKeyToExpression(keyToExpression);
}
// aggregate expression: disconnect and downloads
{
Map<String, String> aggregateToExpression = Maps.newHashMap();
aggregateToExpression.put("disconnectCount", "getDisconnectCount()");
aggregateToExpression.put("downloadBytes", "getBytes()");
aggregateToExpression.put("lat", "getLat()");
aggregateToExpression.put("lon", "getLon()");
dimensions.setAggregateToExpression(aggregateToExpression);
}
// event schema
String cdrGeoSchema = SchemaUtils.jarResourceFileToString(cdrGeoSchemaLocation);
dimensions.setConfigurationSchemaJSON(cdrGeoSchema);
dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
8092);
// store
//AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("StoreNetworkTaggedGeoLocations", AppDataSingleSchemaDimensionStoreHDHT.class);
GeoDimensionStore store = dag.addOperator("StoreNetworkTaggedGeoLocations", GeoDimensionStore.class);
store.setUpdateEnumValues(true);
String basePath = Preconditions.checkNotNull(conf.get(PROP_GEO_STORE_PATH),
"GEO base path should be specified in the properties.xml");
TFileImpl hdsFile = new TFileImpl.DTFileImpl();
basePath += System.currentTimeMillis();
hdsFile.setBasePath(basePath);
store.setFileStore(hdsFile);
store.setConfigurationSchemaJSON(cdrGeoSchema);
dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
new BasicCounters.LongAggregator<MutableLong>());
PubSubWebSocketAppDataQuery query = createAppDataQuery();
URI queryUri = ConfigUtil.getAppDataQueryPubSubURI(dag, conf);
query.setUri(queryUri);
store.setEmbeddableQueryInfoProvider(query);
if (cdrGeoStorePartitionCount > 1) {
store.setPartitionCount(cdrGeoStorePartitionCount);
store.setQueryResultUnifier(new DimensionStoreHDHTNonEmptyQueryResultUnifier());
}
// wsOut
PubSubWebSocketAppDataResult wsOut = createAppDataResult();
wsOut.setUri(queryUri);
dag.addOperator("CDRGeoQueryResult", wsOut);
// Set remaining dag options
dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
new BasicCounters.LongAggregator<MutableLong>());
dag.addStream("CDRGeoStream", dimensions.output, store.input);
dag.addStream("CDRGeoQueryResult", store.queryResult, wsOut.input);
}
示例3
protected void populateCsGeoDAG(DAG dag, Configuration conf,
List<DefaultInputPort<? super EnrichedCustomerService>> customerServiceStreamSinks)
{
// dimension
DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("TagServiceGeoLocations",
DimensionsComputationFlexibleSingleSchemaPOJO.class);
dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.getMeta(dimensions).getAttributes().put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 4);
customerServiceStreamSinks.add(dimensions.input);
// Set operator properties
// key expression: Point( Lat, Lon )
{
Map<String, String> keyToExpression = Maps.newHashMap();
keyToExpression.put("zipcode", "getZipCode()");
keyToExpression.put("region", "getRegionZip2()");
keyToExpression.put("time", "getTime()");
dimensions.setKeyToExpression(keyToExpression);
}
// aggregate expression: disconnect and downloads
{
Map<String, String> aggregateToExpression = Maps.newHashMap();
aggregateToExpression.put("wait", "getWait()");
aggregateToExpression.put("lat", "getLat()");
aggregateToExpression.put("lon", "getLon()");
dimensions.setAggregateToExpression(aggregateToExpression);
}
// event schema
String geoSchema = SchemaUtils.jarResourceFileToString(csGeoSchemaLocation);
dimensions.setConfigurationSchemaJSON(geoSchema);
dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
dag.getMeta(dimensions).getMeta(dimensions.output).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
8092);
// store
//AppDataSingleSchemaDimensionStoreHDHT store = dag.addOperator("StoreTaggedServiceGeoLocations", AppDataSingleSchemaDimensionStoreHDHT.class);
GeoDimensionStore store = dag.addOperator("StoreTaggedServiceGeoLocations", GeoDimensionStore.class);
store.setUpdateEnumValues(true);
String basePath = Preconditions.checkNotNull(conf.get(PROP_GEO_STORE_PATH),
"GEO base path should be specified in the properties.xml");
TFileImpl hdsFile = new TFileImpl.DTFileImpl();
basePath += System.currentTimeMillis();
hdsFile.setBasePath(basePath);
store.setFileStore(hdsFile);
store.setConfigurationSchemaJSON(geoSchema);
dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
new BasicCounters.LongAggregator<MutableLong>());
PubSubWebSocketAppDataQuery query = createAppDataQuery();
URI queryUri = ConfigUtil.getAppDataQueryPubSubURI(dag, conf);
query.setUri(queryUri);
store.setEmbeddableQueryInfoProvider(query);
if (csGeoStorePartitionCount > 1) {
store.setPartitionCount(csGeoStorePartitionCount);
store.setQueryResultUnifier(new DimensionStoreHDHTNonEmptyQueryResultUnifier());
}
// wsOut
PubSubWebSocketAppDataResult wsOut = createAppDataResult();
wsOut.setUri(queryUri);
dag.addOperator("CSGeoQueryResult", wsOut);
// Set remaining dag options
dag.setAttribute(store, Context.OperatorContext.COUNTERS_AGGREGATOR,
new BasicCounters.LongAggregator<MutableLong>());
dag.addStream("CSGeoStream", dimensions.output, store.input);
dag.addStream("CSGeoQueryResult", store.queryResult, wsOut.input);
}
示例4
public void setInput(DefaultInputPort input)
{
this.input = input;
}