Java源码示例:io.atomix.protocols.raft.cluster.RaftMember

示例1
/**
 * Bootstraps the cluster from the given member identifiers.
 *
 * <p>If a join future already exists it is returned unchanged. Otherwise, when no configuration
 * is present, the local member is marked {@code ACTIVE} and an initial configuration is built from
 * the supplied identifiers plus the local member before {@code join()} is invoked.
 *
 * @param cluster the identifiers of the members forming the cluster
 * @return the existing join future, or the future returned by {@code join()}
 */
@Override
public CompletableFuture<Void> bootstrap(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  // An existing configuration is used as-is; just join.
  if (configuration != null) {
    return join();
  }

  member.setType(RaftMember.Type.ACTIVE);

  // Every remote identifier becomes an ACTIVE member stamped with the local member's update time.
  Set<RaftMember> initialMembers = cluster.stream()
      .filter(id -> !id.equals(member.memberId()))
      .map(id -> new DefaultRaftMember(id, RaftMember.Type.ACTIVE, member.getLastUpdated()))
      .collect(Collectors.toSet());

  // The local member is part of the bootstrap configuration too.
  initialMembers.add(member);

  // Index and term both start at zero for the initial configuration.
  configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), initialMembers));
  return join();
}
 
示例2
/**
 * Moves the server into the baseline role that corresponds to a member type.
 *
 * <p>{@code ACTIVE} maps to {@code FOLLOWER} unless the current role is already an
 * {@code ActiveRole}; {@code PROMOTABLE} and {@code PASSIVE} map to the role of the same name; any
 * other type maps to {@code INACTIVE}. No transition is requested when the server is already in
 * the target role.
 *
 * @param type the member type to derive the role from
 */
public void transition(RaftMember.Type type) {
  final RaftServer.Role target;
  final boolean alreadyThere;

  switch (type) {
    case ACTIVE:
      // Any active role counts as "already there"; only the class of the role is inspected.
      target = RaftServer.Role.FOLLOWER;
      alreadyThere = role instanceof ActiveRole;
      break;
    case PROMOTABLE:
      target = RaftServer.Role.PROMOTABLE;
      alreadyThere = this.role.role() == target;
      break;
    case PASSIVE:
      target = RaftServer.Role.PASSIVE;
      alreadyThere = this.role.role() == target;
      break;
    default:
      target = RaftServer.Role.INACTIVE;
      alreadyThere = this.role.role() == target;
      break;
  }

  if (!alreadyThere) {
    transition(target);
  }
}
 
示例3
/**
 * Commits the given configuration.
 *
 * <p>A {@code ConfigurationEntry} stamped with the current term and the current wall-clock time is
 * appended through {@code appendAndCompact}. The continuation runs on
 * {@code raft.getThreadContext()}: it records the entry index in {@code configuring}, applies the
 * new configuration to the cluster, and then asks the appender to replicate up to that index.
 *
 * @param members the members to record in the new configuration entry
 * @return a future that completes with the value produced by {@code appender.appendEntries(...)}
 *     for the new entry (named {@code commitIndex} in the callback)
 */
protected CompletableFuture<Long> configure(Collection<RaftMember> members) {
  // Must be invoked on the Raft thread.
  raft.checkThread();

  final long term = raft.getTerm();

  return appendAndCompact(new ConfigurationEntry(term, System.currentTimeMillis(), members))
      .thenComposeAsync(entry -> {
        // Store the index of the configuration entry in order to prevent other configurations from
        // being logged and committed concurrently. This is an important safety property of Raft.
        configuring = entry.index();
        // Apply the configuration to the cluster immediately, before replication has completed.
        raft.getCluster().configure(new Configuration(entry.index(), entry.entry().term(), entry.entry().timestamp(), entry.entry().members()));

        return appender.appendEntries(entry.index()).whenComplete((commitIndex, commitError) -> {
          raft.checkThread();
          // Only apply the entry to the service manager if still running and replication succeeded.
          if (isRunning() && commitError == null) {
            raft.getServiceManager().<OperationResult>apply(entry.index());
          }
          // Clear the in-progress marker whether or not replication succeeded.
          configuring = 0;
        });
      }, raft.getThreadContext());
}
 
