Java source code examples: org.apache.kafka.connect.connector.ConnectorContext
Example 1
KafkaMonitor(
    ConnectorContext context,
    SourceConfig config,
    Consumer<byte[], byte[]> sourceConsumer,
    Consumer<byte[], byte[]> destinationConsumer,
    TaskConfigBuilder taskConfigBuilder) {
  this.context = context;
  this.topicsWhitelist = config.getTopicsWhitelist();
  this.monitorPollWaitMs = config.getMonitorPollWaitMs();
  this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex());
  this.sourceConsumer = sourceConsumer;
  this.destinationConsumer = destinationConsumer;
  if (topicsWhitelist.isEmpty() && config.getTopicsRegex().isEmpty()) {
    logger.warn("No topics whitelist or topics regex configured");
  }
  this.taskConfigBuilder = taskConfigBuilder;
  this.validationStrategy =
      config.getEnablePartitionMatching()
          ? SourcePartitionValidator.MatchingStrategy.PARTITION
          : SourcePartitionValidator.MatchingStrategy.TOPIC;
  this.topicCheckingEnabled = config.getTopicCheckingEnabled();
  this.routers = this.validateTransformations(config.transformations());
}
Example 2
@Before
public void setUp() {
  Map<String, String> properties = getBaseProperties();
  SourceConfig config = new SourceConfig(properties);
  this.mockSourceConsumer = mockSourceConsumer();
  this.mockDestinationConsumer = mockDestinationConsumer();
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  kafkaMonitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
}
Example 3
@Test
public void shouldAlwaysReplicateWhenCheckingDisabled() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.ENABLE_DESTINATION_TOPIC_CHECKING.getKey(), "false");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);
  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);
  assertThat(partitions, hasItem(new TopicPartition("topic5", 0)));
}
Example 4
@Test(expected = IllegalArgumentException.class)
public void shouldThrowWhenUnsupportedTransformationEncountered() {
  Map<String, String> properties = getBaseProperties();
  properties.put("transforms", "reroute");
  properties.put(
      "transforms.reroute.type", "org.apache.kafka.connect.transforms.TimestampRouter");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  new KafkaMonitor(
      mock(ConnectorContext.class),
      config,
      mockSourceConsumer,
      mockDestinationConsumer,
      taskConfigBuilder);
}
Example 5
@Before
public void setUp() throws Exception {
    connector = new KafkaSourceConnector();
    connectorContextMock = PowerMock.createMock(ConnectorContext.class);
    partitionMonitorMock = PowerMock.createMock(PartitionMonitor.class);
    connector.initialize(connectorContextMock);
    // Default test settings
    sourceProperties = new HashMap<>();
    sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, SOURCE_TOPICS_VALUE);
    sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, SOURCE_BOOTSTRAP_SERVERS_CONFIG);
    sourceProperties.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, POLL_LOOP_TIMEOUT_MS_VALUE);
    sourceProperties.put(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG, TOPIC_LIST_TIMEOUT_MS_VALUE);
    sourceProperties.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, CONSUMER_GROUP_ID_VALUE);
    // Default leader topic partitions to return (just one)
    stubLeaderTopicPartitions = new HashSet<>();
    LeaderTopicPartition leaderTopicPartition = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
    stubLeaderTopicPartitions.add(leaderTopicPartition);
}
Example 6
@Test
public void testTaskConfigsReturns1TaskOnOneTopicPartition() throws Exception {
    PowerMock
            .expectNew(PartitionMonitor.class,
                    new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
                    EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
            .andStubReturn(partitionMonitorMock);
    partitionMonitorMock.start();
    PowerMock.expectLastCall().andVoid();
    EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
    PowerMock.replayAll();
    connector.start(sourceProperties);
    List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
    assertEquals(1, taskConfigs.size());
    assertEquals("0:test.topic:0", taskConfigs.get(0).get("task.leader.topic.partitions"));
    assertEquals(SOURCE_TOPICS_VALUE, taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG));
    assertEquals(SOURCE_BOOTSTRAP_SERVERS_CONFIG,
            taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG));
    verifyAll();
}
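The expected value "0:test.topic:0" above implies that SOURCE_TOPICS_VALUE is "test.topic" and that LeaderTopicPartition encodes itself as leaderId:topic:partition in the task.leader.topic.partitions config. A minimal sketch of such a value class, inferred from these tests (the real implementation is not reproduced on this page):

import java.util.Objects;
import org.apache.kafka.common.TopicPartition;

// Assumed shape of LeaderTopicPartition, inferred from the "0:test.topic:0"
// assertion in Example 6 and the Set-based comparisons in later examples.
final class LeaderTopicPartition {
    private final int leaderId;
    private final String topic;
    private final int partition;

    LeaderTopicPartition(int leaderId, String topic, int partition) {
        this.leaderId = leaderId;
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
    }

    TopicPartition toTopicPartition() {
        return new TopicPartition(topic, partition);
    }

    @Override
    public String toString() {
        // Encoding used in task configs: leaderId:topic:partition
        return leaderId + ":" + topic + ":" + partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderTopicPartition)) return false;
        LeaderTopicPartition other = (LeaderTopicPartition) o;
        return leaderId == other.leaderId
                && topic.equals(other.topic)
                && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, topic, partition);
    }
}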
Example 7
@Test
public void testTaskConfigsReturns1TaskOnTwoTopicPartitions() throws Exception {
    // Override the default stub with two leader topic partitions
    stubLeaderTopicPartitions = new HashSet<>();
    LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
    stubLeaderTopicPartitions.add(leaderTopicPartition1);
    LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
    stubLeaderTopicPartitions.add(leaderTopicPartition2);
    PowerMock
            .expectNew(PartitionMonitor.class,
                    new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
                    EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
            .andStubReturn(partitionMonitorMock);
    partitionMonitorMock.start();
    PowerMock.expectLastCall().andVoid();
    EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
    PowerMock.replayAll();
    connector.start(sourceProperties);
    List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
    assertEquals(1, taskConfigs.size());
    PowerMock.verifyAll();
}
Example 8
@Test
public void testTaskConfigsReturns2TasksOnTwoTopicPartitions() throws Exception {
    // Override the default stub with two leader topic partitions
    stubLeaderTopicPartitions = new HashSet<>();
    LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
    stubLeaderTopicPartitions.add(leaderTopicPartition1);
    LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
    stubLeaderTopicPartitions.add(leaderTopicPartition2);
    PowerMock
            .expectNew(PartitionMonitor.class,
                    new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
                    EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
            .andStubReturn(partitionMonitorMock);
    partitionMonitorMock.start();
    PowerMock.expectLastCall().andVoid();
    EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
    PowerMock.replayAll();
    connector.start(sourceProperties);
    List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
    assertEquals(2, taskConfigs.size());
    PowerMock.verifyAll();
}
Example 9
KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) {
  this(
      context,
      config,
      newSourceConsumer(config),
      newDestinationConsumer(config),
      taskConfigBuilder);
}
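Example 9 delegates to the full constructor from Example 1, creating real source and destination consumers from the config. In a connector this would typically be invoked from start(), using the ConnectorContext the framework passed to initialize(). A hypothetical sketch of that wiring (field and thread names are assumed, not taken from the snippets on this page):

// Hypothetical wiring inside a SourceConnector subclass; names are illustrative only.
@Override
public void start(Map<String, String> props) {
  SourceConfig config = new SourceConfig(props);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  // "context" is the ConnectorContext captured in initialize()
  this.kafkaMonitor = new KafkaMonitor(context, config, taskConfigBuilder);
  // KafkaMonitor is a Runnable (see Example 12), so it runs on its own thread
  this.monitorThread = new Thread(kafkaMonitor, "kafka-monitor");
  this.monitorThread.start();
}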
Example 10
@Test
public void shouldApplyTopicRenameTransforms() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.TOPICS_REGEX.getKey(), "reroute.*");
  properties.put("transforms", "reroute");
  properties.put("transforms.reroute.type", "org.apache.kafka.connect.transforms.RegexRouter");
  properties.put("transforms.reroute.regex", "^reroute\\.outgoing$");
  properties.put("transforms.reroute.replacement", "reroute.incoming");
  SourceConfig config = new SourceConfig(properties);
  MockConsumer<byte[], byte[]> mockSource = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  updateMockPartitions(mockSource, "reroute.outgoing", 1);
  MockConsumer<byte[], byte[]> mockDest = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  updateMockPartitions(mockDest, "reroute.incoming", 1);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class), config, mockSource, mockDest, taskConfigBuilder);
  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);
  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);
  assertThat(partitions, contains(new TopicPartition("reroute.outgoing", 0)));
}
Example 11
@Test
public void shouldAllowUnsupportedTransformationWhenCheckingDisabled() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.ENABLE_DESTINATION_TOPIC_CHECKING.getKey(), "false");
  properties.put("transforms", "reroute");
  properties.put(
      "transforms.reroute.type", "org.apache.kafka.connect.transforms.TimestampRouter");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);
  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);
  // All topics matching the assignment regex, even ones not present in the destination,
  // should be replicated
  assertThat(
      partitions,
      containsInAnyOrder(
          new TopicPartition("topic1", 0),
          new TopicPartition("topic1", 1),
          new TopicPartition("topic2", 0),
          new TopicPartition("topic3", 0),
          new TopicPartition("topic4", 0),
          new TopicPartition("topic5", 0)));
}
Example 12
@Test
public void shouldContinueRunningWhenExceptionEncountered() throws InterruptedException {
  Map<String, String> properties = getBaseProperties();
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  // Require two thrown exceptions to ensure that the KafkaMonitor run loop executes more than once
  CountDownLatch exceptionThrownLatch = new CountDownLatch(2);
  MockConsumer<byte[], byte[]> consumer =
      new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
          exceptionThrownLatch.countDown();
          throw new TimeoutException("KABOOM!");
        }
      };
  kafkaMonitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          consumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  Thread monitorThread = new Thread(kafkaMonitor);
  monitorThread.start();
  exceptionThrownLatch.await(2, TimeUnit.SECONDS);
  monitorThread.join(1);
  assertThat(monitorThread.getState(), not(State.TERMINATED));
  kafkaMonitor.stop();
  monitorThread.interrupt();
  monitorThread.join(5000);
}
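This test relies on the run loop swallowing transient failures: the MockConsumer throws a TimeoutException on every listTopics() call, yet the thread must not terminate. A sketch of that loop shape (assumed, since KafkaMonitor.run() itself is not reproduced on this page); a genuinely unrecoverable error would instead be surfaced through ConnectorContext.raiseError, which fails the connector:

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.connect.connector.ConnectorContext;

// Hypothetical sketch of the loop shape this test exercises; names are illustrative.
class ResilientMonitorLoop implements Runnable {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final ConnectorContext context;
    private final Runnable pollOnce; // e.g. list topics and compare partition assignments

    ResilientMonitorLoop(ConnectorContext context, Runnable pollOnce) {
        this.context = context;
        this.pollOnce = pollOnce;
    }

    @Override
    public void run() {
        while (!stopped.get()) {
            try {
                pollOnce.run();
            } catch (RetriableException e) {
                // Transient failure (e.g. TimeoutException from listTopics):
                // swallow it and poll again, as the test above asserts.
            } catch (Exception e) {
                // Unrecoverable failure: report it to the Connect framework and exit.
                context.raiseError(e);
                return;
            }
        }
    }

    void stop() {
        stopped.set(true);
    }
}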
Example 13
@Test
public void testStartCorrectConfig() throws Exception {
    PowerMock
            .expectNew(PartitionMonitor.class,
                    new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
                    EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
            .andStubReturn(partitionMonitorMock);
    partitionMonitorMock.start();
    PowerMock.expectLastCall().andVoid();
    PowerMock.replayAll();
    connector.start(sourceProperties);
    verifyAll();
}
Example 14
@Before
public void setup() {
    connector = new FileStreamSinkConnector();
    ctx = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(ctx);
    sinkProperties = new HashMap<>();
    sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
    sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
}
Example 15
@Before
public void setup() {
    connector = new FileStreamSourceConnector();
    ctx = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(ctx);
    sourceProperties = new HashMap<>();
    sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
    sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
}
Example 16
PartitionMonitor(ConnectorContext connectorContext, KafkaSourceConnectorConfig sourceConnectorConfig) {
    topicWhitelistPattern = sourceConnectorConfig.getTopicWhitelistPattern();
    reconfigureTasksOnLeaderChange = sourceConnectorConfig
            .getBoolean(KafkaSourceConnectorConfig.RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG);
    topicPollIntervalMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_POLL_INTERVAL_MS_CONFIG);
    maxShutdownWaitMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG);
    topicRequestTimeoutMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG);
    partitionMonitorClient = AdminClient.create(sourceConnectorConfig.getAdminClientProperties());
    // Runnable to periodically poll the kafka cluster for changes in topics or partitions
    pollThread = new Runnable() {
        @Override
        public void run() {
            if (!shutdown.get()) {
                logger.info("Fetching latest topic partitions.");
                try {
                    Set<LeaderTopicPartition> retrievedLeaderTopicPartitions =
                            retrieveLeaderTopicPartitions(topicRequestTimeoutMs);
                    if (logger.isDebugEnabled()) {
                        logger.debug("retrievedLeaderTopicPartitions: {}", retrievedLeaderTopicPartitions);
                        logger.debug("currentLeaderTopicPartitions: {}", currentLeaderTopicPartitions);
                    }
                    boolean requestTaskReconfiguration = false;
                    if (currentLeaderTopicPartitions != null) {
                        if (reconfigureTasksOnLeaderChange) {
                            if (!retrievedLeaderTopicPartitions.equals(currentLeaderTopicPartitions)) {
                                logger.info("Retrieved leaders and topic partitions do not match currently stored leaders and topic partitions, will request task reconfiguration");
                                requestTaskReconfiguration = true;
                            }
                        } else {
                            Set<TopicPartition> retrievedTopicPartitions = retrievedLeaderTopicPartitions.stream()
                                    .map(LeaderTopicPartition::toTopicPartition)
                                    .collect(Collectors.toSet());
                            if (logger.isDebugEnabled()) {
                                logger.debug("retrievedTopicPartitions: {}", retrievedTopicPartitions);
                            }
                            Set<TopicPartition> currentTopicPartitions = currentLeaderTopicPartitions.stream()
                                    .map(LeaderTopicPartition::toTopicPartition)
                                    .collect(Collectors.toSet());
                            if (logger.isDebugEnabled()) {
                                logger.debug("currentTopicPartitions: {}", currentTopicPartitions);
                            }
                            if (!retrievedTopicPartitions.equals(currentTopicPartitions)) {
                                logger.info("Retrieved topic partitions do not match currently stored topic partitions, will request task reconfiguration");
                                requestTaskReconfiguration = true;
                            }
                        }
                        setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions);
                        if (requestTaskReconfiguration) {
                            connectorContext.requestTaskReconfiguration();
                        } else {
                            logger.info("No partition changes which require reconfiguration have been detected.");
                        }
                    } else {
                        setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions);
                    }
                } catch (TimeoutException e) {
                    logger.error(
                            "Timeout while waiting for AdminClient to return topic list. This indicates a (possibly transient) connection issue, or that the timeout is set too low.",
                            e);
                } catch (ExecutionException e) {
                    logger.error("Unexpected ExecutionException.", e);
                } catch (InterruptedException e) {
                    logger.error("InterruptedException. Probably shutting down.", e);
                }
            }
        }
    };
}
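The constructor above builds pollThread but does not show how it is scheduled, and the tests in Examples 6, 7, 8, and 13 expect a start() method. A plausible completion (an assumption; the project's actual scheduling code is not shown on this page) runs the runnable on a single-threaded scheduled executor at topicPollIntervalMs and bounds shutdown by maxShutdownWaitMs:

// Assumed continuation of the PartitionMonitor class above; requires
// java.util.concurrent.Executors, ScheduledExecutorService, and TimeUnit.
private final ScheduledExecutorService pollExecutorService =
        Executors.newSingleThreadScheduledExecutor();

public void start() {
    // pollThread itself checks the shutdown flag before doing any work
    pollExecutorService.scheduleWithFixedDelay(
            pollThread, 0, topicPollIntervalMs, TimeUnit.MILLISECONDS);
}

public void shutdown() {
    shutdown.set(true);
    pollExecutorService.shutdown();
    try {
        // Bounded wait, mirroring maxShutdownWaitMs from the constructor
        pollExecutorService.awaitTermination(maxShutdownWaitMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    partitionMonitorClient.close();
}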
Example 17
@Before
public void setupByHostAndPort() {
    connector = new MongodbSourceConnector();
    context = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(context);
}
Example 18
/**
 * Initialise the connector
 *
 * @param ctx context of the connector
 */
@Override
public void initialize(ConnectorContext ctx) {
    // do nothing
}
Example 19
/**
 * Initialise the connector
 *
 * @param ctx context of the connector
 * @param taskConfigs task configuration
 */
@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
    // do nothing
}
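Examples 18 and 19 override both initialize variants as no-ops. For contrast, here is a minimal skeleton that keeps the context and exercises the two callbacks ConnectorContext actually provides, requestTaskReconfiguration() and raiseError(Exception) (a hypothetical sketch, not taken from any of the projects quoted above):

import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical skeleton; class and method names are illustrative only.
public abstract class ExampleSourceConnector extends SourceConnector {

    private ConnectorContext context;

    @Override
    public void initialize(ConnectorContext ctx) {
        super.initialize(ctx);
        this.context = ctx; // keep a reference for later callbacks
    }

    // Call when the external system has changed shape (new topics or partitions)
    // and the framework should recompute task assignments.
    protected void onPartitionsChanged() {
        context.requestTaskReconfiguration();
    }

    // Call on an unrecoverable failure; the framework marks the connector FAILED.
    protected void onFatalError(Exception e) {
        context.raiseError(e);
    }
}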