Java源码示例:io.atomix.protocols.raft.cluster.RaftMember
示例1
/**
 * Bootstraps the cluster from the given member identifiers.
 *
 * <p>If a join future already exists it is returned as-is. Otherwise, when no configuration is
 * present, the local member is marked {@code ACTIVE} and an initial configuration is created that
 * contains every listed member (each treated as {@code ACTIVE}) plus the local member.
 *
 * <p>The method is {@code synchronized} so that the {@code joinFuture}/{@code configuration}
 * checks and the creation of the initial configuration cannot interleave with a concurrent call
 * on the same instance.
 *
 * @param cluster the identifiers of the members forming the initial cluster
 * @return the existing join future, or the future returned by {@code join()}
 */
@Override
public synchronized CompletableFuture<Void> bootstrap(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  if (configuration == null) {
    member.setType(RaftMember.Type.ACTIVE);

    // Create a set of active members from every identifier other than the local one.
    Set<RaftMember> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.memberId()))
        .map(m -> new DefaultRaftMember(m, RaftMember.Type.ACTIVE, member.getLastUpdated()))
        .collect(Collectors.toSet());

    // Add the local member to the set of active members.
    activeMembers.add(member);

    // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
    configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), activeMembers));
  }
  return join();
}
示例2
/**
 * Moves the server into the base role that corresponds to the given member type.
 *
 * <p>{@code ACTIVE} maps to {@code FOLLOWER} unless the current role is already an
 * {@code ActiveRole}; {@code PROMOTABLE} and {@code PASSIVE} map to the role of the same name;
 * every other type maps to {@code INACTIVE}. No transition is triggered when the server is
 * already in the target role.
 *
 * @param type the member type whose base role should be entered
 */
public void transition(RaftMember.Type type) {
  final RaftServer.Role baseRole;
  switch (type) {
    case ACTIVE:
      // Any ActiveRole instance is acceptable for an active member; only fall back to
      // FOLLOWER when the current role is something else.
      if (!(role instanceof ActiveRole)) {
        transition(RaftServer.Role.FOLLOWER);
      }
      return;
    case PROMOTABLE:
      baseRole = RaftServer.Role.PROMOTABLE;
      break;
    case PASSIVE:
      baseRole = RaftServer.Role.PASSIVE;
      break;
    default:
      baseRole = RaftServer.Role.INACTIVE;
      break;
  }

  // Only transition when the server is not already in the base role.
  if (this.role.role() != baseRole) {
    transition(baseRole);
  }
}
示例3
/**
 * Appends a configuration entry for the given members and replicates it.
 *
 * @param members the members of the new configuration
 * @return a future completed with the result of {@code appender.appendEntries} for the new entry
 */
protected CompletableFuture<Long> configure(Collection<RaftMember> members) {
  raft.checkThread();

  final long currentTerm = raft.getTerm();
  final long now = System.currentTimeMillis();

  return appendAndCompact(new ConfigurationEntry(currentTerm, now, members))
      .thenComposeAsync(indexed -> {
        final long configIndex = indexed.index();

        // Remember the index of the in-flight configuration entry so that other configurations
        // are not logged and committed concurrently. This is an important safety property of Raft.
        configuring = configIndex;

        raft.getCluster().configure(new Configuration(
            configIndex,
            indexed.entry().term(),
            indexed.entry().timestamp(),
            indexed.entry().members()));

        return appender.appendEntries(configIndex).whenComplete((committed, failure) -> {
          raft.checkThread();
          if (isRunning() && failure == null) {
            raft.getServiceManager().<OperationResult>apply(configIndex);
          }
          // The configuration change is no longer in flight, whether it succeeded or not.
          configuring = 0;
        });
      }, raft.getThreadContext());
}
示例4
/**
 * Tests joining a server after many entries have been committed.
 *
 * @param type the member type the new server joins as; {@code ACTIVE} uses {@code join},
 *     anything else uses {@code listen}
 * @param role the role the joining server is expected to reach
 */
