Java源码示例:org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps

示例1
/**
 * Binds {@code yaml/kafka-source.yaml} into a {@link SourceDescriptor} and a
 * {@link Kafka010ConnectorDescriptor}, checks the bound connector settings, then builds the
 * Kafka table source and checks its proctime/rowtime attributes and deserialization schema.
 */
@Test
public void buildKafkaSource() throws Exception {
    File file = ResourceUtils.getFile("classpath:yaml/kafka-source.yaml");
    SourceDescriptor sourceDescriptor = BindPropertiesUtil.bindProperties(file, SourceDescriptor.class);
    Kafka010ConnectorDescriptor connectorDescriptor = BindPropertiesUtil.bindProperties(sourceDescriptor.getConnector(), Kafka010ConnectorDescriptor.class);

    // Connector settings bound from the YAML file.
    assertThat(connectorDescriptor.getTopic()).isEqualTo("app-log");
    assertThat(connectorDescriptor.getStartupMode()).isEqualTo("earliest-offset");
    assertThat(connectorDescriptor.getSpecificOffsets().get("1")).isEqualTo("1000");
    assertThat(connectorDescriptor.getSpecificOffsets().get("2")).isEqualTo("3000");
    assertThat(connectorDescriptor.getProperties().get("bootstrap.servers")).isEqualTo("127.0.0.1:9092");
    assertThat(connectorDescriptor.getProperties().get("group.id")).isEqualTo("testGroup");
    assertThat(sourceDescriptor.getSchema()).isNotNull();

    // Build the table source from the bound schema and format.
    FormatDescriptor formatDescriptor = sourceDescriptor.getFormat();
    KafkaTableSourceBase alchemyKafkaTableSource = connectorDescriptor.buildSource(sourceDescriptor.getSchema(), formatDescriptor);
    assertThat(alchemyKafkaTableSource).isNotNull();
    assertThat(alchemyKafkaTableSource.getProctimeAttribute()).isEqualTo("procTime");

    // Assert non-emptiness before indexing so a missing rowtime attribute fails with a
    // descriptive assertion error rather than an IndexOutOfBoundsException.
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = alchemyKafkaTableSource.getRowtimeAttributeDescriptors();
    assertThat(rowtimeAttributeDescriptors).isNotNull().isNotEmpty();
    RowtimeAttributeDescriptor rowtimeDescriptor = rowtimeAttributeDescriptors.get(0);
    assertThat(rowtimeDescriptor.getAttributeName()).isEqualTo("rowTime");
    assertThat(rowtimeDescriptor.getTimestampExtractor()).isInstanceOf(ExistingField.class);
    assertThat(rowtimeDescriptor.getWatermarkStrategy()).isInstanceOf(BoundedOutOfOrderTimestamps.class);

    // Wildcard-parameterized instead of the raw type.
    DeserializationSchema<?> deserializationSchema = formatDescriptor.transform(new Tuple2<>(alchemyKafkaTableSource.getReturnType(), true));
    assertThat(deserializationSchema).isInstanceOf(GrokRowDeserializationSchema.class);
}
 
示例2
/**
 * Rowtime attribute should be of type TIMESTAMP.
 *
 * <p>Declares the INT field {@code age} as the rowtime attribute and expects a
 * {@link ValidationException} while building the Pravega table source from the resulting
 * descriptor properties (presumably raised by {@code createFlinkPravegaTableSource}).
 */
@Test(expected = ValidationException.class)
public void testWrongRowTimeAttributeType() {
    // "age" is an INT column, which is not a valid type for a rowtime attribute.
    final Schema schema = new Schema()
            .field("name", DataTypes.STRING())
            .field("age", DataTypes.INT()).rowtime(new Rowtime()
                    .timestampsFromField("age")
                    .watermarksFromStrategy(
                            new BoundedOutOfOrderTimestamps(30000L)));
    Pravega pravega = new Pravega();
    Stream stream = Stream.of(SCOPE, STREAM);
    pravega.tableSourceReaderBuilder()
            .forStream(stream)
            .withPravegaConfig(PRAVEGA_CONFIG);
    final TestTableDescriptor testDesc = new TestTableDescriptor(pravega)
            .withFormat(JSON)
            .withSchema(schema)
            .inAppendMode();
    final Map<String, String> propertiesMap = testDesc.toProperties();
    FlinkPravegaTableFactoryBase tableFactoryBase = new FlinkPravegaStreamTableSourceFactory();
    tableFactoryBase.createFlinkPravegaTableSource(propertiesMap);
    // Reaching this line means no exception was thrown, i.e. the invalid schema was accepted.
    fail("Expected ValidationException: rowtime attribute 'age' is not of type TIMESTAMP");
}
 
