Java源码示例:org.apache.kafka.common.serialization.ByteBufferDeserializer

示例1
/**
 * Logs the connection settings and then creates the Kafka consumer assigned to
 * {@code consumer}.
 *
 * <p>The consumer is configured with auto-commit enabled at a 1000 ms interval and
 * uses {@link ByteBufferDeserializer} for both keys and values.
 */
public void init() {

        // Single definition of the broker address shared by the log line and the config.
        final String kafkaConnect = "192.168.4.5:9092";

        final String message = new StringBuilder("[KafkaConsumer] [init] kafkaConnect(")
                .append(kafkaConnect)
                .append(") groupId(")
                .append(groupId)
                .append(") topic(")
                .append(topic)
                .append(')')
                .toString();
        logger.info(message);

        final Properties config = new Properties();
        config.put("bootstrap.servers", kafkaConnect);
        config.put("group.id", groupId);
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        // The deserializers are passed as Class objects rather than class-name strings.
        config.put("key.deserializer", ByteBufferDeserializer.class);
        config.put("value.deserializer", ByteBufferDeserializer.class);

        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(config);

    }
 
示例2
/**
 * Returns a new deserializer that is the counterpart of the given serializer.
 *
 * <p>Supported serializers are the String, Long, Integer, Double, Bytes, ByteBuffer
 * and ByteArray serializers; each call creates a fresh deserializer instance.
 *
 * @param serializer the serializer whose matching deserializer is wanted
 * @return a newly created deserializer of the matching kind
 * @throws IllegalArgumentException if the serializer is not one of the supported types
 */
static Deserializer getDeserializer(Serializer serializer) {
    if (serializer instanceof StringSerializer) {
        return new StringDeserializer();
    }
    if (serializer instanceof LongSerializer) {
        return new LongDeserializer();
    }
    if (serializer instanceof IntegerSerializer) {
        return new IntegerDeserializer();
    }
    if (serializer instanceof DoubleSerializer) {
        return new DoubleDeserializer();
    }
    if (serializer instanceof BytesSerializer) {
        return new BytesDeserializer();
    }
    if (serializer instanceof ByteBufferSerializer) {
        return new ByteBufferDeserializer();
    }
    if (serializer instanceof ByteArraySerializer) {
        return new ByteArrayDeserializer();
    }

    // None of the known serializer types matched: report the offending class by name.
    final String serializerClassName = serializer.getClass().getName();
    throw new IllegalArgumentException(serializerClassName + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
}
 
示例3
/**
 * Supplies the "serializers" data set: each row pairs a serializer instance with the
 * deserializer class expected to correspond to it.
 *
 * @return rows of {@code {serializer instance, expected deserializer class}}
 */
@DataProvider(name = "serializers")
public Object[][] serializers() {
    final Object[][] rows = {
        {new StringSerializer(), StringDeserializer.class},
        {new LongSerializer(), LongDeserializer.class},
        {new IntegerSerializer(), IntegerDeserializer.class},
        {new DoubleSerializer(), DoubleDeserializer.class},
        {new BytesSerializer(), BytesDeserializer.class},
        {new ByteBufferSerializer(), ByteBufferDeserializer.class},
        {new ByteArraySerializer(), ByteArrayDeserializer.class},
    };
    return rows;
}
 
示例4
/**
 * Asynchronously looks up, for every partition of {@code topic}, the consumer's
 * reported end offset minus one.
 *
 * <p>A short-lived consumer with a random group id and auto-commit disabled is
 * created for the lookup and closed afterwards. The work runs on the elastic scheduler.
 *
 * @param topic the topic whose partitions are inspected
 * @return a stage completing with a map from partition number to {@code endOffset - 1}
 *         (presumably the offset of the last record, so -1 for an empty partition;
 *         confirm against the Kafka {@code endOffsets} contract)
 */
@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
    return Mono.fromCallable(() -> {
        var consumerConfig = new HashMap<String, Object>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Random group id so this throwaway consumer does not share a group.
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0");
        consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

        var keyDeserializer = new ByteBufferDeserializer();
        var valueDeserializer = new ByteBufferDeserializer();

        try (var consumer = new KafkaConsumer<ByteBuffer, ByteBuffer>(consumerConfig, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(List.of(topic));

            // Build the full set of partitions for the topic.
            var topicPartitions = consumer.partitionsFor(topic).stream()
                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toSet());

            var endOffsets = consumer.endOffsets(topicPartitions);

            // Key by partition number and shift each reported end offset back by one.
            return endOffsets.entrySet().stream().collect(Collectors.toMap(
                    entry -> entry.getKey().partition(),
                    entry -> entry.getValue() - 1
            ));
        }
    }).subscribeOn(Schedulers.elastic()).toFuture();
}