private void testServerJoinLate(RaftMember.Type type, RaftServer.Role role) throws Throwable {
  createServers(3);
  TestPrimitive primitive = createPrimitive(createClient());
  submit(primitive, 0, 100);
  await(15000);

  RaftServer joiner = createServer(nextNodeId());

  // First expected resume: the joiner reaches the requested role.
  joiner.addRoleChangeListener(newRole -> {
    if (newRole == role) {
      resume();
    }
  });

  // Second expected resume: the join/listen future completes.
  final boolean joinAsActive = type == RaftMember.Type.ACTIVE;
  if (!joinAsActive) {
    joiner.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    joiner.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }
  await(15000, 2);

  submit(primitive, 0, 10);
  await(15000);
  Thread.sleep(5000);
}
示例5
/**
 * Tests shutting down a server, recreating it under the same member identifier and rejoining
 * it to the cluster.
 */
@Test
public void testCrashRecover() throws Throwable {
  List<RaftServer> servers = createServers(3);
  TestPrimitive primitive = createPrimitive(createClient());
  submit(primitive, 0, 100);
  await(30000);
  Thread.sleep(15000);

  // Shut down the first server and wait up to ten seconds for the shutdown to finish.
  RaftServer crashed = servers.get(0);
  crashed.shutdown().get(10, TimeUnit.SECONDS);

  // Recreate a server with the first member's identifier and join it to the cluster.
  RaftServer recovered = createServer(members.get(0).memberId());
  recovered.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  await(30000);

  submit(primitive, 0, 100);
  await(30000);
}
示例6
/**
 * Tests a member join event.
 *
 * @param type the member type the new server joins as; {@code ACTIVE} uses {@code join},
 *     anything else uses {@code listen}
 */
private void testJoinEvent(RaftMember.Type type) throws Throwable {
  List<RaftServer> servers = createServers(3);
  RaftMember member = nextMember(type);

  // Observe cluster events on the first server and check the subject of the JOIN event.
  RaftServer observer = servers.get(0);
  observer.cluster().addListener(event -> {
    if (event.type() != RaftClusterEvent.Type.JOIN) {
      return;
    }
    threadAssertEquals(event.subject().memberId(), member.memberId());
    resume();
  });

  RaftServer joiner = createServer(member.memberId());
  if (type != RaftMember.Type.ACTIVE) {
    joiner.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    joiner.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }

  // Two resumes are expected: one from the event listener and one from the join/listen future.
  await(15000, 2);
}
示例7
/**
 * Tests demoting the leader.
 */
@Test
public void testDemoteLeader() throws Throwable {
  List<RaftServer> servers = createServers(3);

  // The leader is the first server whose local member equals the member it reports as leader.
  RaftServer leader = servers.stream()
      .filter(candidate -> candidate.cluster().getMember().equals(candidate.cluster().getLeader()))
      .findFirst()
      .get();

  // Any server whose local member is not the reported leader serves as the observing follower.
  RaftServer follower = servers.stream()
      .filter(candidate -> !candidate.cluster().getMember().equals(candidate.cluster().getLeader()))
      .findFirst()
      .get();

  // First expected resume: the follower sees the leader's member type change to PASSIVE.
  follower.cluster()
      .getMember(leader.cluster().getMember().memberId())
      .addTypeChangeListener(newType -> {
        threadAssertEquals(newType, RaftMember.Type.PASSIVE);
        resume();
      });

  // Second expected resume: the demote future completes.
  leader.cluster().getMember().demote(RaftMember.Type.PASSIVE).thenRun(this::resume);
  await(15000, 2);
}
示例8
/**
 * Creates a set of Raft servers.
 *
 * <p>Registers {@code nodes} new {@code ACTIVE} members, creates one server for each of the first
 * {@code nodes} entries of {@code members}, and starts it with {@code bootstrap} (active members)
 * or {@code listen} (all others).
 *
 * @param nodes the number of servers to create
 * @return the created servers
 */
