Java源码示例:org.apache.kafka.connect.connector.ConnectorContext

示例1
KafkaMonitor(
    ConnectorContext context,
    SourceConfig config,
    Consumer<byte[], byte[]> sourceConsumer,
    Consumer<byte[], byte[]> destinationConsumer,
    TaskConfigBuilder taskConfigBuilder) {
  this.context = context;
  this.topicsWhitelist = config.getTopicsWhitelist();
  this.monitorPollWaitMs = config.getMonitorPollWaitMs();
  this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex());
  this.sourceConsumer = sourceConsumer;
  this.destinationConsumer = destinationConsumer;
  if (topicsWhitelist.isEmpty() && config.getTopicsRegex().isEmpty()) {
    logger.warn("No whitelist configured");
  }
  this.taskConfigBuilder = taskConfigBuilder;
  this.validationStrategy =
      config.getEnablePartitionMatching()
          ? SourcePartitionValidator.MatchingStrategy.PARTITION
          : SourcePartitionValidator.MatchingStrategy.TOPIC;
  this.topicCheckingEnabled = config.getTopicCheckingEnabled();
  this.routers = this.validateTransformations(config.transformations());
}
 
示例2
@Before
public void setUp() {
  Map<String, String> properties = getBaseProperties();
  SourceConfig config = new SourceConfig(properties);
  this.mockSourceConsumer = mockSourceConsumer();
  this.mockDestinationConsumer = mockDestinationConsumer();
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  kafkaMonitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
}
 
示例3
@Test
public void shouldAlwaysReplicateWhenCheckingDisabled() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.ENABLE_DESTINATION_TOPIC_CHECKING.getKey(), "false");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);

  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);

  assertThat(partitions, hasItem(new TopicPartition("topic5", 0)));
}
 
示例4
@Test(expected = IllegalArgumentException.class)
public void shouldThrowWhenUnsupportedTransformationEncountered() {
  Map<String, String> properties = getBaseProperties();
  properties.put("transforms", "reroute");
  properties.put(
      "transforms.reroute.type", "org.apache.kafka.connect.transforms.TimestampRouter");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);

  new KafkaMonitor(
      mock(ConnectorContext.class),
      config,
      mockSourceConsumer,
      mockDestinationConsumer,
      taskConfigBuilder);
}
 
示例5
@Before
public void setUp() throws Exception {
  connector = new KafkaSourceConnector();
  connectorContextMock = PowerMock.createMock(ConnectorContext.class);
  partitionMonitorMock = PowerMock.createMock(PartitionMonitor.class);
  connector.initialize(connectorContextMock);

  // Default test settings
  sourceProperties = new HashMap<>();
  sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, SOURCE_TOPICS_VALUE);
  sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, SOURCE_BOOTSTRAP_SERVERS_CONFIG);
  sourceProperties.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, POLL_LOOP_TIMEOUT_MS_VALUE);
  sourceProperties.put(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG, TOPIC_LIST_TIMEOUT_MS_VALUE);
  sourceProperties.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, CONSUMER_GROUP_ID_VALUE);


  // Default leader topic partitions to return (just one)
  stubLeaderTopicPartitions = new HashSet<>();
  LeaderTopicPartition leaderTopicPartition = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
  stubLeaderTopicPartitions.add(leaderTopicPartition);
}
 
示例6
@Test
public void testTaskConfigsReturns1TaskOnOneTopicPartition() throws Exception {
  PowerMock
      .expectNew(PartitionMonitor.class, new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
          EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
      .andStubReturn(partitionMonitorMock);
  partitionMonitorMock.start();
  PowerMock.expectLastCall().andVoid();
  EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
  PowerMock.replayAll();

  connector.start(sourceProperties);
  List<Map<String, String>> taskConfigs = connector.taskConfigs(2);

  assertEquals(1, taskConfigs.size());
  assertEquals("0:test.topic:0", taskConfigs.get(0).get("task.leader.topic.partitions"));
  assertEquals(SOURCE_TOPICS_VALUE, taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG));
  assertEquals(SOURCE_BOOTSTRAP_SERVERS_CONFIG,
      taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG));

  verifyAll();
}
 