示例3
/**
 * Declares a single rowtime attribute, {@code rowtime}, whose timestamps come from the existing
 * field {@code ts} and whose watermarks use {@link BoundedOutOfOrderTimestamps} with a delay
 * argument of 100 (presumably milliseconds).
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
	final RowtimeAttributeDescriptor rowtimeDescriptor = new RowtimeAttributeDescriptor(
		"rowtime",
		new ExistingField("ts"),
		new BoundedOutOfOrderTimestamps(100));
	return Collections.singletonList(rowtimeDescriptor);
}
 
示例4
/**
 * Exposes a single rowtime attribute named {@code timestamp}, read from the existing field of the
 * same name, with a {@link BoundedOutOfOrderTimestamps} watermark strategy constructed with a
 * delay of 100 (presumably milliseconds).
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
	final ExistingField timestampExtractor = new ExistingField("timestamp");
	final BoundedOutOfOrderTimestamps watermarkStrategy = new BoundedOutOfOrderTimestamps(100);
	return Collections.singletonList(
		new RowtimeAttributeDescriptor("timestamp", timestampExtractor, watermarkStrategy));
}
 
示例5
/**
 * Returns exactly one rowtime attribute descriptor: attribute {@code rowtime}, extracted from
 * field {@code ts}, with bounded-out-of-orderness watermarks (delay argument 100).
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
	final ExistingField fromTs = new ExistingField("ts");
	final RowtimeAttributeDescriptor onlyAttribute =
		new RowtimeAttributeDescriptor("rowtime", fromTs, new BoundedOutOfOrderTimestamps(100));
	return Collections.singletonList(onlyAttribute);
}
 
示例6
/**
 * Translates the configured {@code type} string into a Flink watermark strategy.
 *
 * @return {@link AscendingTimestamps} for the periodic-ascending type,
 *         {@link BoundedOutOfOrderTimestamps} built from {@code delay} for the periodic-bounded
 *         type, {@link PreserveWatermarks} for the from-source type, or {@code null} when
 *         {@code type} is unset or matches none of these
 */
public WatermarkStrategy get() {
    // Nothing configured: no watermark strategy.
    if (type == null) {
        return null;
    }
    if (type.equals(Type.PERIODIC_ASCENDING.getType())) {
        return new AscendingTimestamps();
    }
    if (type.equals(Type.PERIODIC_BOUNDED.getType())) {
        return new BoundedOutOfOrderTimestamps(delay);
    }
    if (type.equals(Type.FROM_SOURCE.getType())) {
        return new PreserveWatermarks();
    }
    // Unrecognised type values fall through to null.
    return null;
}
 
示例7
/**
 * Reads taxi trip records from a Pravega stream as a table with event-time attribute
 * {@code pickupTime}, counts rides per {@code vendorId} over 15-minute sliding windows that
 * advance every 5 minutes, and prints each window's result. A failing job is logged, not
 * rethrown.
 */
@Override
public void handleRequest() {
    final TableSchema tripSchema = TripRecord.getTableSchema();

    // Watermarks use a bounded out-of-orderness of 30000L (presumably milliseconds).
    final FlinkPravegaJsonTableSource taxiRideSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute("pickupTime",
                    new ExistingField("pickupTime"),
                    new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(tripSchema)
            .withReaderGroupScope(getScope())
            .build();

    final StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiRideSource);

    final String projectedFields = "vendorId, pickupTime, startLocationId, destLocationId, startLocationBorough, startLocationZone, destLocationBorough, destLocationZone";
    final Table rides = tableEnv.scan("TaxiRide").select(projectedFields);

    final Table ridesPerVendor = rides
            .window(Slide.over("15.minutes").every("5.minutes").on("pickupTime").as("w"))
            .groupBy("vendorId, w")
            .select("vendorId, w.start AS start, w.end AS end, count(vendorId) AS cnt");

    tableEnv.toAppendStream(ridesPerVendor, Row.class).print();

    try {
        execEnv.execute("Popular-Taxi-Vendor");
    } catch (Exception jobFailure) {
        log.error("Application Failed", jobFailure);
    }
}
 