private List<RaftServer> createServers(int nodes) throws Throwable {
  for (int added = 0; added < nodes; added++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  List<RaftServer> created = new ArrayList<>();
  for (int index = 0; index < nodes; index++) {
    RaftServer server = createServer(members.get(index).memberId());
    boolean active = members.get(index).getType() == RaftMember.Type.ACTIVE;
    if (!active) {
      server.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    } else {
      server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    }
    created.add(server);
  }

  // Each started server resumes once when its bootstrap/listen future completes.
  await(30000 * nodes, nodes);
  return created;
}
示例9
/**
 * Creates a set of Raft servers, starting only some of the registered members.
 *
 * <p>Registers {@code total} new {@code ACTIVE} members but creates and starts servers only for
 * the first {@code live} entries of {@code members}.
 *
 * @param live the number of servers to create and start
 * @param total the number of members to register
 * @return the created servers
 */
private List<RaftServer> createServers(int live, int total) throws Throwable {
  for (int added = 0; added < total; added++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  List<RaftServer> created = new ArrayList<>();
  for (int index = 0; index < live; index++) {
    RaftServer server = createServer(members.get(index).memberId());
    boolean active = members.get(index).getType() == RaftMember.Type.ACTIVE;
    if (!active) {
      server.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    } else {
      server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
    }
    created.add(server);
  }

  // Each started server resumes once when its bootstrap/listen future completes.
  await(30000 * live, live);
  return created;
}
示例10
/**
 * Creates a set of Raft servers and waits for all of them to bootstrap.
 *
 * <p>Registers {@code nodes} new {@code ACTIVE} members, creates a server for each of the first
 * {@code nodes} entries of {@code members} and bootstraps it against the full member list.
 *
 * @param nodes the number of servers to create
 * @return the created servers
 * @throws IllegalStateException if not every bootstrap future completed within 30 seconds
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private List<RaftServer> createServers(int nodes) throws Exception {
  List<RaftServer> servers = new ArrayList<>();

  for (int i = 0; i < nodes; i++) {
    members.add(nextMember(RaftMember.Type.ACTIVE));
  }

  CountDownLatch latch = new CountDownLatch(nodes);
  for (int i = 0; i < nodes; i++) {
    RaftServer server = createServer(members.get(i));
    server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(latch::countDown);
    servers.add(server);
  }

  // await() returns false when the timeout elapses first. Surface that instead of silently
  // handing back servers whose bootstrap has not completed.
  if (!latch.await(30000, TimeUnit.MILLISECONDS)) {
    throw new IllegalStateException(
        "timed out waiting for " + latch.getCount() + " of " + nodes + " servers to bootstrap");
  }
  return servers;
}
示例11
/**
 * Creates a Raft server.
 *
 * <p>The server uses a Netty-backed messaging protocol when {@code USE_NETTY} is set and a
 * protocol from {@code protocolFactory} otherwise. Its log is stored on disk under
 * {@code target/fuzz-logs/<memberId>}.
 *
 * @param member the member the server is created for
 * @return the new server, which has also been added to {@code servers}
 */
private RaftServer createServer(RaftMember member) {
  RaftServerProtocol protocol;
  if (!USE_NETTY) {
    protocol = protocolFactory.newServerProtocol(member.memberId());
  } else {
    // Start a messaging service on the next port and register the member's address so that
    // other nodes can resolve it through addressMap.
    Address address = Address.from(++port);
    MessagingService messagingManager = new NettyMessagingService("test", address, new MessagingConfig()).start().join();
    messagingServices.add(messagingManager);
    addressMap.put(member.memberId(), address);
    protocol = new RaftServerMessagingProtocol(messagingManager, PROTOCOL_SERIALIZER, addressMap::get);
  }

  RaftServer.Builder builder = RaftServer.builder(member.memberId())
      .withProtocol(protocol);

  RaftStorage storage = RaftStorage.builder()
      .withStorageLevel(StorageLevel.DISK)
      .withDirectory(new File(String.format("target/fuzz-logs/%s", member.memberId())))
      .withNamespace(STORAGE_NAMESPACE)
      .withMaxSegmentSize(1024 * 1024)
      .build();

  RaftServer server = builder.withStorage(storage).build();
  servers.add(server);
  return server;
}
示例12
/**
 * Creates a Raft client.
 *
 * <p>The client gets a fresh member identifier, uses a Netty-backed protocol when
 * {@code USE_NETTY} is set (a protocol from {@code protocolFactory} otherwise), and is connected
 * to all current {@code members} before it is returned.
 *
 * @return the connected client, which has also been added to {@code clients}
 */
private RaftClient createClient() throws Exception {
  MemberId memberId = nextNodeId();

  RaftClientProtocol protocol;
  if (!USE_NETTY) {
    protocol = protocolFactory.newClientProtocol(memberId);
  } else {
    Address address = Address.from(++port);
    // NOTE(review): this messaging service is started but not recorded anywhere in this
    // method - confirm that something else is responsible for stopping it.
    MessagingService messagingManager = new NettyMessagingService("test", address, new MessagingConfig()).start().join();
    addressMap.put(memberId, address);
    protocol = new RaftClientMessagingProtocol(messagingManager, PROTOCOL_SERIALIZER, addressMap::get);
  }

  RaftClient client = RaftClient.builder()
      .withMemberId(memberId)
      .withProtocol(protocol)
      .build();

  // Block until the client has connected to the current members.
  client.connect(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).join();
  clients.add(client);
  return client;
}
示例13
/**
 * Creates a configuration response.
 *
 * @param status the response status, passed to the superclass
 * @param error the response error, passed to the superclass
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public ConfigurationResponse(
    Status status,
    RaftError error,
    long index,
    long term,
    long timestamp,
    Collection<RaftMember> members) {
  super(status, error);
  this.members = members;
  this.timestamp = timestamp;
  this.term = term;
  this.index = index;
}
示例14
/**
 * Creates a configure request.
 *
 * @param term the term carried by the request
 * @param leader the leader carried by the request
 * @param index the configuration index
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public ConfigureRequest(
    long term,
    String leader,
    long index,
    long timestamp,
    Collection<RaftMember> members) {
  this.members = members;
  this.timestamp = timestamp;
  this.index = index;
  this.leader = leader;
  this.term = term;
}
示例15
/**
 * Joins the cluster as a passive member.
 *
 * <p>An existing join future is returned as-is. When no configuration is present, the local
 * member is typed {@code PASSIVE} and an initial configuration is built from the supplied
 * identifiers, excluding the local member and treating every other member as {@code ACTIVE}.
 *
 * @param cluster the identifiers of the members of the cluster to listen to
 * @return the join future, or a failed future when no configuration exists and the cluster
 *     contains no member other than the local one
 */
@Override
public synchronized CompletableFuture<Void> listen(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  // A configuration is already available, so there is nothing to create before joining.
  if (configuration != null) {
    return join();
  }

  member.setType(RaftMember.Type.PASSIVE);

  // Build the set of remote members; the local member is left out because it is the one joining.
  Set<RaftMember> remoteMembers = cluster.stream()
      .filter(id -> !id.equals(member.memberId()))
      .map(id -> new DefaultRaftMember(id, RaftMember.Type.ACTIVE, member.getLastUpdated()))
      .collect(Collectors.toSet());

  // Without any remote member there is no cluster to join.
  if (remoteMembers.isEmpty()) {
    return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
  }

  // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
  // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
  configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), remoteMembers));
  return join();
}
示例16
/**
 * Joins the cluster and promotes the local member to {@code ACTIVE}.
 *
 * <p>An existing join future is returned as-is. When no configuration is present, the local
 * member is typed {@code PROMOTABLE} and an initial configuration is built from the supplied
 * identifiers, excluding the local member and treating every other member as {@code ACTIVE}.
 * After the join completes, the member is promoted unless it is already {@code ACTIVE}.
 *
 * @param cluster the identifiers of the members of the cluster to join
 * @return a future for the join and promotion, or a failed future when no configuration exists
 *     and the cluster contains no member other than the local one
 */
