SegmentFault 小青蛙课堂最新的文章
2022-02-26T18:02:49+08:00
https://segmentfault.com/feeds/blogs
https://creativecommons.org/licenses/by-nc-nd/4.0/
无微不至之Zookeeper源码深度讲解(3)-选举过程
https://segmentfault.com/a/1190000041465718
2022-02-26T18:02:49+08:00
2022-02-26T18:02:49+08:00
小青蛙的倔强
https://segmentfault.com/u/leonard_5ed89eaaab5fc
0
<pre><code> public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view for a replica in the voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}</code></pre><p>选举过程:</p><pre><code> proposedLeader(选中为leader的节点id);
proposedZxid(选中为leader的lastZxid);
proposedEpoch(选中为leader的当前年代);</code></pre><p>1.所有节点首先将选票投给自己<br>2.调用sendNotifications方法,将自己当前选票发送给其他节点</p><pre><code> private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
sendqueue.offer(notmsg);
}
}</code></pre><p>这里可以看出其实就是将当前选票封装成ToSend结构体,然后放到sendqueue这个阻塞队列里,等待Sender线程(后面讲消息流转时讲解)从队列中拉取要发送的消息,然后发送给其他节点.</p><p>3.然后所有节点进入到while循环中,直到选举完成,也就是PeerState!=Looking,我们从while循环中可以看出首先是从recvqueue这个阻塞队列中取拉取其他节点发个过来的投票信息,recvqueue队列中存储的结构体为Notification:</p><table><thead><tr><th>属性</th><th>描述</th></tr></thead><tbody><tr><td>version</td><td>单元 2</td></tr><tr><td>leader</td><td>选举的leaderId</td></tr><tr><td>zxid</td><td>选举的leader的zxid</td></tr><tr><td>electionEpoch</td><td>当前选举轮次</td></tr><tr><td>state</td><td>发送者的节点状态</td></tr><tr><td>sid</td><td>发送者id</td></tr><tr><td>peerEpoch</td><td>选举的leader节点epoch</td></tr></tbody></table><p>此时需要对Notification内容有效性做一些校验,并做一些处理:<br>1)Notification为空,出现这种情况需要判断其他节点的发送队列中的消息是否某一个已经全部发送出去,如果存在这种情况,则需要将自身的选票再次发送给所有其他节点。<br>2)判断发送者的sid和选举leader的sid是否在有效视图中,如果是无效的sid,则直接丢弃这张选票。<br>当前选票确定是有效时,需要对发送的节点状态进行一些单独的处理:<br>①发送者节点状态也为Looking时:<br>1)当electionEpoch>logicallock(当前选票的选举轮次大于当前节点的选举轮次):<br>则当前节点清空recvset(投票箱)中的所有选票,并判断当前选票是否优于自身,如果优于自身,则更新自身选票为当前选票的投票内容,否则就投票给自身.<br>如何判断选票优于当前选票:</p><pre><code> protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}</code></pre><ul><li>首先时判断新选票的选举的leader的epoch是否大于当前节点选票的epoch (epoch越大说明经历的选举次数越大,状态更新)</li><li>如果epoch相同,则判断新选票的选举的leader的zxid是否大于当前节点选票的zxid(zxid越大,说明节点的事务数据存储越多)</li><li>如果zxid相同,则判断当前新选票选举的leader的sid是否大于当前节点选票的sid(sid判断策略则是作为兜底策略)<br>2)当electionEpoch<logicallock(当前选票的选举轮次小于当前节点的选举轮次)<br>出现这种情况,说明当前选票已经不属于此次选举轮次中的选票,则直接忽略掉此选票<br>3)当electionEpoch=logicallock(当前选票的选举轮次等于当前节点的选举轮次)<br>此时说明,新选票和自身选票处于一个投票轮次中,则直接判断新选票是否优于当前选票,如果优于当前选票,则更新当前选票内容为新选票内容,并将自身新选举内容发送给其他节点.<br>最后将新选票放入到recvset投票箱中,并判断投票箱中的投票是否有超过一半已经和自身的选票内容一致,如果未超过一半则再次重新进行上面选举流程,如果已经达到一半,则进行最后的判断,把recvqueue中的投票信息全部取出来进行判断,判断是否还存在优与当前自身选票的投票消息,如果有的话,则将当前选票重新放入recvqueue中,重新进行选举流程,没有的话则直接结束选举.</li></ul><pre><code>//判断投票箱中,是否已经超过一半节点的投票内容和当前节点投票内容一致
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
//最后判断其他节点发送过来的投票信息是否还存在优于当前节点投票信息的消息
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//说明队列消息为空或队列中的投票消息已经不会对当前节点的投票内容产生改变
if (n == null) {
//变更当前节点状态,如果选举的leader时自身,则状态变更未Leading,否则未Folloing或Observing
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//保存最终投票内容
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}</code></pre><p>②发送者节点状态也为Observing时:<br>Observer角色不参与选举过程,则来自Observer的选票直接忽略掉.<br>③发送者节点状态也为Following和Leading时:</p><p>投票消息流转过程:<br>在FastLeaderElection的构造方法中,会构造出来一个Messenger实例,该实例中会新开启2个线程,并一直轮询从FastLeaderElection#sendQuene中和QuorumCnxManager#recvQueue中拉取接受到的消息和需要发送的消息.</p><p>发送端:<br>FastLeaderElection发送选票(ToSend)<br>->存储在FastLeaderElection的sendQueue队列中<br>->FastLeaderElection.Messenger.WorkerSender#run轮询从sendQueue拉取消息并转换成ByteBuffer<br>->QuorumCnxManager#toSend方法<br>1)如果要发送的sid为当前节点,则直接放入QuorumCnxManager#recvQueue接受队列中<br>2)放入QuorumCnxManager#queueSendMap()队列中,ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>><br>->QuorumCnxManager.SendWorker#run方法从queueSendMap中拉取要发送的消息,通过Socket发送给其他节点,上一节,讲诉了和其他每个节点都维护了一个SendWorker线程<br>接受端:<br>->QuorumCnxManager.RecvWorker#run方法通过Socket阻塞读取其他节点发送的消息,转换成Message消息体,并放入QuorumCnxManager#recvQueue阻塞队列中,上一节,讲诉了和其他每个节点都维护了一个RecvWorker线程<br>->FastLeaderElection.Messenger.WorkerReceiver轮询通过QuorumCnxManager#pollRecvQueue方法从QuorumCnxManager#recvQueue队列中拉取消息并转换成Notification,并Notification放入FastLeaderElection#recvqueue队列中.<br>->FastLeaderElection#lookForLeader方法从FastLeaderElection#recvqueue拉取选票信息并处理.</p>
无微不至之Zookeeper源码深度讲解(2)-核心流程梳理
https://segmentfault.com/a/1190000041433623
2022-02-20T22:01:24+08:00
2022-02-20T22:01:24+08:00
小青蛙的倔强
https://segmentfault.com/u/leonard_5ed89eaaab5fc
0
<h3>一.源码仓库:</h3><p><a href="https://link.segmentfault.com/?enc=t44HmIA%2BBq0yRsCSm0jBAA%3D%3D.%2BjFPtHUMvCLOu6%2FDf%2BKSaCeD8q7sU7oYI61XYNzroxWabkSjjIE3CRK9WQausX8O" rel="nofollow">zookeeper</a><br>基于分支3.4.14分支在windows系统启动流程进行分析。</p><h3>二.流程分析:</h3><ol><li>源码入口<br>通过zkServer.cmd可执行文件内容可以看出zookeeper的服务端是通过org.apache.zookeeper.server.quorum.QuorumPeerMain这个类的main作为入口来启动服务端程序的.main方法传入的是我们zoo.cfg文件的地址,然后通过解析zoo.cfg文件,将key,value的配置信息转换成QuorumPeerConfig的对象,转换细节可以看QuorumPeerConfig.parse方法,其中转换后的核心配置参数有:</li></ol><table><thead><tr><th>参数名</th><th>参数描述</th></tr></thead><tbody><tr><td>dataLogDir</td><td>事务日志存储路径</td></tr><tr><td>dataDir</td><td>快照存储路径</td></tr><tr><td>electionType</td><td>选举算法,目前只支持3-快速选举算法</td></tr><tr><td>myid</td><td>当前服务id</td></tr><tr><td>tickTime</td><td>时间单位</td></tr><tr><td>initLimit</td><td> </td></tr><tr><td>syncLimit</td><td>事务存储路径</td></tr><tr><td>minSessionTimeout</td><td>最小会话超时时间</td></tr><tr><td>maxSessionTimeout</td><td>最大会话超时时间</td></tr><tr><td>peerType</td><td>角色类型-OBSERVER,PARTICIPANT</td></tr><tr><td>clientPort</td><td>客户端连接端口</td></tr><tr><td>clientPortAddress</td><td>客户端连接Host</td></tr><tr><td>snapRetainCount</td><td>快照保留个数,最小为3</td></tr><tr><td>purgeInterval</td><td>快照清除间隔</td></tr><tr><td>server.sid</td><td>hostName:port(通信端口):electionPort(选举端口):peerType</td></tr><tr><td>maxClientCnxns</td><td>最大客户端连接数</td></tr></tbody></table><p>拿到解析后的参数后,可以通过是否配置了server.id参数来决定是否集群启动还是单机启动,单机启动运行通过ZooKeeperServerMain#main方法启动,集群启动则还是在QuorumPeerMain#runFromConfig方法进行处理的,这里我们就直接讲解集群模式,因为集群模式比单机模式多了集群间的通信相关的处理,如Leader选举,数据同步,请求转发等.</p><pre><code> public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
quorumPeer = getQuorumPeer();
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}</code></pre><p>可以从代码片段中可以看出,新创建出了一个QuorumPeer对象,其实这就是OOP思想,当前实例代表着集群的一个节点,然后将QuorumPeerConfig重新设置给QuorumPeer对象,在这里出现几个新的类:</p><table><thead><tr><th>类名</th><th>类描述</th></tr></thead><tbody><tr><td>FileTxnSnapLog</td><td>持久化核心类别,包括快照,事务日志操作</td></tr><tr><td>ServerCnxnFactory 3</td><td>服务端网络处理核心类,其实现包含NIO和Netty两种实现</td></tr><tr><td>ZKDatabase</td><td>内存操作核心类,通过树结构存储</td></tr></tbody></table><p>在设置了参数之后,接下来调用了QuorumPeer#initialize方法,在这个方法里主要是一些鉴权类的对象实例化。核心还是QuorumPeer#start方法:</p><pre><code> loadDataBase();//将数据从快照和事务日志加载到内存中
cnxnFactory.start(); //网络服务启动
startLeaderElection(); //选举工作准备
super.start(); </code></pre><p>loadDataBase:<br>在这个方法里主要是通过委托给ZKDatabase#loadDataBase进行加载工作的</p><pre><code> public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}</code></pre><pre><code> public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions); //数据反序列化
return fastForwardFromEdits(dt, sessions, listener);
}</code></pre><pre><code> public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
//找到有效的100个快照文件,降序
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
//真正序列化的地方
deserialize(dt,sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
//校验快照文件的完整性
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
//快照文件命名为snapshot.lastZxid
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}</code></pre><p>在ZkDataBase里有一下几个核心属性:</p><table><thead><tr><th>表列 A</th><th>表列 B</th></tr></thead><tbody><tr><td>DataTree dataTree</td><td>存储树结构</td></tr><tr><td>FileTxnSnapLog snapLog</td><td>事务快照持久化类别</td></tr><tr><td>,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts</td><td>会话管理,sessionId</td></tr></tbody></table><p>在loadDataBase方法中,可以看到调用的snapLog#restore方法,进入到restore方法中可以看到调用到的是FileTxnSnapLog#deserialize进行反序化,然后保存到传入的dt,sessions参数中,可以定位到FileTxnSnapLog#deserialize(DataTree dt, Map<Long, Integer> sessions,</p><pre><code> InputArchive ia)的这个重载方法来看下,如何对快照文件进行反序列化的:
</code></pre><pre><code> public void deserialize(DataTree dt, Map<Long, Integer> sessions,
InputArchive ia) throws IOException {
FileHeader header = new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != SNAP_MAGIC) {
throw new IOException("mismatching magic headers "
+ header.getMagic() +
" != " + FileSnap.SNAP_MAGIC);
}
</code></pre><p>首先通过文件输入流的包装类InputArchive进行读取,调用的是FileHeader#deserialize方法:</p><pre><code> public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
magic=a_.readInt("magic");
version=a_.readInt("version");
dbid=a_.readLong("dbid");
a_.endRecord(tag);
}</code></pre><p>FileHeader实现Record接口,其实后面所有需要的序列化和反序列化的都实现了这个接口,通过传进来的输入流对象来自定义自己的序列化和反序列化细节.<br>在这里可以看到FileHeader的存储结构为:</p><table><thead><tr><th>属性值</th><th>占用大小</th><th>描述</th></tr></thead><tbody><tr><td>magic</td><td>4字节</td><td>魔法数字</td></tr><tr><td>version</td><td>4字节</td><td>版本号</td></tr><tr><td>version</td><td>8字节</td><td>数据库id</td></tr></tbody></table><p>经过FileHedare#deserialize方法后,已经从文件流读取了16个字节,接下来调用的是 SerializeUtils#deserializeSnapshot(dt,ia,sessions)进行其他内容的加载,</p><pre><code> public static void deserializeSnapshot(DataTree dt,InputArchive ia,
Map<Long, Integer> sessions) throws IOException {
//会话数量
int count = ia.readInt("count");
while (count > 0) {
//会话id
long id = ia.readLong("id");
//会话超时时间
int to = ia.readInt("timeout");
sessions.put(id, to);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"loadData --- session in archive: " + id
+ " with timeout: " + to);
}
count--;
}
dt.deserialize(ia, "tree");
}</code></pre><p>可以看到首先是从流里面读取了4个字节的count属性,也就是会话数量,接着再遍历读取了8个字节sessionId(会话id)和4个字节的timeout(会话超时时间),再赋值个给了sessions(也就是ZkDataBase的sessionsWithTimeouts属性),最后调用的是DataTree#deserialize进行真正存储内容的反序列化工作:</p><pre><code> public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
while (!path.equals("/")) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
node.parent = nodes.get(parentPath);
if (node.parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}
node.parent.addChild(path.substring(lastSlash + 1));
long eowner = node.stat.getEphemeralOwner();
if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
setupQuota();
aclCache.purgeUnused();
}</code></pre><hr><ol start="2"><li>网络传输(NIO)<br>zookeeper与客户端建立连接与请求与响应的数据传输都是通过ServerCnxnFactory这个类的实现类进行处理的,我们这里直接通过NIO的实现类NIOServerCnxnFactory来进行讲解,再QuorumPeer的start方法里我们看到调用NIOServerCnxnFactory#start方法.</li></ol><pre><code> public void start() {
// ensure thread is started once and only once
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}</code></pre><p>再start方法里我们看到就简单调用了Thread#start方法启动线程.至于thread方法是在哪里进行初始化的,我可以定位到NIOServerCnxnFactory#configure方法里:</p><pre><code> public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
//初始化线程对象
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
//设置最大连接数参数
maxClientCnxns = maxcc;
//初始化Socket相关配置
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}</code></pre><ol start="3"><li><p>选举<br>在进启动了网络传输服务之后,就开始准备着选举前的一些准备工作,我们可以从QuorumPeer#start方法中的QuorumPeer#startLeaderElection()调用进行一个选举的切入点:</p><pre><code> synchronized public void startLeaderElection() {
try {
//设置初始化投票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
//启动响应线程
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
//根据配置的选举算法进行一些初始化工作
this.electionAlg = createElectionAlgorithm(electionType);
}</code></pre><p>从startLeaderElection这个方法中可以看出,主要是将初始化投票设置为自身,sid为自身serverId,zxid为通过快照和事务日志加载后的最大lastZxid,还有peerEpoch(选举年代)也就是当前自身的选举年代,然后就是启动了ReponseThread这个响应线程,核心逻辑还是在createElectionAlgorithm这个方法中,我们可以跟进去看一下具体的代码逻辑:</p></li></ol><pre><code> protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
//已过时
le = new AuthFastLeaderElection(this);
break;
case 2:
//已过时
le = new AuthFastLeaderElection(this, true);
break;
case 3:
//创建连接管理器
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
//启动监听其他节点的连接请求
listener.start();
//实例化快速选举算法核心类
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}</code></pre><p>从上述代码中,可以看出主要工作是实例化了一个QuorumCnxManager这个对象,也就是通过这个对象中的Listener这个类来处理和其他节点的连接请求,调用了Listener#start方法实际是运行到了Listener#run方法代码中:</p><pre><code> public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
//实例化ServerSocket
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
//阻塞等待其他节点请求连接
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
//接受请求核心逻辑
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}</code></pre><p>该方法主要是使用jdk的阻塞io与其他节点建立连接,不了解的可以去自行补充一下jdk的socket编程基础知识,在第二个while循环中的ss.accept()代码是会一直阻塞等待其他节点请求连接,当其他节点建立连接后,就会返回一个Socket实例,然后将Socket实例传入receiveConnection方法中,然后我们就可以和其他节点进行通信了,具体receiveConnection代码逻辑如下:</p><pre><code> public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
//将输入流进行多次包装
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
//真正处理连接
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
closeSocket(sock);
}
}</code></pre><p>将io输入流包装后,进一步调用了handleConnection进行连接的处理:</p><pre><code> private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
Long sid = null;
try {
// 阻塞等待另外一个节点发送建立请求的第一个包
//先读取8个字节,又可能sid(服务id),也有可能是protocolVersion(协议版本)
sid = din.readLong();
//读取到的是协议版本
if (sid < 0) {
//进一步读取8个字节,就是真正的sid
sid = din.readLong();
//读取4个字节,也就是读取到的是剩余的其他内容的字节数
int num_remaining_bytes = din.readInt();
//进行字数校验
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
//一次性将所有剩下的字节内容读取到b这个字节数组中
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
LOG.debug("Authenticating learner server.id: {}", sid);
authServer.authenticate(sock, din);
//如果读取的sid小于当前节点的sid,则关闭之前建立过的连接
if (sid < this.mySid) {
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
//关闭之前的连接后,由当前节点发起连接请求
connectOne(sid);
} else {
//发送线程
SendWorker sw = new SendWorker(sock, sid);
//接受线程
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
//启动发送线程
sw.start();
//启动接受线程
rw.start();
return;
}
}</code></pre><p>从这段代码中可以看出<strong>,建立请求只能由sid大的一方发起,由sid小的一方接受</strong>,如现在有sid=1,sid=2,sid=3三个节点,那么只能由2这个节点发起连接请求,1这个这个节点处理连接请求.这样就保证了双方只保持着一条连接,因为Socket是全双工模式,支持双方进行通信.Socket可以通过ss.accept获取到,还可以通过当前方法的connectOne这个方法去和sid较小的节点进行连接:</p><pre><code> synchronized public void connectOne(long sid){
//就是判断sendWorkerMap中是否包含了当前sid
if (!connectedToPeer(sid)){
InetSocketAddress electionAddr;
if (view.containsKey(sid)) {
//拿到之前配置的server.id的选举地址
electionAddr = view.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
LOG.debug("Opening channel to server " + sid);
//实例化Socket对象
Socket sock = new Socket();
setSockOpts(sock);
//进行连接
sock.connect(view.get(sid).electionAddr, cnxTO);
LOG.debug("Connected to server " + sid);
if (quorumSaslAuthEnabled) {
initiateConnectionAsync(sock, sid);
} else {
//同步初始化连接,也就是将当前自身的一些信息发送给其他节点
initiateConnection(sock, sid);
}
} catch (UnresolvedAddressException e) {
LOG.warn("Cannot open channel to " + sid
+ " at election address " + electionAddr, e);
if (view.containsKey(sid)) {
view.get(sid).recreateSocketAddresses();
}
throw e;
} catch (IOException e) {
LOG.warn("Cannot open channel to " + sid
+ " at election address " + electionAddr,
e);
if (view.containsKey(sid)) {
view.get(sid).recreateSocketAddresses();
}
}
} else {
LOG.debug("There is a connection already for server " + sid);
}
}</code></pre><pre><code> public void initiateConnection(final Socket sock, final Long sid) {
try {
startConnection(sock, sid);
} catch (IOException e) {
LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
new Object[] { sid, sock.getRemoteSocketAddress() }, e);
closeSocket(sock);
return;
}
}</code></pre><pre><code> private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
dout = new DataOutputStream(sock.getOutputStream());
//将自身sid发送给其他节点
dout.writeLong(this.mySid);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
authLearner.authenticate(sock, view.get(sid).hostname);
if (sid > this.mySid) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + this.mySid + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
//以下逻辑就和通过ss.accept拿到socket对象之后一样的逻辑
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}</code></pre><p>从以上几个方法中可以看出,在通过ServerSocket.accpet和socket.connect拿到了Socket对象之后,实例化出来一个SendWorker和一个RecvWorker这个对象,并调用了各自的start方法去启动两个线程,其实就是通过这2个线程去完成和其他节点的请求和响应的数据传输工作,一<strong>个节点维护一个SendWorker、一个RecvWorker和通过queueSendMap来存储一个队列来进行通信的。</strong><br>具体后面这3个对象是如何发挥作用的,会在选举细节中具体讲解.完成这一系列的选举准备工作后,我们回到QuorumPeer#start方法中,接下来QuorumPeer#start方法调用super.start()方法,因为QuorumPeer这个对象继承了ZooKeeperThread,而ZooKeeperThread又继承了jdk的Thread类,所以调用了super.start之后,就会单独开辟一个线程去执行QuorumPeer#run方法,也就是真正进行选举的地方:</p><pre><code> public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
//1.jmx拓展点
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
2.//选举逻辑
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
//1.Looking状态
case LOOKING:
LOG.info("LOOKING");
//开启只读模式
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setBCVote(null);
//调用ElectionAlg#lookForLeader方法,然后返回选举后的投票信息
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
//选举结束,observer角色进如到此处
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
//选举结束,Follower角色进入到此
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
//选举结束,Leader角色进入到此
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}</code></pre><p>我们可以从上诉代码中的MainLoop处开始看,进入while循环后,因为当前节点还是looking状态,苏所以进入到looking分支,在这个分支中可以看到首先判断当前节点是否是只读模式,因为当前不讲解只读模式,所以直接进入到另外一个分支:</p><pre><code> setBCVote(null);
//调用ElectionAlg#lookForLeader方法,然后返回选举后的投票信息
setCurrentVote(makeLEStrategy().lookForLeader());</code></pre><p>makeLEStrategy方法返回的其实就是我们在QuorumPeer#startLeaderElection方法中实例话出来的FastLeaderElection实例,然后调用FastLeaderElection#lookForLeader方法进行Leader选举:<br><a href="https://segmentfault.com/a/1190000041465718">选举讲解</a></p><p>未完待续.......</p>
无微不至之Zookeeper源码深度讲解(1)-大纲介绍
https://segmentfault.com/a/1190000041430490
2022-02-19T21:32:54+08:00
2022-02-19T21:32:54+08:00
小青蛙的倔强
https://segmentfault.com/u/leonard_5ed89eaaab5fc
0
<p>本系列大纲大致为:</p><p>1.服务端:<br>选举过程(zab协议)<br>发现阶段<br>同步阶段<br>广播阶段<br>请求处理链路<br>2.客户端:<br>请求发送过程<br>客户端监听机制</p>