Java源码示例:org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps
示例1
/**
 * Binds {@code yaml/kafka-source.yaml} to a {@code SourceDescriptor}, checks the bound Kafka
 * connector settings, then builds the table source from it and verifies the source's time
 * attributes and the deserialization schema produced by the format descriptor.
 */
@Test
public void buildKafkaSource() throws Exception {
    File yaml = ResourceUtils.getFile("classpath:yaml/kafka-source.yaml");
    SourceDescriptor descriptor = BindPropertiesUtil.bindProperties(yaml, SourceDescriptor.class);
    Kafka010ConnectorDescriptor kafka =
            BindPropertiesUtil.bindProperties(descriptor.getConnector(), Kafka010ConnectorDescriptor.class);

    // Connector settings bound from the YAML file.
    assertThat(kafka.getTopic()).isEqualTo("app-log");
    assertThat(kafka.getStartupMode()).isEqualTo("earliest-offset");
    assertThat(kafka.getSpecificOffsets().get("1")).isEqualTo("1000");
    assertThat(kafka.getSpecificOffsets().get("2")).isEqualTo("3000");
    assertThat(kafka.getProperties().get("bootstrap.servers")).isEqualTo("127.0.0.1:9092");
    assertThat(kafka.getProperties().get("group.id")).isEqualTo("testGroup");
    assertThat(descriptor.getSchema()).isNotNull();

    // Build the table source from the bound schema and format.
    FormatDescriptor format = descriptor.getFormat();
    KafkaTableSourceBase tableSource = kafka.buildSource(descriptor.getSchema(), format);
    assertThat(tableSource).isNotNull();
    assertThat(tableSource.getProctimeAttribute()).isEqualTo("procTime");

    // The first rowtime attribute must use an existing field and a bounded-out-of-order strategy.
    List<RowtimeAttributeDescriptor> rowtimes = tableSource.getRowtimeAttributeDescriptors();
    assertThat(rowtimes).isNotNull();
    RowtimeAttributeDescriptor firstRowtime = rowtimes.get(0);
    assertThat(firstRowtime.getAttributeName()).isEqualTo("rowTime");
    assertThat(firstRowtime.getTimestampExtractor()).isInstanceOf(ExistingField.class);
    assertThat(firstRowtime.getWatermarkStrategy()).isInstanceOf(BoundedOutOfOrderTimestamps.class);

    // The format descriptor is expected to yield a Grok-based row deserializer.
    DeserializationSchema deserializer = format.transform(new Tuple2<>(tableSource.getReturnType(), true));
    assertThat(deserializer).isInstanceOf(GrokRowDeserializationSchema.class);
}
示例2
/**
 * Rowtime attribute should be of type TIMESTAMP.
 *
 * <p>Declares the rowtime on the INT field {@code age} and expects the table source factory to
 * reject the resulting properties with a {@code ValidationException}. Reaching the trailing
 * {@code fail(...)} means no exception was thrown.
 */
@Test (expected = ValidationException.class)
public void testWrongRowTimeAttributeType() {
    final Schema fieldsOnly = new Schema()
            .field("name", DataTypes.STRING())
            .field("age", DataTypes.INT());
    final Rowtime rowtimeOnIntField = new Rowtime()
            .timestampsFromField("age")
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(30000L));
    final Schema schema = fieldsOnly.rowtime(rowtimeOnIntField);

    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);
    pravega.tableSourceReaderBuilder()
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);

    final TestTableDescriptor descriptor = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(schema)
            .inAppendMode();
    final Map<String, String> properties = descriptor.toProperties();

    FlinkPravegaTableFactoryBase factory = new FlinkPravegaStreamTableSourceFactory();
    factory.createFlinkPravegaTableSource(properties);
    fail("Schema validation failed");
}
示例3
/**
 * Exposes a single rowtime attribute named {@code rowtime}.
 *
 * @return a one-element list whose descriptor reads timestamps from the existing field
 *         {@code ts} and uses a {@code BoundedOutOfOrderTimestamps} strategy constructed with 100
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    final RowtimeAttributeDescriptor rowtimeDescriptor = new RowtimeAttributeDescriptor(
            "rowtime",
            new ExistingField("ts"),
            new BoundedOutOfOrderTimestamps(100));
    return Collections.singletonList(rowtimeDescriptor);
}
示例4
/**
 * Exposes a single rowtime attribute named {@code timestamp}.
 *
 * @return a one-element list whose descriptor reads timestamps from the existing field
 *         {@code timestamp} and uses a {@code BoundedOutOfOrderTimestamps} strategy constructed
 *         with 100
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    final RowtimeAttributeDescriptor timestampDescriptor = new RowtimeAttributeDescriptor(
            "timestamp",
            new ExistingField("timestamp"),
            new BoundedOutOfOrderTimestamps(100));
    return Collections.singletonList(timestampDescriptor);
}
示例5
/**
 * Describes the table source's only rowtime attribute, {@code rowtime}.
 *
 * @return a singleton list holding a descriptor that takes timestamps from the existing
 *         {@code ts} field and a {@code BoundedOutOfOrderTimestamps} strategy constructed with 100
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    final RowtimeAttributeDescriptor descriptor = new RowtimeAttributeDescriptor(
            "rowtime",
            new ExistingField("ts"),
            new BoundedOutOfOrderTimestamps(100));
    return Collections.singletonList(descriptor);
}
示例6
/**
 * Maps the configured {@code type} to a watermark strategy instance.
 *
 * @return an {@code AscendingTimestamps}, a {@code BoundedOutOfOrderTimestamps} built from
 *         {@code delay}, or a {@code PreserveWatermarks}, depending on which known type
 *         {@code type} equals; {@code null} when {@code type} is {@code null} or matches none
 */