@Override
public synchronized CompletableFuture<Void> join(Collection<MemberId> cluster) {
  if (joinFuture != null) {
    return joinFuture;
  }

  // Only create an initial configuration when none is available yet.
  if (configuration == null) {
    member.setType(RaftMember.Type.PROMOTABLE);

    // Build the set of remote members; the local member is left out because it is the one joining.
    Set<RaftMember> remoteMembers = cluster.stream()
        .filter(id -> !id.equals(member.memberId()))
        .map(id -> new DefaultRaftMember(id, RaftMember.Type.ACTIVE, member.getLastUpdated()))
        .collect(Collectors.toSet());

    // Without any remote member there is no cluster to join.
    if (remoteMembers.isEmpty()) {
      return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
    }

    // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
    // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
    configure(new Configuration(0, 0, member.getLastUpdated().toEpochMilli(), remoteMembers));
  }

  return join().thenCompose(ignored -> {
    if (member.getType() != RaftMember.Type.ACTIVE) {
      return member.promote(RaftMember.Type.ACTIVE);
    }
    return CompletableFuture.completedFuture(null);
  });
}
示例17
/**
 * Updates the member type.
 *
 * <p>Nothing happens when {@code type} is already the current type. Otherwise both arguments are
 * validated before any state is modified, the type is replaced, the update time is advanced when
 * {@code time} is later than the stored one, and the type change listeners (if any) are notified.
 *
 * @param type The member type.
 * @param time The time of the update.
 * @return The member.
 */
