本文共 18641 字,大约阅读时间需要 62 分钟。
Copycat server之间的configuration是如何、在何时被同步的?
1. 显式的调用LeaderState.configure
/** * Commits the given configuration. */ protected CompletableFutureconfigure(Collection members) { final long index; try (ConfigurationEntry entry = context.getLog().create(ConfigurationEntry.class)) { entry.setTerm(context.getTerm()) .setTimestamp(System.currentTimeMillis()) .setMembers(members); index = context.getLog().append(entry); //先把configuration写入local log LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry); // Store the index of the configuration entry in order to prevent other configurations from // being logged and committed concurrently. This is an important safety property of Raft. configuring = index; //configuring用于互斥 context.getClusterState().configure(new Configuration(entry.getIndex(), entry.getTerm(), entry.getTimestamp(), entry.getMembers())); //更新ClusterState } return appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //调用appendEntries同步configuration context.checkThread(); if (isOpen()) { // Reset the configuration index to allow new configuration changes to be committed. configuring = 0; //configuring完成 } }); }
private void appendInitialEntries() {
// Append a configuration entry to propagate the leader's cluster configuration. configure(context.getCluster().members());
public CompletableFuturejoin(final JoinRequest request) {
configure(members).whenComplete((index, error) -> {
@Override public CompletableFuturereconfigure(final ReconfigureRequest request) {
configure(members).whenComplete((index, error) -> {
@Override public CompletableFutureleave(final LeaveRequest request) {
configure(members).whenComplete((index, error) -> {
2. 在发送心跳(heartbeat)和appendEntries时,也会自动触发配置同步
LeaderAppender.appendEntries(MemberState member)
// If the member term is less than the current term or the member's configuration index is less // than the local configuration index, send a configuration update to the member. // Ensure that only one configuration attempt per member is attempted at any given time by storing the // member state in a set of configuring members. // Once the configuration is complete sendAppendRequest will be called recursively. else if (member.getConfigTerm() < context.getTerm() || member.getConfigIndex() < context.getClusterState().getConfiguration().index()) { if (member.canConfigure()) { sendConfigureRequest(member, buildConfigureRequest(member)); } }
/** * Connects to the member and sends a configure request. */ protected void sendConfigureRequest(MemberState member, ConfigureRequest request) { // Start the configure to the member. member.startConfigure(); context.getConnections().getConnection(member.getMember().serverAddress()).whenComplete((connection, error) -> { context.checkThread(); if (open) { if (error == null) { sendConfigureRequest(connection, member, request); } else { // Complete the configure to the member. member.completeConfigure(); // 将configuring设置成false,表示configuring结束 // Trigger reactions to the request failure. handleConfigureRequestFailure(member, request, error); } } }); } /** * Sends a configuration message. */ protected void sendConfigureRequest(Connection connection, MemberState member, ConfigureRequest request) { LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().address(), request, member.getMember().serverAddress()); connection.send(request).whenComplete((response, error) -> { context.checkThread(); // Complete the configure to the member. member.completeConfigure(); if (open) { if (error == null) { LOGGER.debug("{} - Received {} from {}", context.getCluster().member().address(), response, member.getMember().serverAddress()); handleConfigureResponse(member, request, response); } else { LOGGER.warn("{} - Failed to configure {}", context.getCluster().member().address(), member.getMember().serverAddress()); handleConfigureResponseFailure(member, request, error); } } }); }
/**
 * Handles a configure response: updates the member's heartbeat bookkeeping first,
 * then delegates to the superclass handling.
 *
 * @param member   the member that answered
 * @param request  the configure request that was sent
 * @param response the response received from the member
 */
@Override
protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {
  // Trigger commit futures if necessary.
  // NOTE(review): the article annotates this call as checking whether a majority of
  // members have finished synchronizing the configuration; updateHeartbeatTime is not
  // shown in this excerpt — confirm.
  updateHeartbeatTime(member, null);
  super.handleConfigureResponse(member, request, response);
}
/**
 * Dispatches a configure response to the OK handler or the error handler
 * according to its status.
 *
 * @param member   the member that answered
 * @param request  the configure request that was sent
 * @param response the response received from the member
 */
protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {
  // Anything other than OK goes to the error handler.
  if (response.status() != Response.Status.OK) {
    handleConfigureResponseError(member, request, response);
    return;
  }
  handleConfigureResponseOk(member, request, response);
}
/**
 * Handles an OK configuration response.
 *
 * <p>Records a successful attempt for the member, copies the term and index of the
 * request that was just acknowledged onto the member state, and then resumes
 * appending entries to that member.
 *
 * @param member   the member that acknowledged the configuration
 * @param request  the configure request that was acknowledged
 * @param response the OK response (not read by this method)
 */
@SuppressWarnings("unused")
protected void handleConfigureResponseOk(MemberState member, ConfigureRequest request, ConfigureResponse response) {
  // Reset the member failure count and update the member's status if necessary.
  succeedAttempt(member);

  // Update the member's current configuration term and index according to the installed configuration.
  member.setConfigTerm(request.term()).setConfigIndex(request.index());

  // Recursively append entries to the member.
  appendEntries(member);
}
public void connectServer(Connection connection) {
connection.handler(ConfigureRequest.class, request -> state.configure(request));
@Override public CompletableFutureconfigure(ConfigureRequest request) { context.checkThread(); logRequest(request); updateTermAndLeader(request.term(), request.leader()); // 更新leader term Configuration configuration = new Configuration(request.index(), request.term(), request.timestamp(), request.members()); // 根据request创建Configuration // Configure the cluster membership. This will cause this server to transition to the // appropriate state if its type has changed. context.getClusterState().configure(configuration); // 调用ClusterState.configure // If the configuration is already committed, commit it to disk. // Check against the actual cluster Configuration rather than the received configuration in // case the received configuration was an older configuration that was not applied. if (context.getCommitIndex() >= context.getClusterState().getConfiguration().index()) { context.getClusterState().commit(); // 调用commit } return CompletableFuture.completedFuture(logResponse(ConfigureResponse.builder() .withStatus(Response.Status.OK) // 返回 .build())); }
public CompletableFutureconfigure(ConfigureRequest request) { CompletableFuture future = super.configure(request); resetHeartbeatTimeout(); return future; }
final class ClusterState implements Cluster, AutoCloseable { private final ServerContext context; private final ServerMember member; private volatile Configuration configuration;
/** * Configures the cluster state. * * @param configuration The cluster configuration. * @return The cluster state. */ ClusterState configure(Configuration configuration) { // If the configuration index is less than the currently configured index, ignore it. // Configurations can be persisted and applying old configurations can revert newer configurations. if (this.configuration != null && configuration.index() <= this.configuration.index()) //如果是老的configuration,丢弃 return this; Instant time = Instant.ofEpochMilli(configuration.time()); // Iterate through members in the new configuration, add any missing members, and update existing members. boolean transition = false; for (Member member : configuration.members()) { if (member.equals(this.member)) { //如果有我的配置变更 transition = this.member.type().ordinal() < member.type().ordinal(); //看下type是否promote,只有promote才要transition,不让demote this.member.update(member.type(), time).update(member.clientAddress(), time); members.add(this.member); } else { //如果是更新其他member的配置 // If the member state doesn't already exist, create it. MemberState state = membersMap.get(member.id()); if (state == null) { //如果是新member,初始化 state = new MemberState(new ServerMember(member.type(), member.serverAddress(), member.clientAddress(), time), this); state.resetState(context.getLog()); this.members.add(state.getMember()); this.remoteMembers.add(state); membersMap.put(member.id(), state); addressMap.put(member.address(), state); joinListeners.accept(state.getMember()); } // If the member type has changed, update the member type and reset its state. state.getMember().update(member.clientAddress(), time); if (state.getMember().type() != member.type()) { //如果member的type发生了变化,更新数据 state.getMember().update(member.type(), time); state.resetState(context.getLog()); } // If the member status has changed, update the local member status. 
if (state.getMember().status() != member.status()) { //如果status发送变化,更新 state.getMember().update(member.status(), time); } // Update the optimized member collections according to the member type. for (ListmemberType : memberTypes.values()) { memberType.remove(state); } List memberType = memberTypes.get(member.type()); if (memberType == null) { memberType = new CopyOnWriteArrayList<>(); memberTypes.put(member.type(), memberType); } memberType.add(state); } } // Transition the local member only if the member is being promoted and not demoted. // Configuration changes that demote the local member are only applied to the local server // upon commitment. This ensures that e.g. a leader that's removing itself from the quorum // can commit the configuration change prior to shutting down. if (transition) { //做state transition context.transition(this.member.type()); } // Iterate through configured members and remove any that no longer exist in the configuration. int i = 0; while (i < this.remoteMembers.size()) { MemberState member = this.remoteMembers.get(i); if (!configuration.members().contains(member.getMember())) { this.members.remove(member.getMember()); this.remoteMembers.remove(i); for (List memberType : memberTypes.values()) { memberType.remove(member); } membersMap.remove(member.getMember().id()); addressMap.remove(member.getMember().address()); leaveListeners.accept(member.getMember()); } else { i++; } } // If the local member was removed from the cluster, remove it from the members list. if (!configuration.members().contains(member)) { members.remove(member); } this.configuration = configuration; //更新configuration对象 // Store the configuration if it's already committed. if (context.getCommitIndex() >= configuration.index()) { //如果这个configuration已经被commit,store到存储上 context.getMetaStore().storeConfiguration(configuration); } // Reassign members based on availability. reassign(); //更新passive member的assignment,因为member变了,所以follower所对应的passive可能需要重新分配 return this; }
/** * Commit the current configuration to disk. * * @return The cluster state. */ ClusterState commit() { // Apply the configuration to the local server state. context.transition(member.type()); if (!configuration.members().contains(member) && leaveFuture != null) { leaveFuture.complete(null); } // If the local stored configuration is older than the committed configuration, overwrite it. if (context.getMetaStore().loadConfiguration().index() < configuration.index()) { context.getMetaStore().storeConfiguration(configuration); } return this; }
@Override public CompletableFuturepromote() { return configure(Type.values()[type.ordinal() + 1]); }
/** * Demotes the server to the given type. */ CompletableFutureconfigure(Member.Type type) { CompletableFuture future = new CompletableFuture<>(); cluster.getContext().getThreadContext().executor().execute(() -> configure(type, future)); return future; }
/** * Recursively reconfigures the cluster. */ private void configure(Member.Type type, CompletableFuturefuture) { // Set a timer to retry the attempt to leave the cluster. configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> { configure(type, future); //设定schedule,反复重试 }); // Attempt to leave the cluster by submitting a LeaveRequest directly to the server state. // Non-leader states should forward the request to the leader if there is one. Leader states // will log, replicate, and commit the reconfiguration. cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() .withIndex(cluster.getConfiguration().index()) .withTerm(cluster.getConfiguration().term()) .withMember(new ServerMember(type, serverAddress(), clientAddress(), updated)) .build()).whenComplete((response, error) -> { if (error == null) { if (response.status() == Response.Status.OK) { cancelConfigureTimer(); //如果成功就cancel掉schedule cluster.configure(new Configuration(response.index(), response.term(), response.timestamp(), response.members())); //更新clusterState中的configuration配置 future.complete(null); } else if (response.error() == null || response.error() == CopycatError.Type.NO_LEADER_ERROR) { cancelConfigureTimer(); configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout().multipliedBy(2), () -> configure(type, future)); } else { cancelConfigureTimer(); future.completeExceptionally(response.error().createException()); } } }); }
public void connectServer(Connection connection) {
connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));
@Override public CompletableFuturereconfigure(ReconfigureRequest request) { context.checkThread(); logRequest(request); if (context.getLeader() == null) { return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.NO_LEADER_ERROR) .build())); } else { return this. forward(request) .exceptionally(error -> ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.NO_LEADER_ERROR) .build()) .thenApply(this::logResponse); } }
@Override public CompletableFuturereconfigure(final ReconfigureRequest request) { // If another configuration change is already under way, reject the configuration. // If the leader index is 0 or is greater than the commitIndex, reject the promote requests. // Configuration changes should not be allowed until the leader has committed a no-op entry. // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E if (configuring() || initializing()) { //如果正在configure或leader初始化,不能做配置变更 return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .build())); } // If the configuration request index is less than the last known configuration index for // the leader, fail the request to ensure servers can't reconfigure an old configuration. if (request.index() > 0 && request.index() < context.getClusterState().getConfiguration().index() || request.term() != context.getClusterState().getConfiguration().term() && (existingMember.type() != request.member().type() || existingMember.status() != request.member().status())) { return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.CONFIGURATION_ERROR) .build())); } Member member = request.member(); // If the client address is being set or has changed, update the configuration. if (member.clientAddress() != null && (existingMember.clientAddress() == null || !existingMember.clientAddress().equals(member.clientAddress()))) { existingMember.update(member.clientAddress(), Instant.now()); } // Update the member type. 
existingMember.update(request.member().type(), Instant.now()); Collection members = context.getCluster().members(); CompletableFuture future = new CompletableFuture<>(); configure(members).whenComplete((index, error) -> { //调用configure context.checkThread(); if (isOpen()) { if (error == null) { //如果成功,更新local配置 future.complete(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.OK) .withIndex(index) .withTerm(context.getClusterState().getConfiguration().term()) .withTime(context.getClusterState().getConfiguration().time()) .withMembers(members) .build())); } else { future.complete(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.INTERNAL_ERROR) .build())); } } }); return future; }