public WatermarkStrategy get() {
    // No type configured: nothing to build.
    if (type == null) {
        return null;
    }
    if (type.equals(Type.PERIODIC_ASCENDING.getType())) {
        return new AscendingTimestamps();
    }
    if (type.equals(Type.PERIODIC_BOUNDED.getType())) {
        return new BoundedOutOfOrderTimestamps(delay);
    }
    if (type.equals(Type.FROM_SOURCE.getType())) {
        return new PreserveWatermarks();
    }
    // Unrecognised type: callers receive null rather than an exception.
    return null;
}
示例7
/**
 * Registers a Pravega-backed JSON table source as {@code TaxiRide}, counts rides per vendor
 * over 15-minute sliding windows that advance every 5 minutes on {@code pickupTime}, prints the
 * result stream and runs the job as {@code Popular-Taxi-Vendor}.
 *
 * <p>An exception thrown by job execution is logged and not rethrown.
 */
@Override
public void handleRequest() {
    TableSchema schema = TripRecord.getTableSchema();

    // pickupTime is the rowtime attribute; the watermark strategy is constructed with 30000L.
    FlinkPravegaJsonTableSource taxiSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute(
                    "pickupTime",
                    new ExistingField("pickupTime"),
                    new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(schema)
            .withReaderGroupScope(getScope())
            .build();

    StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();

    // create a TableEnvironment
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiSource);

    String projection = "vendorId, pickupTime, startLocationId, destLocationId, startLocationBorough, startLocationZone, destLocationBorough, destLocationZone";
    Table rides = tableEnv.scan("TaxiRide");
    Table projectedRides = rides.select(projection);
    Table popularRides = projectedRides
            .window(Slide.over("15.minutes").every("5.minutes").on("pickupTime").as("w"))
            .groupBy("vendorId, w")
            .select("vendorId, w.start AS start, w.end AS end, count(vendorId) AS cnt");

    tableEnv.toAppendStream(popularRides, Row.class).print();

    try {
        execEnv.execute("Popular-Taxi-Vendor");
    } catch (Exception e) {
        log.error("Application Failed", e);
    }
}
示例8
/**
 * Registers a Pravega-backed JSON table source as {@code TaxiRide}, counts
 * {@code passengerCount} entries per destination zone over 1-hour tumbling windows on
 * {@code dropOffTime}, prints the result stream and runs the job as
 * {@code Max-Travellers-Per-Destination}.
 *
 * <p>An exception thrown by job execution is logged and not rethrown.
 */
@Override
public void handleRequest() {
    TableSchema schema = TripRecord.getTableSchema();

    // dropOffTime is the rowtime attribute; the watermark strategy is constructed with 30000L.
    FlinkPravegaJsonTableSource taxiSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute(
                    "dropOffTime",
                    new ExistingField("dropOffTime"),
                    new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(schema)
            .withReaderGroupScope(getScope())
            .build();

    StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();

    // create a TableEnvironment
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiSource);

    String projection = "passengerCount, dropOffTime, destLocationZone";
    Table rides = tableEnv.scan("TaxiRide");
    Table projectedRides = rides.select(projection);
    Table travellersPerDestination = projectedRides
            .window(Tumble.over("1.hour").on("dropOffTime").as("w"))
            .groupBy("destLocationZone, w")
            .select("destLocationZone, w.start AS start, w.end AS end, count(passengerCount) AS cnt");

    tableEnv.toAppendStream(travellersPerDestination, Row.class).print();

    try {
        execEnv.execute("Max-Travellers-Per-Destination");
    } catch (Exception e) {
        log.error("Application Failed", e);
    }
}
示例9
/**
 * Returns the rowtime attribute definition for this table source.
 *
 * @return a singleton list with the {@code rowtime} attribute, extracted from the existing
 *         {@code ts} field, using a {@code BoundedOutOfOrderTimestamps} strategy constructed
 *         with 100
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    final RowtimeAttributeDescriptor rowtimeAttribute = new RowtimeAttributeDescriptor(
            "rowtime",
            new ExistingField("ts"),
            new BoundedOutOfOrderTimestamps(100));
    return Collections.singletonList(rowtimeAttribute);
}
示例10
/**
 * Builds a Pravega table source through the descriptor API and the table factory service, then
 * validates the created source against the expected four-column table schema.
 */