public DefaultRaftMember update(RaftMember.Type type, Instant time) {
  if (this.type != type) {
    // Validate up front: checking 'time' only after calling time.isAfter(...) can never report
    // the intended message, and assigning the type first would leave a half-applied update.
    checkNotNull(type, "type cannot be null");
    checkNotNull(time, "time cannot be null");

    this.type = type;
    if (time.isAfter(updated)) {
      this.updated = time;
    }
    if (typeChangeListeners != null) {
      typeChangeListeners.forEach(l -> l.accept(type));
    }
  }
  return this;
}
示例18
/**
 * Reconfigures this member to the given type.
 *
 * <p>When the member already has the requested type, a completed future is returned immediately.
 * Otherwise the reconfiguration is started on the cluster context's thread context.
 *
 * @param type the member type to change to
 * @return a future that is handed to the reconfiguration attempt for completion
 */
private CompletableFuture<Void> configure(RaftMember.Type type) {
  if (type != this.type) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    cluster.getContext().getThreadContext().execute(() -> configure(type, result));
    return result;
  }
  return CompletableFuture.completedFuture(null);
}
示例19
/**
 * Recursively reconfigures the cluster.
 *
 * <p>Submits a {@code ReconfigureRequest} for this member with the requested type to the local
 * server's current role. A retry timer is armed before the request is sent and is cancelled on
 * every terminal outcome, so that no further attempt runs once {@code future} has been completed.
 *
 * @param type the member type to request
 * @param future the future completed when the reconfiguration succeeds or fails terminally
 */
private void configure(RaftMember.Type type, CompletableFuture<Void> future) {
  // Set a timer to retry the reconfiguration attempt after an election timeout.
  configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> {
    configure(type, future);
  });

  // Attempt to reconfigure the cluster by submitting a ReconfigureRequest directly to the server state.
  // Non-leader states should forward the request to the leader if there is one. Leader states
  // will log, replicate, and commit the reconfiguration.
  cluster.getContext().getRaftRole().onReconfigure(ReconfigureRequest.builder()
      .withIndex(cluster.getConfiguration().index())
      .withTerm(cluster.getConfiguration().term())
      .withMember(new DefaultRaftMember(id, type, updated))
      .build()).whenComplete((response, error) -> {
        if (error == null) {
          if (response.status() == RaftResponse.Status.OK) {
            // Success: apply the configuration returned in the response.
            cancelConfigureTimer();
            cluster.configure(new Configuration(response.index(), response.term(), response.timestamp(), response.members()));
            future.complete(null);
          } else if (response.error() == null
              || response.error().type() == RaftError.Type.UNAVAILABLE
              || response.error().type() == RaftError.Type.PROTOCOL_ERROR
              || response.error().type() == RaftError.Type.NO_LEADER) {
            // These failures are retried: replace the timer with one that waits twice the election timeout.
            cancelConfigureTimer();
            configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout().multipliedBy(2), () -> configure(type, future));
          } else {
            // Any other error response is terminal.
            cancelConfigureTimer();
            future.completeExceptionally(response.error().createException());
          }
        } else {
          // The request itself failed. Cancel the retry timer as well; otherwise it would keep
          // re-submitting the request even though the future has already been completed.
          cancelConfigureTimer();
          future.completeExceptionally(error);
        }
      });
}
示例20
/**
 * Creates a configuration.
 *
 * @param index the configuration index
 * @param term the configuration term
 * @param time the configuration time, which must be positive
 * @param members the configuration members, which must not be null
 */