示例4
/**
 * Tests joining a server after many entries have been committed.
 *
 * <p>Starts three servers, submits 100 operations, then adds a fourth server either via
 * {@code join} (for {@code ACTIVE}) or {@code listen} (for any other type). Two resumes are
 * awaited: one from the join/listen future and one from the role change listener observing
 * {@code role}. Ten further operations are submitted afterwards.
 *
 * @param type the member type the late server joins as
 * @param role the role the late server is expected to reach
 */
private void testServerJoinLate(RaftMember.Type type, RaftServer.Role role) throws Throwable {
  createServers(3);
  TestPrimitive testPrimitive = createPrimitive(createClient());
  submit(testPrimitive, 0, 100);
  await(15000);

  RaftServer lateServer = createServer(nextNodeId());
  lateServer.addRoleChangeListener(newRole -> {
    if (newRole == role) {
      resume();
    }
  });

  if (type != RaftMember.Type.ACTIVE) {
    lateServer.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    lateServer.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }
  await(15000, 2);

  submit(testPrimitive, 0, 10);
  await(15000);
  Thread.sleep(5000);
}
 
示例5
/**
 * Tests shutting down a server and bringing it back under the same member identifier.
 *
 * <p>Three servers are started and 100 operations submitted. The first server is shut down, a new
 * server is created with the first member's identifier and joined to the cluster, and another 100
 * operations are submitted.
 */
@Test
public void testCrashRecover() throws Throwable {
  List<RaftServer> running = createServers(3);
  TestPrimitive testPrimitive = createPrimitive(createClient());
  submit(testPrimitive, 0, 100);
  await(30000);
  Thread.sleep(15000);

  // Stop the first server and wait (bounded) for the shutdown to finish.
  RaftServer crashed = running.get(0);
  crashed.shutdown().get(10, TimeUnit.SECONDS);

  // Recreate it with the same member id and rejoin.
  RaftServer recovered = createServer(members.get(0).memberId());
  recovered.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  await(30000);

  submit(testPrimitive, 0, 100);
  await(30000);
}
 
示例6
/**
 * Tests a member join event.
 *
 * <p>A listener on the first server's cluster asserts that a {@code JOIN} event carries the
 * identifier of the newly joining member. Two resumes are awaited: one from that listener and one
 * from the join/listen future.
 *
 * @param type the member type the new server joins as
 */
private void testJoinEvent(RaftMember.Type type) throws Throwable {
  List<RaftServer> running = createServers(3);
  RaftMember expected = nextMember(type);

  RaftServer observer = running.get(0);
  observer.cluster().addListener(event -> {
    if (event.type() != RaftClusterEvent.Type.JOIN) {
      return;
    }
    threadAssertEquals(event.subject().memberId(), expected.memberId());
    resume();
  });

  RaftServer newcomer = createServer(expected.memberId());
  if (type != RaftMember.Type.ACTIVE) {
    newcomer.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    newcomer.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }
  await(15000, 2);
}
 
示例7
/**
 * Tests demoting the leader.
 *
 * <p>Picks the server whose local member equals its view of the leader, and one server for which
 * that is not the case. The latter listens for a type change on its view of the leader's member
 * and asserts the new type is {@code PASSIVE}; the leader's member is then demoted. Two resumes are
 * awaited: the listener and the demote future.
 */
@Test
public void testDemoteLeader() throws Throwable {
  List<RaftServer> running = createServers(3);

  RaftServer leader = running.stream()
      .filter(candidate -> candidate.cluster().getMember().equals(candidate.cluster().getLeader()))
      .findFirst()
      .get();

  RaftServer follower = running.stream()
      .filter(candidate -> !candidate.cluster().getMember().equals(candidate.cluster().getLeader()))
      .findFirst()
      .get();

  // Watch the leader's member as seen from the follower.
  follower.cluster()
      .getMember(leader.cluster().getMember().memberId())
      .addTypeChangeListener(newType -> {
        threadAssertEquals(newType, RaftMember.Type.PASSIVE);
        resume();
      });

  leader.cluster().getMember().demote(RaftMember.Type.PASSIVE).thenRun(this::resume);
  await(15000, 2);
}
 
示例8
/**
 * Creates a set of Raft servers.
 *
 * <p>Registers {@code nodes} new {@code ACTIVE} members, then creates one server for each of the
 * first {@code nodes} entries of {@code members} and starts it with the full member list. The
 * method waits for one resume per server before returning.
 *
 * @param nodes the number of servers to create
 * @return the created servers, in creation order
 */
