我在本地机器上运行Confluent JDBC连接器,并试图找出它存储有关上次读取ID和时间戳的信息的位置。
根据https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#starting-table-capture
它需要在offset.storage. topic
中,但我尝试设置配置,但没有成功。
我的连接器:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"connection.password": "test123",
"tasks.max": "1",
"topics": "dp_status",
"table.whitelist": "source_test",
"mode": "timestamp+incrementing",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"topic.prefix": "test-postgresql-jdbc-source.",
"connection.user": "test",
"name": "postgresql-jdbc-source-gp",
"connection.url": "jdbc:postgresql://postgresql:5432/test_db",
"value.converter": "io.confluent.connect.avro.AvroConverter"
}
如果您在独立模式下运行Kafka Connect,则有关偏移量的信息(因此在JDBC源代码的情况下,ID/时间戳)存储在由Kafka Connect工作器属性中的offset.storage. file.filename
定义的本地文件中。