public Configuration(
    long index,
    long term,
    long time,
    Collection<RaftMember> members) {
  // Validate before touching any field, in the same order as the argument checks are documented.
  checkArgument(time > 0, "time must be positive");
  checkNotNull(members, "members cannot be null");

  this.members = members;
  this.time = time;
  this.term = term;
  this.index = index;
}
示例21
/**
 * Handles a cluster membership event.
 *
 * <p>On the Raft thread context: when the event reports the removal of the member that is
 * currently recorded as leader, the leader is cleared and poll requests are sent.
 *
 * @param event the membership event to handle
 */
private void handleClusterEvent(ClusterMembershipEvent event) {
  raft.getThreadContext().execute(() -> {
    RaftMember currentLeader = raft.getLeader();
    if (currentLeader == null) {
      return;
    }
    if (event.type() != ClusterMembershipEvent.Type.MEMBER_REMOVED) {
      return;
    }
    if (!event.subject().id().equals(currentLeader.memberId())) {
      return;
    }

    // The removed member was the leader: forget it and start polling.
    raft.setLeader(null);
    sendPollRequests();
  });
}
示例22
/**
 * Tests a server joining the cluster.
 *
 * @param type the member type the new server joins as; {@code ACTIVE} uses {@code join},
 *     anything else uses {@code listen}
 */