private List<RaftServer> createServers(int nodes) throws Throwable {
  for (int added = 0; added < nodes; added++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  List<RaftServer> started = new ArrayList<>();
  for (int index = 0; index < nodes; index++) {
    RaftMember current = members.get(index);
    RaftServer server = createServer(current.memberId());
    if (current.getType() != RaftMember.Type.ACTIVE) {
      server.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    } else {
      server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    }
    started.add(server);
  }

  // The timeout scales with the number of servers being started.
  await(30000 * nodes, nodes);

  return started;
}
 
示例9
/**
 * Creates a set of Raft servers, starting only some of the registered members.
 *
 * <p>Registers {@code total} new {@code ACTIVE} members but creates and starts servers only for
 * the first {@code live} entries of {@code members}; each started server is given the full member
 * list. The method waits for one resume per started server before returning.
 *
 * @param live the number of servers to actually create and start
 * @param total the number of members to register
 * @return the started servers, in creation order
 */
private List<RaftServer> createServers(int live, int total) throws Throwable {
  for (int added = 0; added < total; added++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  List<RaftServer> started = new ArrayList<>();
  for (int index = 0; index < live; index++) {
    RaftMember current = members.get(index);
    RaftServer server = createServer(current.memberId());
    if (current.getType() != RaftMember.Type.ACTIVE) {
      server.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    } else {
      server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    }
    started.add(server);
  }

  // The timeout scales with the number of servers being started.
  await(30000 * live, live);

  return started;
}
 
示例10
/**
 * Creates a set of Raft servers and waits for all of them to bootstrap.
 *
 * <p>Registers {@code nodes} new {@code ACTIVE} members, creates one server for each of the first
 * {@code nodes} entries of {@code members}, and bootstraps each with the full member list. A latch
 * is counted down as each bootstrap future completes normally.
 *
 * @param nodes the number of servers to create
 * @return the created servers, in creation order
 * @throws IllegalStateException if not every server finished bootstrapping within 30 seconds
 * @throws Exception if the wait is interrupted or server creation fails
 */
private List<RaftServer> createServers(int nodes) throws Exception {
  List<RaftServer> servers = new ArrayList<>();

  for (int i = 0; i < nodes; i++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  CountDownLatch latch = new CountDownLatch(nodes);
  for (int i = 0; i < nodes; i++) {
    RaftServer server = createServer(members.get(i));
    // thenRun only fires on normal completion, so a failed bootstrap shows up as a timeout below.
    server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(latch::countDown);
    servers.add(server);
  }

  // await() returns false on timeout; surface that instead of silently handing back servers
  // that never finished bootstrapping.
  if (!latch.await(30000, TimeUnit.MILLISECONDS)) {
    throw new IllegalStateException(
        "timed out waiting for " + latch.getCount() + " of " + nodes + " servers to bootstrap");
  }

  return servers;
}
 
示例11
/**
 * Creates a Raft server.
 *
 * <p>When {@code USE_NETTY} is set, a Netty messaging service is started on the next port,
 * tracked in {@code messagingServices}, and the member's address is recorded in
 * {@code addressMap}; otherwise the protocol comes from {@code protocolFactory}. The server uses
 * disk storage under {@code target/fuzz-logs/<memberId>} and is added to {@code servers}.
 *
 * @param member the member the server is created for
 * @return the built (not yet started) server
 */
private RaftServer createServer(RaftMember member) {
  RaftServerProtocol protocol;
  if (!USE_NETTY) {
    protocol = protocolFactory.newServerProtocol(member.memberId());
  } else {
    Address localAddress = Address.from(++port);
    MessagingService messaging = new NettyMessagingService("test", localAddress, new MessagingConfig()).start().join();
    messagingServices.add(messaging);
    addressMap.put(member.memberId(), localAddress);
    protocol = new RaftServerMessagingProtocol(messaging, PROTOCOL_SERIALIZER, addressMap::get);
  }

  // Each member gets its own log directory.
  File logDirectory = new File(String.format("target/fuzz-logs/%s", member.memberId()));

  RaftServer server = RaftServer.builder(member.memberId())
      .withProtocol(protocol)
      .withStorage(RaftStorage.builder()
          .withStorageLevel(StorageLevel.DISK)
          .withDirectory(logDirectory)
          .withNamespace(STORAGE_NAMESPACE)
          .withMaxSegmentSize(1024 * 1024)
          .build())
      .build();

  servers.add(server);
  return server;
}
 
示例12
/**
 * Creates a Raft client.
 *
 * <p>The client gets a fresh member identifier. When {@code USE_NETTY} is set, a Netty messaging
 * service is started on the next port and the client's address is recorded in
 * {@code addressMap}; otherwise the protocol comes from {@code protocolFactory}. The client is
 * connected to all current {@code members} (blocking until the connect future completes) and added
 * to {@code clients}.
 *
 * @return the connected client
 */
private RaftClient createClient() throws Exception {
  MemberId clientId = nextNodeId();

  RaftClientProtocol protocol;
  if (!USE_NETTY) {
    protocol = protocolFactory.newClientProtocol(clientId);
  } else {
    Address localAddress = Address.from(++port);
    // NOTE(review): the messaging service started here is not stored anywhere in this method;
    // confirm it is stopped elsewhere.
    MessagingService messaging = new NettyMessagingService("test", localAddress, new MessagingConfig()).start().join();
    addressMap.put(clientId, localAddress);
    protocol = new RaftClientMessagingProtocol(messaging, PROTOCOL_SERIALIZER, addressMap::get);
  }

  RaftClient client = RaftClient.builder()
      .withMemberId(clientId)
      .withProtocol(protocol)
      .build();

  client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
  clients.add(client);
  return client;
}
 
示例13
/**
 * Creates a configuration response.
 *
 * <p>{@code status} and {@code error} are passed to the superclass constructor; the remaining
 * arguments are stored as-is (the {@code members} collection is not copied).
 *
 * @param status the response status
 * @param error the response error
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public ConfigurationResponse(Status status, RaftError error, long index, long term, long timestamp, Collection<RaftMember> members) {
  super(status, error);
  this.members = members;
  this.timestamp = timestamp;
  this.term = term;
  this.index = index;
}
 
示例14
/**
 * Creates a configure request. All arguments are stored as-is (the {@code members} collection is
 * not copied).
 *
 * @param term the request term
 * @param leader the leader identifier
 * @param index the configuration index
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public ConfigureRequest(long term, String leader, long index, long timestamp, Collection<RaftMember> members) {
  this.members = members;
  this.timestamp = timestamp;
  this.index = index;
  this.leader = leader;
  this.term = term;
}
 
示例15
/**
 * Joins the cluster with the local member typed as {@code PASSIVE}.
 *
 * <p>If a join future already exists it is returned unchanged. When no configuration is present,
 * the local member is marked {@code PASSIVE} and an initial configuration is built from the
 * supplied identifiers, excluding the local member. If that leaves no members, a failed future is
 * returned.
 *
 * @param cluster the identifiers of the members to join
 * @return the existing join future, a failed future for an empty cluster, or the future returned
 *     by {@code join()}
 */
@Override
public synchronized CompletableFuture<Void> listen(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  // An existing configuration is used as-is; just join.
  if (configuration != null) {
    return join();
  }

  member.setType(RaftMember.Type.PASSIVE);

  // The local member is joining, so it is left out of the initial member set.
  Set<RaftMember> remoteMembers = cluster.stream()
      .filter(id -> !id.equals(member.memberId()))
      .map(id -> new DefaultRaftMember(id, RaftMember.Type.ACTIVE, member.getLastUpdated()))
      .collect(Collectors.toSet());

  // With nobody else to talk to there is nothing to join.
  if (remoteMembers.isEmpty()) {
    return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
  }

  // Index and term both start at zero for the initial configuration.
  configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), remoteMembers));
  return join();
}
 
