提问者:小点点

Apache Kafka:生产者不生成所有数据


我是kafka新手。我的要求是,我在数据库源和目标中有两个表。现在我想从源表中获取数据并将其存储到目标kafka将作为生产者和消费者工作。我已经完成了代码,但问题是当生产者生成数据时,一些数据会错过生成。例如,如果我在源表中有100条记录,那么它不会生成所有100条记录。我正在使用Kafka-0.10

MyProducer配置-

bootstrap.servers=192.168.1.XXX:9092,192.168.1.XXX:9093,192.168.1.XXX:9094
acks=all
retries=2
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

我的生产者代码:-

public void run() {
    SourceDAO sourceDAO = new SourceDAO();
    Source source;
    int id;
    try {
        logger.debug("INSIDE RUN");
        List<Source> listOfEmployee = sourceDAO.getAllSource();
        Iterator<Source> sourceIterator = listOfEmployee.iterator();
        String sourceJson;
        Gson gson = new Gson();
        while(sourceIterator.hasNext()) {
            source = sourceIterator.next();
            sourceJson = gson.toJson(source);
            id = source.getId();
            producerRecord = new ProducerRecord<Integer, String>(TOPIC, id, sourceJson);
            producerRecords.add(producerRecord);
        }

        for(ProducerRecord<Integer, String> record : producerRecords) {
            logger.debug("Producer Record: " + record.value());
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    logger.debug("Exception: " + exception);
                    if (exception != null)
                        throw new RuntimeException(exception.getMessage());
                    logger.info("The offset of the record we just sent is: " + metadata.offset()
                            + " In Partition : " + metadata.partition());
                }
            });
        }
        producer.close();
        producer.flush();
        logger.info("Size of Record: " + producerRecords.size());
    } catch (SourceServiceException e) {
        logger.error("Unable to Produce data...", e);
        throw new RuntimeException("Unable to Produce data...", e);
    }
}

我的消费者配置:-

bootstrap.servers=192.168.1.XXX:9092,192.168.1.231:XXX,192.168.1.232:XXX
group.id=consume
client.id=C1
enable.auto.commit=true
auto.commit.interval.ms=1000
max.partition.fetch.bytes=10485760
session.timeout.ms=35000
consumer.timeout.ms=35000
auto.offset.reset=earliest
message.max.bytes=10000000
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

值。去序列化r=org. apache.kafka.公共.序列化。字符串去序列化r

消费者守则:-

public void doWork() {
    logger.debug("Inside doWork of DestinationConsumer");
    DestinationDAO destinationDAO = new DestinationDAO();
    consumer.subscribe(Collections.singletonList(this.TOPIC));
    while(true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        int minBatchSize = 1;
        for(ConsumerRecord<String, String> rec : consumerRecords) {
            logger.debug("Consumer Recieved Record: " + rec);
            consumerRecordsList.add(rec);
        }
        logger.debug("Record Size: " + consumerRecordsList.size());
        if(consumerRecordsList.size() >= minBatchSize) {
            try {
                destinationDAO.insertSourceDataIntoDestination(consumerRecordsList);
            } catch (DestinationServiceException e) {
                logger.error("Unable to update destination table");
            }
        }
    }
}

共1个答案

匿名用户

从这里可以看到的情况来看,我猜你没有刷新或关闭生产者。您应该注意send是异步运行的,只需准备一批稍后发送(取决于您的生产者的配置):

从kafka留档

send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中并立即返回。这允许生产者将单个记录批处理在一起以提高效率。

您应该尝试在遍历所有producerRecords后调用producer. close()(BTW:为什么要缓存整个producerRecords,这可能会在您有很多记录时导致问题)。

如果这没有帮助,您应该尝试使用例如控制台消费者来找出缺少的内容。请提供更多代码。生产者是如何配置的?您的消费者是什么样子的?producerRecords的类型是什么?

希望有帮助。