private void testServerJoin(RaftMember.Type type) throws Throwable {
  createServers(3);

  RaftServer joiner = createServer(nextNodeId());
  if (type != RaftMember.Type.ACTIVE) {
    joiner.listen(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  } else {
    joiner.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  }

  // Wait for the join/listen future to resume the test.
  await(15000);
}
示例23
/**
 * Tests joining and leaving the cluster, resizing the quorum.
 *
 * <p>Starts a single server, joins a second one, then has the original server and finally the
 * joiner leave. Every step is awaited so the test fails if any of them does not complete.
 */
@Test
public void testResize() throws Throwable {
  RaftServer server = createServers(1).get(0);
  RaftServer joiner = createServer(nextNodeId());
  joiner.join(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
  await(15000);
  server.leave().thenRun(this::resume);
  await(15000);
  joiner.leave().thenRun(this::resume);
  // Wait for the final leave too; without this the test returns before the last step is verified.
  await(15000);
}
示例24
/**
 * Shuts a server down, deletes its storage and recreates it on the same storage.
 *
 * @param leader the member whose identifier the recreated server bootstraps against
 * @param member the member the recreated server is created for
 * @param server the server to shut down
 * @param storage the storage to delete and then reuse for the recreated server
 * @return the recreated server
 * @throws TimeoutException declared for the {@code await} calls
 */
private RaftServer recreateServerWithDataLoss(
    RaftMember leader,
    RaftMember member,
    RaftServer server,
    RaftStorage storage) throws TimeoutException {
  // Stop the existing server and wait for the shutdown to complete.
  server.shutdown().thenRun(this::resume);
  await(30000);

  // Drop everything the server had persisted.
  deleteStorage(storage);

  // Bring up a replacement for the same member on the (now deleted) storage.
  final RaftServer replacement = createServer(member.memberId(), builder -> builder.withStorage(storage));
  replacement.bootstrap(leader.memberId()).thenRun(this::resume);
  await(30000);
  return replacement;
}
示例25
/**
 * Creates a Raft client and connects it to the given members.
 *
 * @param members the members whose identifiers the client connects to
 * @return the connected client, which has also been added to {@code clients}
 */
private RaftClient createClient(List<RaftMember> members) throws Throwable {
  final MemberId clientId = nextNodeId();

  final RaftClient client = RaftClient.builder()
      .withMemberId(clientId)
      .withPartitionId(PartitionId.from("test", 1))
      .withProtocol(protocolFactory.newClientProtocol(clientId))
      .build();

  final List<MemberId> serverIds = members.stream()
      .map(RaftMember::memberId)
      .collect(Collectors.toList());

  // Resume the test once the connection attempt completes, and wait for that signal.
  client.connect(serverIds).thenRun(this::resume);
  await(30000);

  clients.add(client);
  return client;
}
示例26
/**
 * Schedules the given server to be shut down (or removed from the cluster) after a delay and
 * then recreated and restarted after a second delay.
 *
 * <p>Both delays are {@code randomNumber(120) + 10} seconds. Once the restarted server's
 * join/bootstrap future completes, {@code scheduleRestarts(context)} is invoked.
 *
 * @param remove {@code true} to call {@code leave()} and later {@code join(...)};
 *     {@code false} to call {@code shutdown()} and later {@code bootstrap(...)}
 * @param serverIndex the index of the server in {@code servers}
 * @param context the thread context used to schedule both timers
 */
private void scheduleRestart(boolean remove, int serverIndex, ThreadContext context) {
// First timer: stop the server. The timer is tracked in shutdownTimers until it fires.
shutdownTimers.put(serverIndex, context.schedule(Duration.ofSeconds(randomNumber(120) + 10), () -> {
shutdownTimers.remove(serverIndex);
RaftServer server = servers.get(serverIndex);
CompletableFuture<Void> leaveFuture;
if (remove) {
System.out.println("Removing server: " + server.cluster().getMember().memberId());
leaveFuture = server.leave();
} else {
System.out.println("Shutting down server: " + server.cluster().getMember().memberId());
leaveFuture = server.shutdown();
}
// The restart is scheduled whether or not the leave/shutdown succeeded: result and error are ignored.
leaveFuture.whenComplete((result, error) -> {
// Second timer: recreate the server. The timer is tracked in restartTimers until it fires.
restartTimers.put(serverIndex, context.schedule(Duration.ofSeconds(randomNumber(120) + 10), () -> {
restartTimers.remove(serverIndex);
// Build a new server for the old server's member and put it in the old server's slot.
RaftServer newServer = createServer(server.cluster().getMember());
servers.set(serverIndex, newServer);
CompletableFuture<RaftServer> joinFuture;
if (remove) {
System.out.println("Adding server: " + newServer.cluster().getMember().memberId());
// A removed server rejoins through the last entry of the members list.
joinFuture = newServer.join(members.get(members.size() - 1).memberId());
} else {
System.out.println("Bootstrapping server: " + newServer.cluster().getMember().memberId());
joinFuture = newServer.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList()));
}
// Continue with scheduleRestarts regardless of the join/bootstrap outcome: result2 and error2 are ignored.
joinFuture.whenComplete((result2, error2) -> {
scheduleRestarts(context);
});
}));
});
}));
}
示例27
/**
 * Creates a leave response; all arguments are passed unchanged to the superclass constructor.
 *
 * @param status the response status
 * @param error the response error
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public LeaveResponse(Status status, RaftError error, long index, long term, long timestamp, Collection<RaftMember> members) {
super(status, error, index, term, timestamp, members);
}
示例28
/**
 * Creates a join request; the member is passed unchanged to the superclass constructor.
 *
 * @param member the member the request is about
 */
public JoinRequest(RaftMember member) {
super(member);
}
示例29
/**
 * Creates a leave request; the member is passed unchanged to the superclass constructor.
 *
 * @param member the member the request is about
 */
public LeaveRequest(RaftMember member) {
super(member);
}
示例30
/**
 * Creates a join response; all arguments are passed unchanged to the superclass constructor.
 *
 * @param status the response status
 * @param error the response error
 * @param index the configuration index
 * @param term the configuration term
 * @param timestamp the configuration timestamp
 * @param members the configuration members
 */
public JoinResponse(Status status, RaftError error, long index, long term, long timestamp, Collection<RaftMember> members) {
super(status, error, index, term, timestamp, members);
}