示例16
/**
 * Joins the cluster and then promotes the local member to {@code ACTIVE}.
 *
 * <p>If a join future already exists it is returned unchanged. When no configuration is present,
 * the local member is marked {@code PROMOTABLE} and an initial configuration is built from the
 * supplied identifiers, excluding the local member. If that leaves no members, a failed future is
 * returned. After {@code join()} completes, the member is promoted unless it is already
 * {@code ACTIVE}.
 *
 * @param cluster the identifiers of the members to join
 * @return the existing join future, a failed future for an empty cluster, or a future that
 *     completes once the join and any required promotion have completed
 */
@Override
public synchronized CompletableFuture<Void> join(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  if (configuration == null) {
    member.setType(RaftMember.Type.PROMOTABLE);

    // The local member is joining, so it is left out of the initial member set.
    Set<RaftMember> remoteMembers = cluster.stream()
        .filter(id -> !id.equals(member.memberId()))
        .map(id -> new DefaultRaftMember(id, RaftMember.Type.ACTIVE, member.getLastUpdated()))
        .collect(Collectors.toSet());

    // With nobody else to talk to there is nothing to join.
    if (remoteMembers.isEmpty()) {
      return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
    }

    // Index and term both start at zero for the initial configuration.
    configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), remoteMembers));
  }

  return join().thenCompose(ignored -> {
    if (member.getType() != RaftMember.Type.ACTIVE) {
      return member.promote(RaftMember.Type.ACTIVE);
    }
    return CompletableFuture.completedFuture(null);
  });
}
 
