博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Copycat - configure
阅读量:6882 次
发布时间:2019-06-27

本文共 18641 字,大约阅读时间需要 62 分钟。

Copycat server之间的configure是如何,何时被同步的?

 

大家可以看到,只有leader可以同步配置

 

1. 显式的调用LeaderState.configure

LeaderState.configure

/**   * Commits the given configuration.   */  protected CompletableFuture
configure(Collection
members) { final long index; try (ConfigurationEntry entry = context.getLog().create(ConfigurationEntry.class)) { entry.setTerm(context.getTerm()) .setTimestamp(System.currentTimeMillis()) .setMembers(members); index = context.getLog().append(entry); //先把configuration写入local log LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry); // Store the index of the configuration entry in order to prevent other configurations from // being logged and committed concurrently. This is an important safety property of Raft. configuring = index; //configuring用于互斥 context.getClusterState().configure(new Configuration(entry.getIndex(), entry.getTerm(), entry.getTimestamp(), entry.getMembers())); //更新ClusterState } return appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //调用appendEntries同步configuration context.checkThread(); if (isOpen()) { // Reset the configuration index to allow new configuration changes to be committed. configuring = 0; //configuring完成 } }); }

何处显式调用?

private void appendInitialEntries() {
// Append a configuration entry to propagate the leader's cluster configuration. configure(context.getCluster().members());

 

public CompletableFuture
join(final JoinRequest request) {
configure(members).whenComplete((index, error) -> {

 

@Override public CompletableFuture
reconfigure(final ReconfigureRequest request) {
configure(members).whenComplete((index, error) -> {

 

@Override public CompletableFuture
leave(final LeaveRequest request) {
configure(members).whenComplete((index, error) -> {

 

2. 在发送hb,和appendEntries时,也会自动发生同步

因为上面调用configure也是会调用到

LeaderAppender.appendEntries(MemberState member)

和普通append不同的是,会走到这个逻辑,

// If the member term is less than the current term or the member's configuration index is less    // than the local configuration index, send a configuration update to the member.    // Ensure that only one configuration attempt per member is attempted at any given time by storing the    // member state in a set of configuring members.    // Once the configuration is complete sendAppendRequest will be called recursively.    else if (member.getConfigTerm() < context.getTerm() || member.getConfigIndex() < context.getClusterState().getConfiguration().index()) {      if (member.canConfigure()) {        sendConfigureRequest(member, buildConfigureRequest(member));      }    }

注意虽然是在appendEntries里面,

这里发出的是sendConfigureRequest,而不是appendRequest

因为leader的configuration发生变化,所以member.getConfigIndex一定是小的,所以要更新

AbstractAppender

/**   * Connects to the member and sends a configure request.   */  protected void sendConfigureRequest(MemberState member, ConfigureRequest request) {    // Start the configure to the member.    member.startConfigure();    context.getConnections().getConnection(member.getMember().serverAddress()).whenComplete((connection, error) -> {      context.checkThread();      if (open) {        if (error == null) {          sendConfigureRequest(connection, member, request);        } else {          // Complete the configure to the member.          member.completeConfigure(); // 将configuring设置成false,表示configuring结束          // Trigger reactions to the request failure.          handleConfigureRequestFailure(member, request, error);        }      }    });  }  /**   * Sends a configuration message.   */  protected void sendConfigureRequest(Connection connection, MemberState member, ConfigureRequest request) {    LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().address(), request, member.getMember().serverAddress());    connection.
send(request).whenComplete((response, error) -> { context.checkThread(); // Complete the configure to the member. member.completeConfigure(); if (open) { if (error == null) { LOGGER.debug("{} - Received {} from {}", context.getCluster().member().address(), response, member.getMember().serverAddress()); handleConfigureResponse(member, request, response); } else { LOGGER.warn("{} - Failed to configure {}", context.getCluster().member().address(), member.getMember().serverAddress()); handleConfigureResponseFailure(member, request, error); } } }); }

 

LeaderAppender

@Override  protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {    // Trigger commit futures if necessary.    updateHeartbeatTime(member, null); // 判断是否大部分member都已经完成configuration同步    super.handleConfigureResponse(member, request, response);  }

 

AbstractAppender

 

/**   * Handles a configuration response.   */  protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {    if (response.status() == Response.Status.OK) {      handleConfigureResponseOk(member, request, response);    } else {      handleConfigureResponseError(member, request, response);    }  }

 

/**   * Handles an OK configuration response.   */  @SuppressWarnings("unused")  protected void handleConfigureResponseOk(MemberState member, ConfigureRequest request, ConfigureResponse response) {    // Reset the member failure count and update the member's status if necessary.    succeedAttempt(member);    // Update the member's current configuration term and index according to the installed configuration.    member.setConfigTerm(request.term()).setConfigIndex(request.index());    // Recursively append entries to the member.    appendEntries(member);  }

 

在server上收到ConfigureRequest

ServerContext

public void connectServer(Connection connection) {
connection.handler(ConfigureRequest.class, request -> state.configure(request));
在connectServer中,说明configuration只能在server间调用,client是不能调用的
 

configure,只在两个state中有实现

InactiveState
@Override  public CompletableFuture
configure(ConfigureRequest request) { context.checkThread(); logRequest(request); updateTermAndLeader(request.term(), request.leader()); // 更新leader term Configuration configuration = new Configuration(request.index(), request.term(), request.timestamp(), request.members()); // 根据request创建Configuration // Configure the cluster membership. This will cause this server to transition to the // appropriate state if its type has changed. context.getClusterState().configure(configuration); // 调用ClusterState.configure // If the configuration is already committed, commit it to disk. // Check against the actual cluster Configuration rather than the received configuration in // case the received configuration was an older configuration that was not applied. if (context.getCommitIndex() >= context.getClusterState().getConfiguration().index()) { context.getClusterState().commit(); // 调用commit } return CompletableFuture.completedFuture(logResponse(ConfigureResponse.builder() .withStatus(Response.Status.OK) // 返回 .build())); }

 

FollowerState
public CompletableFuture
configure(ConfigureRequest request) { CompletableFuture
future = super.configure(request); resetHeartbeatTimeout(); return future; }

可以看到Follower里面只是多了resetHB

 

继续,

可以看到首先是ClusterState.configure

ClusterState中保存了当前server,所知道cluster的所有信息,其中包含Configuration对象

final class ClusterState implements Cluster, AutoCloseable {  private final ServerContext context;  private final ServerMember member;  private volatile Configuration configuration;

configure

/**   * Configures the cluster state.   *   * @param configuration The cluster configuration.   * @return The cluster state.   */  ClusterState configure(Configuration configuration) {    // If the configuration index is less than the currently configured index, ignore it.    // Configurations can be persisted and applying old configurations can revert newer configurations.    if (this.configuration != null && configuration.index() <= this.configuration.index()) //如果是老的configuration,丢弃      return this;    Instant time = Instant.ofEpochMilli(configuration.time());    // Iterate through members in the new configuration, add any missing members, and update existing members.    boolean transition = false;    for (Member member : configuration.members()) {      if (member.equals(this.member)) { //如果有我的配置变更        transition = this.member.type().ordinal() < member.type().ordinal(); //看下type是否promote,只有promote才要transition,不让demote        this.member.update(member.type(), time).update(member.clientAddress(), time);        members.add(this.member);      } else { //如果是更新其他member的配置        // If the member state doesn't already exist, create it.        MemberState state = membersMap.get(member.id());        if (state == null) { //如果是新member,初始化          state = new MemberState(new ServerMember(member.type(), member.serverAddress(), member.clientAddress(), time), this);          state.resetState(context.getLog());          this.members.add(state.getMember());          this.remoteMembers.add(state);          membersMap.put(member.id(), state);          addressMap.put(member.address(), state);          joinListeners.accept(state.getMember());        }        // If the member type has changed, update the member type and reset its state.        state.getMember().update(member.clientAddress(), time);        if (state.getMember().type() != member.type()) { //如果member的type发生了变化,更新数据          state.getMember().update(member.type(), time);          state.resetState(context.getLog());        }        // If the member status has changed, update the local member status.        if (state.getMember().status() != member.status()) { //如果status发送变化,更新          state.getMember().update(member.status(), time);        }        // Update the optimized member collections according to the member type.        for (List
memberType : memberTypes.values()) { memberType.remove(state); } List
memberType = memberTypes.get(member.type()); if (memberType == null) { memberType = new CopyOnWriteArrayList<>(); memberTypes.put(member.type(), memberType); } memberType.add(state); } } // Transition the local member only if the member is being promoted and not demoted. // Configuration changes that demote the local member are only applied to the local server // upon commitment. This ensures that e.g. a leader that's removing itself from the quorum // can commit the configuration change prior to shutting down. if (transition) { //做state transition context.transition(this.member.type()); } // Iterate through configured members and remove any that no longer exist in the configuration. int i = 0; while (i < this.remoteMembers.size()) { MemberState member = this.remoteMembers.get(i); if (!configuration.members().contains(member.getMember())) { this.members.remove(member.getMember()); this.remoteMembers.remove(i); for (List
memberType : memberTypes.values()) { memberType.remove(member); } membersMap.remove(member.getMember().id()); addressMap.remove(member.getMember().address()); leaveListeners.accept(member.getMember()); } else { i++; } } // If the local member was removed from the cluster, remove it from the members list. if (!configuration.members().contains(member)) { members.remove(member); } this.configuration = configuration; //更新configuration对象 // Store the configuration if it's already committed. if (context.getCommitIndex() >= configuration.index()) { //如果这个configuration已经被commit,store到存储上 context.getMetaStore().storeConfiguration(configuration); } // Reassign members based on availability. reassign(); //更新passive member的assignment,因为member变了,所以follower所对应的passive可能需要重新分配 return this; }

 

ClusterState.commit

/**   * Commit the current configuration to disk.   *   * @return The cluster state.   */  ClusterState commit() {    // Apply the configuration to the local server state.    context.transition(member.type());    if (!configuration.members().contains(member) && leaveFuture != null) {      leaveFuture.complete(null);    }    // If the local stored configuration is older than the committed configuration, overwrite it.    if (context.getMetaStore().loadConfiguration().index() < configuration.index()) {      context.getMetaStore().storeConfiguration(configuration);    }    return this;  }

逻辑,就是把configuration存入本地盘

 

 

reconfigure

在什么地方被调用,

 

ServerMember

@Override  public CompletableFuture
promote() { return configure(Type.values()[type.ordinal() + 1]); }

可以用reconfigure来更新配置,比如promote,demote

 

/**   * Demotes the server to the given type.   */  CompletableFuture
configure(Member.Type type) { CompletableFuture
future = new CompletableFuture<>(); cluster.getContext().getThreadContext().executor().execute(() -> configure(type, future)); return future; }

 

/**   * Recursively reconfigures the cluster.   */  private void configure(Member.Type type, CompletableFuture
future) { // Set a timer to retry the attempt to leave the cluster. configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> { configure(type, future); //设定schedule,反复重试 }); // Attempt to leave the cluster by submitting a LeaveRequest directly to the server state. // Non-leader states should forward the request to the leader if there is one. Leader states // will log, replicate, and commit the reconfiguration. cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() .withIndex(cluster.getConfiguration().index()) .withTerm(cluster.getConfiguration().term()) .withMember(new ServerMember(type, serverAddress(), clientAddress(), updated)) .build()).whenComplete((response, error) -> { if (error == null) { if (response.status() == Response.Status.OK) { cancelConfigureTimer(); //如果成功就cancel掉schedule cluster.configure(new Configuration(response.index(), response.term(), response.timestamp(), response.members())); //更新clusterState中的configuration配置 future.complete(null); } else if (response.error() == null || response.error() == CopycatError.Type.NO_LEADER_ERROR) { cancelConfigureTimer(); configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout().multipliedBy(2), () -> configure(type, future)); } else { cancelConfigureTimer(); future.completeExceptionally(response.error().createException()); } } }); }

 

处理ReconfigureRequest

public void connectServer(Connection connection) {
connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));

 

ReserveState
@Override  public CompletableFuture
reconfigure(ReconfigureRequest request) { context.checkThread(); logRequest(request); if (context.getLeader() == null) { return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.NO_LEADER_ERROR) .build())); } else { return this.
forward(request) .exceptionally(error -> ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.NO_LEADER_ERROR) .build()) .thenApply(this::logResponse); } }

只是forward到leader

 

LeaderState

@Override  public CompletableFuture
reconfigure(final ReconfigureRequest request) { // If another configuration change is already under way, reject the configuration. // If the leader index is 0 or is greater than the commitIndex, reject the promote requests. // Configuration changes should not be allowed until the leader has committed a no-op entry. // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E if (configuring() || initializing()) { //如果正在configure或leader初始化,不能做配置变更 return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .build())); } // If the configuration request index is less than the last known configuration index for // the leader, fail the request to ensure servers can't reconfigure an old configuration. if (request.index() > 0 && request.index() < context.getClusterState().getConfiguration().index() || request.term() != context.getClusterState().getConfiguration().term() && (existingMember.type() != request.member().type() || existingMember.status() != request.member().status())) { return CompletableFuture.completedFuture(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.CONFIGURATION_ERROR) .build())); } Member member = request.member(); // If the client address is being set or has changed, update the configuration. if (member.clientAddress() != null && (existingMember.clientAddress() == null || !existingMember.clientAddress().equals(member.clientAddress()))) { existingMember.update(member.clientAddress(), Instant.now()); } // Update the member type. existingMember.update(request.member().type(), Instant.now()); Collection
members = context.getCluster().members(); CompletableFuture
future = new CompletableFuture<>(); configure(members).whenComplete((index, error) -> { //调用configure context.checkThread(); if (isOpen()) { if (error == null) { //如果成功,更新local配置 future.complete(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.OK) .withIndex(index) .withTerm(context.getClusterState().getConfiguration().term()) .withTime(context.getClusterState().getConfiguration().time()) .withMembers(members) .build())); } else { future.complete(logResponse(ReconfigureResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.INTERNAL_ERROR) .build())); } } }); return future; }

转载地址:http://xkbbl.baihongyu.com/

你可能感兴趣的文章
前端工程师必知之Promise的实现
查看>>
react简易实现(1) 组件的挂载
查看>>
以太坊构建DApps系列教程(三):编译部署测试TNS代币
查看>>
Angular开发实践(五):深入解析变化监测
查看>>
前端错误监控的方法
查看>>
JNI Java与C的相互调用与基本操作
查看>>
IOS下box-shadow的诡异bug的修复
查看>>
地图导航业下半场,高德与百度地图各缺一子?
查看>>
《深入浅出统计学》笔记
查看>>
读书笔记:架构探险 从零开始写javaweb框架 第三章
查看>>
搭建Redis原生集群
查看>>
RxJava2.X 学习笔记 -- 创建操作符
查看>>
JavaEE 银联支付之网站支付-消费类交易
查看>>
教女朋友写方法 -- 就要学习 Go 语言
查看>>
java里的基本知识
查看>>
我理解的MVC
查看>>
CI第二篇 集成项目(SVN)workspace编译生成ipa到fir 蒲公英(jenkins)
查看>>
Stetho查看数据库(android 直接查看数据库)
查看>>
Ruby 2.7 — Enumerable#tally
查看>>
230. Kth Smallest Element in a BST
查看>>