示例8
/**
 * Reads taxi trip records from a Pravega stream as a table with event-time attribute
 * {@code dropOffTime}, aggregates them per {@code destLocationZone} over one-hour tumbling
 * windows, and prints each window's result. A failing job is logged, not rethrown.
 */
@Override
public void handleRequest() {
    final TableSchema tripSchema = TripRecord.getTableSchema();

    // Watermarks use a bounded out-of-orderness of 30000L (presumably milliseconds).
    final FlinkPravegaJsonTableSource taxiRideSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute("dropOffTime",
                    new ExistingField("dropOffTime"),
                    new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(tripSchema)
            .withReaderGroupScope(getScope())
            .build();

    final StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiRideSource);

    final Table trips = tableEnv.scan("TaxiRide").select("passengerCount, dropOffTime, destLocationZone");

    // NOTE(review): count(passengerCount) presumably counts trips with a non-null passengerCount,
    // not the number of travellers; if travellers were intended, sum(passengerCount) may be the
    // right aggregate. Confirm before changing.
    final Table travelersPerDestination = trips
            .window(Tumble.over("1.hour").on("dropOffTime").as("w"))
            .groupBy("destLocationZone, w")
            .select("destLocationZone, w.start AS start, w.end AS end, count(passengerCount) AS cnt");

    tableEnv.toAppendStream(travelersPerDestination, Row.class).print();

    try {
        execEnv.execute("Max-Travellers-Per-Destination");
    } catch (Exception jobFailure) {
        log.error("Application Failed", jobFailure);
    }
}
 
示例9
/**
 * Provides one rowtime attribute ({@code rowtime}) backed by field {@code ts}, using
 * {@link BoundedOutOfOrderTimestamps} with a delay argument of 100 for watermarks.
 */
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
	final String attributeName = "rowtime";
	final RowtimeAttributeDescriptor descriptor = new RowtimeAttributeDescriptor(
		attributeName, new ExistingField("ts"), new BoundedOutOfOrderTimestamps(100));
	return Collections.singletonList(descriptor);
}
 
示例10
/**
 * Builds a Pravega table source through the descriptor API and the table-source factory
 * lookup. It asserts that a source is created and validates it against the expected table
 * schema, which has a rowtime attribute on {@code eventTime} and a proctime attribute on
 * {@code procTime}.
 */
@Test
@SuppressWarnings("unchecked")
public void testTableSourceDescriptor() {
    final String cityName = "fruitName";
    final String total = "count";
    final String eventTime = "eventTime";
    final String procTime = "procTime";
    final long watermarkDelay = 3000L;
    final String scopeName = "test";
    final String streamName = "test";

    final Stream stream = Stream.of(scopeName, streamName);
    final PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
            .withControllerURI(URI.create("tcp://localhost:9090"))
            .withDefaultScope(scopeName);

    // Reader settings are applied through the builder; its return value is not kept
    // (presumably the builder configures `pravega` in place).
    final Pravega pravega = new Pravega();
    pravega.tableSourceReaderBuilder()
            .withReaderGroupScope(stream.getScope())
            .forStream(stream)
            .withPravegaConfig(pravegaConfig);

    final Schema sourceSchema = new Schema()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .rowtime(new Rowtime()
                    .timestampsFromField(eventTime)
                    .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(watermarkDelay)))
            .field(procTime, DataTypes.TIMESTAMP(3)).proctime();

    final TestTableDescriptor sourceDescriptor = new TestTableDescriptor(pravega)
            .withFormat(new Json().failOnMissingField(false))
            .withSchema(sourceSchema)
            .inAppendMode();

    final Map<String, String> properties = sourceDescriptor.toProperties();
    final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, properties)
            .createStreamTableSource(properties);
    assertNotNull(actualSource);

    final TableSchema expectedSchema = TableSchema.builder()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .field(procTime, DataTypes.TIMESTAMP(3))
            .build();
    TableSourceValidation.validateTableSource(actualSource, expectedSchema);
}
 