示例17
/**
 * Updates the member type.
 *
 * <p>Nothing happens when {@code type} is the member's current type. Otherwise both arguments are
 * validated before any state is touched, the type is replaced, the update time is advanced if
 * {@code time} is later than the current one, and every registered type change listener is
 * notified with the new type.
 *
 * @param type The member type.
 * @param time The time of the update.
 * @return The member.
 * @throws NullPointerException if the type differs from the current one and either argument is
 *     {@code null}
 */
public DefaultRaftMember update(RaftMember.Type type, Instant time) {
  if (this.type != type) {
    // Validate before mutating: previously the type was overwritten first and a null time then
    // failed inside time.isAfter(...), leaving the member half-updated and listeners unnotified.
    checkNotNull(type, "type cannot be null");
    checkNotNull(time, "time cannot be null");

    this.type = type;
    // Never move the update time backwards.
    if (time.isAfter(updated)) {
      this.updated = time;
    }
    if (typeChangeListeners != null) {
      typeChangeListeners.forEach(l -> l.accept(type));
    }
  }
  return this;
}
 
示例18
/**
 * Reconfigures this member to the given type.
 *
 * <p>When the requested type is already the current one, a completed future is returned
 * immediately. Otherwise the reconfiguration is handed to the cluster's thread context and the
 * returned future is passed along to it.
 *
 * @param type the type to change this member to
 * @return a future tied to the reconfiguration
 */
private CompletableFuture<Void> configure(RaftMember.Type type) {
  if (type != this.type) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    cluster.getContext().getThreadContext().execute(() -> configure(type, result));
    return result;
  }
  return CompletableFuture.completedFuture(null);
}
 
示例19
/**
 * Recursively reconfigures the cluster.
 *
 * <p>Each attempt arms a retry timer of one election timeout and submits a
 * {@code ReconfigureRequest} for this member with the requested type to the current Raft role.
 * The outcome is handled as follows:
 * <ul>
 *   <li>{@code OK}: the timer is cancelled, the returned configuration is applied to the cluster
 *       and the future completes.</li>
 *   <li>No error, or an {@code UNAVAILABLE}, {@code PROTOCOL_ERROR} or {@code NO_LEADER} error:
 *       the timer is replaced by a retry after two election timeouts.</li>
 *   <li>Any other response error, or a failed request: the timer is cancelled and the future is
 *       completed exceptionally.</li>
 * </ul>
 *
 * @param type the type to change this member to
 * @param future the future to complete once the reconfiguration succeeds or fails
 */
private void configure(RaftMember.Type type, CompletableFuture<Void> future) {
  // Set a timer to retry the reconfiguration attempt.
  configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> {
    configure(type, future);
  });

  // Attempt the reconfiguration by submitting a ReconfigureRequest directly to the server state.
  // Non-leader states should forward the request to the leader if there is one. Leader states
  // will log, replicate, and commit the reconfiguration.
  cluster.getContext().getRaftRole().onReconfigure(ReconfigureRequest.builder()
      .withIndex(cluster.getConfiguration().index())
      .withTerm(cluster.getConfiguration().term())
      .withMember(new DefaultRaftMember(id, type, updated))
      .build()).whenComplete((response, error) -> {
        if (error == null) {
          if (response.status() == RaftResponse.Status.OK) {
            cancelConfigureTimer();
            cluster.configure(new Configuration(response.index(), response.term(), response.timestamp(), response.members()));
            future.complete(null);
          } else if (response.error() == null
              || response.error().type() == RaftError.Type.UNAVAILABLE
              || response.error().type() == RaftError.Type.PROTOCOL_ERROR
              || response.error().type() == RaftError.Type.NO_LEADER) {
            // Transient condition: back off for two election timeouts, then try again.
            cancelConfigureTimer();
            configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout().multipliedBy(2), () -> configure(type, future));
          } else {
            cancelConfigureTimer();
            future.completeExceptionally(response.error().createException());
          }
        } else {
          // The failure is final for the caller, so stop the pending retry as well; otherwise the
          // timer would keep re-submitting the request after the future has already failed.
          cancelConfigureTimer();
          future.completeExceptionally(error);
        }
      });
}
 
