提问者:小点点

Kafka图式。登记处。提供了url,但不是已知的配置


试图使用模式注册表在主题上发布json消息,但下面出错了。

配置的架构。登记处。url'已提供,但不是已知的配置

应用程序yml fle

server:
  port: 9080
spring:
  kafka:
    properties:
      bootstrap.servers: server1:8080 
      schema.registry.url: https://bctdsdg:8081/
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: JsonSerializer.class        
    ssl:
      keystore-location: classpath:cert.jks
      keystore-password: pwd
      key-password: pwd
      truststore-location: classpath:dev_cacerts.jks
      truststore-password: pwd

Kafka配置

@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Autowired
    private KafkaProperties kafkaProperties;
    
    @Bean
      public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.schemaregjson.serdes.JsonSerializer.class);
        props.put("schema.registry.url", "https://bctdsdg:8081/");
        props.putAll(kafkaProperties.getSsl().buildProperties());
        props.putAll(kafkaProperties.getProperties());
        return props;
      }
    
     @Bean
        public ProducerFactory<String, User> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
}

----------------------------------------------

@SpringBootApplication
public class SchemaregjsonApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SchemaregjsonApplication.class, args);
    }
    
    @Autowired
    private JsonProducer jsonproducer;
    User user = new User().withAge(34).withFirstname("Don").withLastname("Joe");
    @Override
    public void run(String... args) throws Exception {
        jsonproducer.send(user);
    }
}

共1个答案

匿名用户

在生产者和消费者属性下配置架构注册表url。

应用程序yml fle

spring:
  kafka:
      bootstrap.servers: server1:8080 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: JsonSerializer.class   
    properties:
      schema.registry.url: http://localhost:8081      
    ssl:
      keystore-location: classpath:cert.jks
      keystore-password: pwd
      key-password: pwd
      truststore-location: classpath:dev_cacerts.jks
      truststore-password: pwd