Java source code examples: org.apache.kafka.common.serialization.ByteBufferDeserializer
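ByteBufferDeserializer turns the raw byte[] payload of a Kafka record into a java.nio.ByteBuffer; it is the counterpart of ByteBufferSerializer. Before the examples below, here is a minimal self-contained round-trip sketch (the topic name "demo" is illustrative only and not taken from any example):

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;

public class ByteBufferRoundTrip {
    public static void main(String[] args) {
        try (ByteBufferSerializer serializer = new ByteBufferSerializer();
             ByteBufferDeserializer deserializer = new ByteBufferDeserializer()) {
            ByteBuffer payload = ByteBuffer.wrap("hello".getBytes());
            // serialize() reads the buffer's remaining bytes into a byte[]
            byte[] wire = serializer.serialize("demo", payload);
            // deserialize() wraps the bytes back into a ByteBuffer
            ByteBuffer decoded = deserializer.deserialize("demo", wire);
            System.out.println(new String(decoded.array())); // prints "hello"
        }
    }
}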
Example 1
public void init() {
    logger.info(new StringBuffer("[KafkaConsumer] [init] ")
            .append("kafkaConnect(").append("192.168.4.5:9092")
            .append(") groupId(").append(groupId)
            .append(") topic(").append(topic).append(")").toString());
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.4.5:9092");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    // Deserializer configs accept either a Class object or the class name as a String.
    props.put("key.deserializer", ByteBufferDeserializer.class);
    props.put("value.deserializer", ByteBufferDeserializer.class);
    consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
}
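After init() has run, a typical poll loop over the ByteBuffer-valued records might look like the sketch below. It reuses the consumer, topic, and logger fields from the snippet above; the subscribe call, the timeout, and the logging format are assumptions, and the fragment requires imports of java.util.Collections, java.time.Duration, and org.apache.kafka.clients.consumer.ConsumerRecord/ConsumerRecords:

consumer.subscribe(Collections.singletonList(topic));
while (true) {
    // Both key and value arrive as ByteBuffer because of the deserializer config above.
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        ByteBuffer value = record.value();
        byte[] bytes = new byte[value.remaining()];
        value.get(bytes); // copy the payload out of the buffer
        logger.info("offset=" + record.offset() + " value=" + new String(bytes));
    }
}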
Example 2
static Deserializer<?> getDeserializer(Serializer<?> serializer) {
    if (serializer instanceof StringSerializer) {
        return new StringDeserializer();
    } else if (serializer instanceof LongSerializer) {
        return new LongDeserializer();
    } else if (serializer instanceof IntegerSerializer) {
        return new IntegerDeserializer();
    } else if (serializer instanceof DoubleSerializer) {
        return new DoubleDeserializer();
    } else if (serializer instanceof BytesSerializer) {
        return new BytesDeserializer();
    } else if (serializer instanceof ByteBufferSerializer) {
        return new ByteBufferDeserializer();
    } else if (serializer instanceof ByteArraySerializer) {
        return new ByteArrayDeserializer();
    } else {
        throw new IllegalArgumentException(serializer.getClass().getName()
                + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
    }
}
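A sketch of how this helper might be used: pair a serializer with the deserializer it maps to and round-trip a value. The topic name "t" and the unchecked cast are illustrative assumptions:

Serializer<ByteBuffer> serializer = new ByteBufferSerializer();
@SuppressWarnings("unchecked")
Deserializer<ByteBuffer> deserializer = (Deserializer<ByteBuffer>) getDeserializer(serializer);

ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3});
byte[] wire = serializer.serialize("t", original);             // ByteBuffer -> byte[]
ByteBuffer restored = deserializer.deserialize("t", wire);     // byte[] -> ByteBuffer
assert restored.equals(ByteBuffer.wrap(new byte[] {1, 2, 3})); // same bytes back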
Example 3
@DataProvider(name = "serializers")
public Object[][] serializers() {
return new Object[][] {
{
new StringSerializer(), StringDeserializer.class
},
{
new LongSerializer(), LongDeserializer.class
},
{
new IntegerSerializer(), IntegerDeserializer.class,
},
{
new DoubleSerializer(), DoubleDeserializer.class,
},
{
new BytesSerializer(), BytesDeserializer.class
},
{
new ByteBufferSerializer(), ByteBufferDeserializer.class
},
{
new ByteArraySerializer(), ByteArrayDeserializer.class
}
};
}
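A TestNG test driven by this provider might look like the following minimal sketch. The method name and the assertion are assumptions; only the dataProvider wiring matches the snippet above, and getDeserializer refers to the helper from Example 2:

@Test(dataProvider = "serializers")
public void pairsSerializerWithDeserializer(Serializer<?> serializer, Class<?> expectedDeserializer) {
    Deserializer<?> deserializer = getDeserializer(serializer);
    // Each serializer should map to exactly the expected deserializer class.
    Assert.assertEquals(deserializer.getClass(), expectedDeserializer);
}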
Example 4
@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
    return Mono.fromCallable(() -> {
        var properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Throwaway group id: this consumer only reads metadata and never commits.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        try (
            var consumer = new KafkaConsumer<ByteBuffer, ByteBuffer>(
                properties,
                new ByteBufferDeserializer(),
                new ByteBufferDeserializer()
            )
        ) {
            consumer.subscribe(List.of(topic));
            var endOffsets = consumer.endOffsets(
                consumer.partitionsFor(topic).stream()
                    .map(it -> new TopicPartition(topic, it.partition()))
                    .collect(Collectors.toSet())
            );
            // endOffsets() returns the offset of the *next* record per partition,
            // so subtract 1 to get the offset of the last record actually written.
            return endOffsets.entrySet().stream().collect(Collectors.toMap(
                it -> it.getKey().partition(),
                it -> it.getValue() - 1
            ));
        }
        // boundedElastic() is the Reactor scheduler intended for blocking calls like these.
    }).subscribeOn(Schedulers.boundedElastic()).toFuture();
}
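A hedged usage sketch for the method above: block on the returned CompletionStage and print the last offset per partition. The topic name "events" is illustrative, and join() is just one way to wait for the result:

Map<Integer, Long> lastOffsets = getEndOffsets("events").toCompletableFuture().join();
lastOffsets.forEach((partition, offset) ->
        System.out.println("partition " + partition + " -> last offset " + offset));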