示例20
/**
 * Creates a configuration.
 *
 * <p>The time is validated first, then the members; the remaining arguments are not validated.
 * The {@code members} collection is stored as-is, not copied.
 *
 * @param index the configuration index
 * @param term the configuration term
 * @param time the configuration time; must be greater than zero
 * @param members the configuration members; must not be {@code null}
 */
public Configuration(long index, long term, long time, Collection<RaftMember> members) {
  // Validation order is significant when both arguments are invalid.
  checkArgument(time > 0, "time must be positive");
  checkNotNull(members, "members cannot be null");

  this.members = members;
  this.time = time;
  this.term = term;
  this.index = index;
}
 
示例21
/**
 * Handles a cluster event.
 *
 * <p>The check runs on the Raft thread context. If the event reports the removal of the member
 * that is currently the leader, the leader is cleared and poll requests are sent. All other
 * events are ignored.
 *
 * @param event the cluster membership event
 */
private void handleClusterEvent(ClusterMembershipEvent event) {
  raft.getThreadContext().execute(() -> {
    RaftMember currentLeader = raft.getLeader();
    if (currentLeader == null) {
      return;
    }
    if (event.type() != ClusterMembershipEvent.Type.MEMBER_REMOVED) {
      return;
    }
    if (!event.subject().id().equals(currentLeader.memberId())) {
      return;
    }
    raft.setLeader(null);
    sendPollRequests();
  });
}
 
示例22
/**
 * Tests a server joining the cluster.
 *
 * <p>Starts three servers, then adds a fourth via {@code join} (for {@code ACTIVE}) or
 * {@code listen} (for any other type) and waits for that future to complete.
 *
 * @param type the member type the new server joins as
 */
private void testServerJoin(RaftMember.Type type) throws Throwable {
  createServers(3);
  RaftServer newcomer = createServer(nextNodeId());
  if (type != RaftMember.Type.ACTIVE) {
    newcomer.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    newcomer.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }
  await(15000);
}
 
示例23
/**
 * Tests joining and leaving the cluster, resizing the quorum.
 *
 * <p>Starts a single server, joins a second one, has the first leave, and finally asks the second
 * to leave. The last leave is started but not awaited.
 */
@Test
public void testResize() throws Throwable {
  RaftServer original = createServers(1).get(0);
  RaftServer newcomer = createServer(nextNodeId());

  newcomer.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  await(15000);

  original.leave().thenRun(this::resume);
  await(15000);

  newcomer.leave().thenRun(this::resume);
}
 
示例24
/**
 * Shuts a server down, deletes its storage, and starts a replacement for the same member.
 *
 * <p>The replacement is created with the given (now deleted) storage and bootstrapped against
 * the leader's member identifier. Both the shutdown and the bootstrap are awaited.
 *
 * @param leader the member whose identifier the replacement bootstraps against
 * @param member the member the replacement server is created for
 * @param server the server to shut down
 * @param storage the storage to delete and then reuse for the replacement
 * @return the replacement server
 * @throws TimeoutException if the shutdown or the bootstrap does not finish in time
 */
private RaftServer recreateServerWithDataLoss(RaftMember leader, RaftMember member, RaftServer server, RaftStorage storage) throws TimeoutException {
  server.shutdown().thenRun(this::resume);
  await(30000);

  // Simulate data loss before the member comes back.
  deleteStorage(storage);

  final RaftServer replacement = createServer(member.memberId(), builder -> builder.withStorage(storage));
  replacement.bootstrap(leader.memberId()).thenRun(this::resume);
  await(30000);

  return replacement;
}
 