示例11
/**
 * Builds a Pravega table sink through the descriptor API and the table-sink factory lookup,
 * using an exactly-once writer keyed on {@code fruitName}, and asserts that a sink is created.
 */
@Test
@SuppressWarnings("unchecked")
public void testTableSinkDescriptor() {
    final String cityName = "fruitName";
    final String total = "count";
    final String eventTime = "eventTime";
    final String procTime = "procTime";
    final long watermarkDelay = 3000L;
    final String scopeName = "test";
    final String streamName = "test";

    final Stream stream = Stream.of(scopeName, streamName);
    final PravegaConfig pravegaConfig = PravegaConfig.fromDefaults()
            .withControllerURI(URI.create("tcp://localhost:9090"))
            .withDefaultScope(scopeName);

    // Writer settings are applied through the builder; its return value is not kept
    // (presumably the builder configures `pravega` in place).
    final Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField(cityName)
            .withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
            .enableWatermark(true)
            .forStream(stream)
            .enableMetrics(true)
            .withPravegaConfig(pravegaConfig);

    final Schema sinkSchema = new Schema()
            .field(cityName, DataTypes.STRING())
            .field(total, DataTypes.BIGINT())
            .field(eventTime, DataTypes.TIMESTAMP(3))
            .rowtime(new Rowtime()
                    .timestampsFromField(eventTime)
                    .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(watermarkDelay)))
            .field(procTime, DataTypes.TIMESTAMP(3)).proctime();

    final FlinkPravegaTableSourceTest.TestTableDescriptor sinkDescriptor =
            new FlinkPravegaTableSourceTest.TestTableDescriptor(pravega)
                    .withFormat(new Json().failOnMissingField(false))
                    .withSchema(sinkSchema)
                    .inAppendMode();

    final Map<String, String> properties = sinkDescriptor.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, properties)
            .createStreamTableSink(properties);

    assertNotNull(sink);
}
 
示例12
/**
 * Reads taxi trip records from a Pravega stream as a table with event-time attribute
 * {@code pickupTime}, then runs a SQL query that counts rides per {@code destLocationId} over
 * 15-minute hopping windows sliding every 5 minutes. Only windows whose count exceeds
 * {@code getLimit()} are printed. A failing job is logged, not rethrown.
 */
@Override
public void handleRequest() {
    final TableSchema tripSchema = TripRecord.getTableSchema();

    // Watermarks use a bounded out-of-orderness of 30000L (presumably milliseconds).
    final FlinkPravegaJsonTableSource taxiRideSource = FlinkPravegaJsonTableSource.builder()
            .forStream(Stream.of(getScope(), getStream()).getScopedName())
            .withPravegaConfig(getPravegaConfig())
            .failOnMissingField(true)
            .withRowtimeAttribute("pickupTime",
                    new ExistingField("pickupTime"),
                    new BoundedOutOfOrderTimestamps(30000L))
            .withSchema(tripSchema)
            .withReaderGroupScope(getScope())
            .build();

    final StreamExecutionEnvironment execEnv = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv);
    tableEnv.registerTableSource("TaxiRide", taxiRideSource);

    // Inner query: per-destination ride counts per hopping window; outer query: keep busy windows.
    final String popularDestinationsQuery =
            "SELECT destLocationId, wstart, wend, cnt "
            + "FROM (SELECT destLocationId, "
            + "HOP_START(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wstart, "
            + "HOP_END(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wend, "
            + "COUNT(destLocationId) AS cnt "
            + "FROM (SELECT pickupTime, destLocationId FROM TaxiRide) "
            + "GROUP BY destLocationId, HOP(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)) "
            + "WHERE cnt > " + getLimit();

    final Table popularDestinations = tableEnv.sqlQuery(popularDestinationsQuery);
    tableEnv.toAppendStream(popularDestinations, Row.class).print();

    try {
        execEnv.execute("Popular-Destination");
    } catch (Exception jobFailure) {
        log.error("Application Failed", jobFailure);
    }
}