@Test
@SuppressWarnings("unchecked")
public void testTableSourceDescriptor() {
    final String cityName = "fruitName";
    final String total = "count";
    final String eventTime = "eventTime";
    final String procTime = "procTime";
    final String controllerUri = "tcp://localhost:9090";
    final long delay = 3000L;
    final String streamName = "test";
    final String scopeName = "test";

    // Schema the created source is validated against at the end of the test.
    final TableSchema expectedSchema = TableSchema.builder()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .field(procTime, DataTypes.TIMESTAMP(3))
            .build();

    Stream stream = Stream.of(scopeName, streamName);
    PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
            .withControllerURI(URI.create(controllerUri))
            .withDefaultScope(scopeName);

    // construct table source using descriptors and table source factory
    Pravega pravega = new Pravega();
    pravega.tableSourceReaderBuilder()
            .withReaderGroupScope(stream.getScope())
            .forStream(stream)
            .withPravegaConfig(pravegaConfig);

    final Json jsonFormat = new Json().failOnMissingField(false);
    final Rowtime eventTimeRowtime = new Rowtime()
            .timestampsFromField(eventTime)
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay));
    final Schema declaredSchema = new Schema()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .rowtime(eventTimeRowtime)
            .field(procTime, DataTypes.TIMESTAMP(3)).proctime();

    final TestTableDescriptor descriptor = new TestTableDescriptor(pravega)
            .withFormat(jsonFormat)
            .withSchema(declaredSchema)
            .inAppendMode();
    final Map<String, String> properties = descriptor.toProperties();

    final TableSource<?> actualSource = TableFactoryService
            .find(StreamTableSourceFactory.class, properties)
            .createStreamTableSource(properties);

    assertNotNull(actualSource);
    TableSourceValidation.validateTableSource(actualSource, expectedSchema);
}
示例11
/**
 * Builds a Pravega table sink through the descriptor API and the table factory service and
 * checks that a sink instance is returned.
 */
@Test
@SuppressWarnings("unchecked")
public void testTableSinkDescriptor() {
    final String cityName = "fruitName";
    final String total = "count";
    final String eventTime = "eventTime";
    final String procTime = "procTime";
    final String controllerUri = "tcp://localhost:9090";
    final long delay = 3000L;
    final String streamName = "test";
    final String scopeName = "test";

    Stream stream = Stream.of(scopeName, streamName);
    PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
            .withControllerURI(URI.create(controllerUri))
            .withDefaultScope(scopeName);

    // construct table sink using descriptors and table sink factory
    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField(cityName)
            .withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
            .enableWatermark(true)
            .forStream(stream)
            .enableMetrics(true)
            .withPravegaConfig(pravegaConfig);

    final Json jsonFormat = new Json().failOnMissingField(false);
    final Rowtime eventTimeRowtime = new Rowtime()
            .timestampsFromField(eventTime)
            .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay));
    final Schema declaredSchema = new Schema()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .rowtime(eventTimeRowtime)
            .field(procTime, DataTypes.TIMESTAMP(3)).proctime();

    final FlinkPravegaTableSourceTest.TestTableDescriptor descriptor =
            new FlinkPravegaTableSourceTest.TestTableDescriptor(pravega)
                    .withFormat(jsonFormat)
                    .withSchema(declaredSchema)
                    .inAppendMode();
    final Map<String, String> properties = descriptor.toProperties();

    final TableSink<?> sink = TableFactoryService
            .find(StreamTableSinkFactory.class, properties)
            .createStreamTableSink(properties);

    assertNotNull(sink);
}
示例12
/**
 * Registers a Pravega-backed JSON table source as {@code TaxiRide} and runs a SQL query that
 * counts rides per {@code destLocationId} over hopping windows on {@code pickupTime}
 * (5-minute and 15-minute intervals), keeping only rows whose count exceeds
 * {@code getLimit()}. The result stream is printed and the job runs as
 * {@code Popular-Destination}.
 *
 * <p>An exception thrown by job execution is logged and not rethrown.
 */
@Override
public void handleRequest() {
    TableSchema schema = TripRecord.getTableSchema();

    // pickupTime is the rowtime attribute; the watermark strategy is constructed with 30000L.
    FlinkPravegaJsonTableSource taxiSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute("pickupTime", new ExistingField("pickupTime"), new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(schema)
            .withReaderGroupScope(getScope())
            .build();

    StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();

    // create a TableEnvironment
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiSource);

    // The limit is appended to the SQL text; the string built here matches the original exactly.
    String sql =
            "SELECT destLocationId, wstart, wend, cnt "
            + "FROM (SELECT destLocationId, "
            + "HOP_START(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wstart, "
            + "HOP_END(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wend, "
            + "COUNT(destLocationId) AS cnt "
            + "FROM (SELECT pickupTime, destLocationId FROM TaxiRide) "
            + "GROUP BY destLocationId, HOP(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)) "
            + "WHERE cnt > " + getLimit();

    Table popularDestinations = tableEnv.sqlQuery(sql);
    tableEnv.toAppendStream(popularDestinations, Row.class).print();

    try {
        execEnv.execute("Popular-Destination");
    } catch (Exception e) {
        log.error("Application Failed", e);
    }
}