示例25
/**
 * Creates a Raft client connected to the given members.
 *
 * <p>The client gets a fresh member identifier and partition {@code ("test", 1)}, is connected to
 * the identifiers of {@code members}, and is added to {@code clients} once the connect has been
 * awaited.
 *
 * @param members the members to connect to
 * @return the connected client
 */
private RaftClient createClient(List<RaftMember> members) throws Throwable {
  final MemberId clientId = nextNodeId();

  final RaftClient client = RaftClient.builder()
          .withMemberId(clientId)
          .withPartitionId(PartitionId.from("test", 1))
          .withProtocol(protocolFactory.newClientProtocol(clientId))
          .build();

  final List<MemberId> targets = members.stream()
      .map(RaftMember::memberId)
      .collect(Collectors.toList());

  client.connect(targets).thenRun(this::resume);
  await(30000);

  clients.add(client);
  return client;
}
 
示例26
/**
 * Schedules the given server to be shutdown for a period of time and then restarted.
 *
 * <p>Two nested timers are used, each with a delay of {@code randomNumber(120) + 10} seconds. The
 * first stops the server (via {@code leave()} when {@code remove} is set, otherwise via
 * {@code shutdown()}). Once that completes, the second creates a replacement server for the same
 * member, stores it at {@code serverIndex}, and starts it (via {@code join} when {@code remove}
 * is set, otherwise via {@code bootstrap}). When the start completes, {@code scheduleRestarts}
 * is invoked.
 *
 * @param remove whether the server leaves and rejoins the cluster rather than shutting down and
 *     bootstrapping
 * @param serverIndex the index of the server in {@code servers}
 * @param context the thread context used to schedule the timers
 */
private void scheduleRestart(boolean remove, int serverIndex, ThreadContext context) {
  // Track the pending shutdown timer under the server's index.
  shutdownTimers.put(serverIndex, context.schedule(Duration.ofSeconds(randomNumber(120) + 10), () -> {
    // The timer has fired, so it no longer needs tracking.
    shutdownTimers.remove(serverIndex);
    RaftServer server = servers.get(serverIndex);
    CompletableFuture<Void> leaveFuture;
    if (remove) {
      System.out.println("Removing server: " + server.cluster().getMember().memberId());
      leaveFuture = server.leave();
    } else {
      System.out.println("Shutting down server: " + server.cluster().getMember().memberId());
      leaveFuture = server.shutdown();
    }
    // The restart is scheduled whether the stop succeeded or failed.
    leaveFuture.whenComplete((result, error) -> {
      restartTimers.put(serverIndex, context.schedule(Duration.ofSeconds(randomNumber(120) + 10), () -> {
        restartTimers.remove(serverIndex);
        // Build a replacement for the same member and put it in the old server's slot.
        RaftServer newServer = createServer(server.cluster().getMember());
        servers.set(serverIndex, newServer);
        CompletableFuture<RaftServer> joinFuture;
        if (remove) {
          System.out.println("Adding server: " + newServer.cluster().getMember().memberId());
          // Rejoin through the last entry in the members list.
          joinFuture = newServer.join(members.get(members.size() - 1).memberId());
        } else {
          System.out.println("Bootstrapping server: " + newServer.cluster().getMember().memberId());
          joinFuture = newServer.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()));
        }
        // Continue scheduling restarts whether the start succeeded or failed.
        joinFuture.whenComplete((result2, error2) -> {
          scheduleRestarts(context);
        });
      }));
    });
  }));
}
 
示例27
/**
 * Creates a leave response; every argument is passed unchanged to the superclass constructor.
 *
 * @param status the response status
 * @param error the response error
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public LeaveResponse(Status status, RaftError error, long index, long term, long timestamp, Collection<RaftMember> members) {
  super(status, error, index, term, timestamp, members);
}
 
示例28
/**
 * Creates a join request; the member is passed unchanged to the superclass constructor.
 *
 * @param member the member the request is about
 */
public JoinRequest(RaftMember member) {
  super(member);
}
 
示例29
/**
 * Creates a leave request; the member is passed unchanged to the superclass constructor.
 *
 * @param member the member the request is about
 */
public LeaveRequest(RaftMember member) {
  super(member);
}
 
示例30
/**
 * Creates a join response; every argument is passed unchanged to the superclass constructor.
 *
 * @param status the response status
 * @param error the response error
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public JoinResponse(Status status, RaftError error, long index, long term, long timestamp, Collection<RaftMember> members) {
  super(status, error, index, term, timestamp, members);
}