示例7
@Test
public void testTaskConfigsReturns1TaskOnTwoTopicPartitions() throws Exception {
  // Default leader topic partitions to return (just one)
  stubLeaderTopicPartitions = new HashSet<>();
  LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
  stubLeaderTopicPartitions.add(leaderTopicPartition1);
  LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
  stubLeaderTopicPartitions.add(leaderTopicPartition2);

  PowerMock
      .expectNew(PartitionMonitor.class, new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
          EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
      .andStubReturn(partitionMonitorMock);

  partitionMonitorMock.start();
  PowerMock.expectLastCall().andVoid();
  EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
  PowerMock.replayAll();

  connector.start(sourceProperties);
  List<Map<String, String>> taskConfigs = connector.taskConfigs(1);

  assertEquals(1, taskConfigs.size());

  PowerMock.verifyAll();
}
 
示例8
@Test
public void testTaskConfigsReturns2TasksOnTwoTopicPartitions() throws Exception {
  // Default leader topic partitions to return (just one)
  stubLeaderTopicPartitions = new HashSet<>();
  LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
  stubLeaderTopicPartitions.add(leaderTopicPartition1);
  LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
  stubLeaderTopicPartitions.add(leaderTopicPartition2);

  PowerMock
      .expectNew(PartitionMonitor.class, new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
          EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
      .andStubReturn(partitionMonitorMock);

  partitionMonitorMock.start();
  PowerMock.expectLastCall().andVoid();
  EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
  PowerMock.replayAll();

  connector.start(sourceProperties);
  List<Map<String, String>> taskConfigs = connector.taskConfigs(2);

  assertEquals(2, taskConfigs.size());

  PowerMock.verifyAll();
}
 
示例9
KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) {
  this(
      context,
      config,
      newSourceConsumer(config),
      newDestinationConsumer(config),
      taskConfigBuilder);
}
 
示例10
@Test
public void shouldApplyTopicRenameTransforms() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.TOPICS_REGEX.getKey(), "reroute.*");
  properties.put("transforms", "reroute");
  properties.put("transforms.reroute.type", "org.apache.kafka.connect.transforms.RegexRouter");
  properties.put("transforms.reroute.regex", "^reroute\\.outgoing$");
  properties.put("transforms.reroute.replacement", "reroute.incoming");
  SourceConfig config = new SourceConfig(properties);

  MockConsumer<byte[], byte[]> mockSource = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  updateMockPartitions(mockSource, "reroute.outgoing", 1);

  MockConsumer<byte[], byte[]> mockDest = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  updateMockPartitions(mockDest, "reroute.incoming", 1);

  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class), config, mockSource, mockDest, taskConfigBuilder);

  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);

  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);
  assertThat(partitions, contains(new TopicPartition("reroute.outgoing", 0)));
}
 
示例11
@Test
public void shouldAllowUnsupportedTransformationWhenCheckingDisabled() {
  Map<String, String> properties = getBaseProperties();
  properties.put(SourceConfigDefinition.ENABLE_DESTINATION_TOPIC_CHECKING.getKey(), "false");
  properties.put("transforms", "reroute");
  properties.put(
      "transforms.reroute.type", "org.apache.kafka.connect.transforms.TimestampRouter");
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);
  KafkaMonitor monitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          mockSourceConsumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  monitor.partitionsChanged();
  List<Map<String, String>> result = monitor.taskConfigs(3);

  List<TopicPartition> partitions = assignedTopicPartitionsFromTaskConfigs(result);

  // All topics matching assignment regex, even ones not present in destination, should be
  // replicated
  assertThat(
      partitions,
      containsInAnyOrder(
          new TopicPartition("topic1", 0),
          new TopicPartition("topic1", 1),
          new TopicPartition("topic2", 0),
          new TopicPartition("topic3", 0),
          new TopicPartition("topic4", 0),
          new TopicPartition("topic5", 0)));
}
 
示例12
@Test
public void shouldContinueRunningWhenExceptionEncountered() throws InterruptedException {
  Map<String, String> properties = getBaseProperties();
  SourceConfig config = new SourceConfig(properties);
  TaskConfigBuilder taskConfigBuilder =
      new TaskConfigBuilder(new RoundRobinTaskAssignor(), config);

  // Require two thrown exceptions to ensure that the KafkaMonitor run loop executes more than
  // once
  CountDownLatch exceptionThrownLatch = new CountDownLatch(2);
  MockConsumer<byte[], byte[]> consumer =
      new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
          exceptionThrownLatch.countDown();
          throw new TimeoutException("KABOOM!");
        }
      };

  kafkaMonitor =
      new KafkaMonitor(
          mock(ConnectorContext.class),
          config,
          consumer,
          mockDestinationConsumer,
          taskConfigBuilder);
  Thread monitorThread = new Thread(kafkaMonitor);
  monitorThread.start();
  exceptionThrownLatch.await(2, TimeUnit.SECONDS);
  monitorThread.join(1);

  assertThat(monitorThread.getState(), not(State.TERMINATED));
  kafkaMonitor.stop();
  monitorThread.interrupt();
  monitorThread.join(5000);
}
 
