public static void main(String[] args) {
JetInstance jet = JetBootstrap.getInstance();
Properties properties = new Properties();
properties.setProperty("group.id", "cdc-demo");
properties.setProperty("bootstrap.servers", "kafka:9092");
properties.setProperty("key.deserializer", JsonDeserializer.class.getCanonicalName());
properties.setProperty("value.deserializer", JsonDeserializer.class.getCanonicalName());
properties.setProperty("auto.offset.reset", "earliest");
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(properties, record -> {
HazelcastJsonValue key = new HazelcastJsonValue(record.key().toString());
HazelcastJsonValue value = new HazelcastJsonValue(record.value().toString());
return Util.entry(key, value);
}, "dbserver1.inventory.customers"))
.withoutTimestamps()
.peek()
.writeTo(Sinks.map("customers"));
jet.newJob(p).join();
}
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(TradeSource.tradeSource())
.withNativeTimestamps(0)
.map(trade -> Util.entry(trade.getSymbol(), trade))
.writeTo(Sinks.map(LATEST_TRADES_PER_SYMBOL));
return p;
}
public static String getMetricsMapName(long jobId) {
return Util.idToString(jobId) + "_METRICS";
}