Asked by: 小点点

Spring Boot application not starting with Kafka and RestController


I have a Spring Boot application that ran fine until I added a Kafka consumer and producer to it. The code that ran without any problems has a RestController, shown below:

@RestController
public class OrderResource {
    //Get orderheaderkeys for a particular date
    //OrderLine
    @GetMapping("/orderForDate/{forDate}")
    public List<String> findOrderHeaderKeys(@PathVariable String forDate) {
        //Some business logic
        return keys;
    }
}

This REST endpoint returns the expected response. Now I have added a Kafka producer and consumer:

@Component
public class KafkaProducerClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerClient.class);
    private KafkaProducer<String, String> producer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
    }

    public void sendMessageAsync(String topic, String key, String jsonString) {
        logger.info("Sending message async to kafka topic with key = {}", key);
        long startTime = System.currentTimeMillis();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, jsonString);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                final long timeTaken = System.currentTimeMillis() - startTime;
                if (recordMetadata != null) {
                    logger.info("Producer sent record(key={}, value={}). " +
                                    "Topic={}, Partition={}, Offset={}, timeTaken={}",
                            record.key(), record.value(), topic, recordMetadata.partition(),
                            recordMetadata.offset(), String.valueOf(timeTaken));
                }
                if (exception != null) {
                    // Pass the exception itself so SLF4J logs it with its stack trace
                    logger.error("Exception occurred while posting message", exception);
                    return;
                }
            }
        });
        logger.info("Message sent to kafka topic with key = {}", key);
    }

    public void sendMessageSync(String topic, String key, String jsonString) {
        try {
            logger.info("Sending message sync to kafka topic={} with key={}", topic, key);
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, jsonString);
            Future<RecordMetadata> future = producer.send(record);
            producer.flush();
            RecordMetadata recordMetadata = future.get();
            final long timeTaken = System.currentTimeMillis() - startTime;
            if (recordMetadata != null) {
                logger.info(
                        "Producer sent message by sendMessageSync. record={}. timeTaken={}",
                        recordMetadata,
                        String.valueOf(timeTaken));
            }
        } catch (Exception ex) {
            logger.error("Exception occurred....", ex);
        }

    }

    @PreDestroy
    private void shutdown(){
        producer.close();
    }
}

@Component
public class KafkaConsumerClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerClient.class);
    private KafkaConsumer<String, String> consumer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.topic}")
    private String topic;

    @Value("${zookeeper.groupId}")
    private String groupId;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    //orderResource.saveOrderToSecondaryStore(record.value().toString());
                }catch (Exception e){
                    logger.error("Exception while processing Kafka message", e);
                }
            }
        }
    }
}

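After adding this consumer and producer, my application does not start. I no longer see the following lines, which were logged when the application was starting correctly: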

2019-12-12 15:01:12.090  INFO 38376 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-12-12 15:01:12.093  INFO 38376 --- [  restartedMain] c.w.c.o.p.MySpringApplication  : Started MySpringApplication in 15.187 seconds (JVM running for 15.617)


2 answers

Anonymous user

To deploy a Spring Boot application on a Tomcat server, update pom.xml:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>

Update the main application class:

@SpringBootApplication
public class Application extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Also update the Kafka dependencies and configuration. For more details, see the following link: https://www.confluent.io/blog/apache-kafka-spring-boot-application/

Anonymous user

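I solved this by moving the consumer polling (the while loop) out of the init method of KafkaConsumerClient. The @PostConstruct method runs on the thread that starts the application, so an endless loop there never returns and Spring never finishes initializing the context or starting Tomcat.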

@PostConstruct
public void init() {
}
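
One possible way to arrange this, shown only as a rough sketch, is to have init() hand the loop to a worker thread and return immediately. The class below is a hypothetical, standard-library-only stand-in and not the original code: the names BackgroundPoller, publish, processed and awaitProcessed are invented for illustration, and the BlockingQueue takes the place of the Kafka topic. In the real bean, the queue poll would be consumer.poll(Duration.ofMillis(100)), and init() and shutdown() would presumably carry @PostConstruct and @PreDestroy.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaConsumerClient (assumption, not the original class).
class BackgroundPoller {
    // Stand-in for the Kafka topic; the real client would use KafkaConsumer.poll(...).
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    // Would be the @PostConstruct method: it only schedules the loop and returns,
    // so the startup thread is free to finish initializing the application.
    public void init() {
        executor.execute(this::pollLoop);
    }

    // Runs on the worker thread until shutdown() is called.
    private void pollLoop() {
        while (running) {
            try {
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    processed.add(record); // placeholder for the real message handling
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Test helper: simulates a record arriving on the topic.
    public void publish(String value) {
        source.add(value);
    }

    public List<String> processed() {
        return processed;
    }

    // Test helper: waits until at least `count` records were handled or the timeout expires.
    public boolean awaitProcessed(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (processed.size() < count && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return processed.size() >= count;
    }

    // Would be the @PreDestroy method: stops the loop and interrupts the worker.
    public void shutdown() {
        running = false;
        executor.shutdownNow();
    }
}
```

With a real KafkaConsumer, note that the consumer is not safe for multi-threaded access, so it should be created and polled on the same worker thread. The one method intended to be called from another thread is consumer.wakeup(), which can be used in shutdown() to break out of a blocking poll.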