示例13
@Test
public void testStartCorrectConfig() throws Exception {
  PowerMock
      .expectNew(PartitionMonitor.class, new Class<?>[] { ConnectorContext.class, KafkaSourceConnectorConfig.class },
          EasyMock.anyObject(ConnectorContext.class), EasyMock.anyObject(KafkaSourceConnectorConfig.class))
      .andStubReturn(partitionMonitorMock);
  partitionMonitorMock.start();
  PowerMock.expectLastCall().andVoid();
  PowerMock.replayAll();

  connector.start(sourceProperties);

  verifyAll();
}
 
示例14
@Before
public void setup() {
    connector = new FileStreamSinkConnector();
    ctx = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(ctx);

    sinkProperties = new HashMap<>();
    sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
    sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
}
 
示例15
@Before
public void setup() {
    connector = new FileStreamSourceConnector();
    ctx = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(ctx);

    sourceProperties = new HashMap<>();
    sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
    sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
}
 
示例16
PartitionMonitor(ConnectorContext connectorContext, KafkaSourceConnectorConfig sourceConnectorConfig) {
  topicWhitelistPattern = sourceConnectorConfig.getTopicWhitelistPattern();
  reconfigureTasksOnLeaderChange = sourceConnectorConfig
      .getBoolean(KafkaSourceConnectorConfig.RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG);
  topicPollIntervalMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_POLL_INTERVAL_MS_CONFIG);
  maxShutdownWaitMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG);
  topicRequestTimeoutMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG);
  partitionMonitorClient = AdminClient.create(sourceConnectorConfig.getAdminClientProperties());
  // Thread to periodically poll the kafka cluster for changes in topics or
  // partitions
  pollThread = new Runnable() {
    @Override
    public void run() {
      if (!shutdown.get()) {
        logger.info("Fetching latest topic partitions.");
        try {
          Set<LeaderTopicPartition> retrievedLeaderTopicPartitions = retrieveLeaderTopicPartitions(
              topicRequestTimeoutMs);
          if (logger.isDebugEnabled()) {
            logger.debug("retrievedLeaderTopicPartitions: {}", retrievedLeaderTopicPartitions);
            logger.debug("currentLeaderTopicPartitions: {}", currentLeaderTopicPartitions);
          }
          boolean requestTaskReconfiguration = false;
          if (currentLeaderTopicPartitions != null) {
            if (reconfigureTasksOnLeaderChange) {
              if (!retrievedLeaderTopicPartitions.equals(currentLeaderTopicPartitions)) {
                logger.info("Retrieved leaders and topic partitions do not match currently stored leaders and topic partitions, will request task reconfiguration");
                requestTaskReconfiguration = true;
              }
            } else {
              Set<TopicPartition> retrievedTopicPartitions = retrievedLeaderTopicPartitions.stream().map(LeaderTopicPartition::toTopicPartition).collect(Collectors.toSet());
              if (logger.isDebugEnabled())
                logger.debug("retrievedTopicPartitions: {}", retrievedTopicPartitions);
              Set<TopicPartition> currentTopicPartitions = currentLeaderTopicPartitions.stream().map(LeaderTopicPartition::toTopicPartition).collect(Collectors.toSet());
              if (logger.isDebugEnabled())
                logger.debug("currentTopicPartitions: {}", currentTopicPartitions);
              if (!retrievedTopicPartitions.equals(currentTopicPartitions)) {
                logger.info("Retrieved topic partitions do not match currently stored topic partitions, will request task reconfiguration");
                requestTaskReconfiguration = true;
              }
            }
            setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions);
            if (requestTaskReconfiguration)
              connectorContext.requestTaskReconfiguration();
            else
              logger.info("No partition changes which require reconfiguration have been detected.");
          } else {
            setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions);
          }
        } catch (TimeoutException e) {
          logger.error(
              "Timeout while waiting for AdminClient to return topic list. This indicates a (possibly transient) connection issue, or is an indicator that the timeout is set too low. {}",
              e);
        } catch (ExecutionException e) {
          logger.error("Unexpected ExecutionException. {}", e);
        } catch (InterruptedException e) {
          logger.error("InterruptedException. Probably shutting down. {}, e");
        }
      }
    }
  };
}
 
示例17
@Before
public void setupByHostAndPort() {
    connector = new MongodbSourceConnector();
    context = PowerMock.createMock(ConnectorContext.class);
    connector.initialize(context);
}
 
示例18
/**
 * Initialise the connector
 * @param ctx context of the connector
 */
@Override
public void initialize(ConnectorContext ctx) {
  //do nothing    
}
 
示例19
/**
 * Initialise the connector
 * @param ctx context of the connector
 * @param taskConfigs task configuration
 */
@Override
public void initialize(ConnectorContext ctx,
            List<Map<String,String>> taskConfigs) {
  //do nothing
}