Zookeeper 可以说是业界最流行的分布式协调解决方案,其源码值得我们好好静下心来学习和研究。
这篇文章主要分析 NIOServerCnxnFactory 这个类。NIOServerCnxnFactory 和 NettyServerCnxnFactory 是 Zookeeper 服务端用来处理连接的核心类,前者基于 NIO,后者基于 Netty 框架。废话少说,让我们一起来看下 NIOServerCnxnFactory 这个类是如何实现的:)。
NIOServerCnxnFactory 基于 NIO 实现了一个多线程的 ServerCnxnFactory,线程间的通信都是通过 queue 来完成的。NIOServerCnxnFactory 包含的线程如下:
NIOServerCnxnFactory 的启动入口为 startup 方法,如下所示:
public void startup(ZooKeeperServer zks, boolean startServer)
throws IOException, InterruptedException {
//自身的启动逻辑
start();
//设置 zkServer
setZooKeeperServer(zks);
if (startServer) {
//启动 zkServer
zks.startdata();
zks.startup();
}
}
start() 方法包含自身的启动逻辑,而 zks.startdata() 和 zks.startup() 用来启动 zkServer。NIOServerCnxnFactory 是用来管理连接的,而数据处理逻辑则由 zkServer 完成。start() 方法的逻辑如下所示:
public void start() {
stopped = false;
//worker 线程服务,用来进行 socket 的 I/O
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
//selector 线程,用来监听 socket 事件
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// accept 线程
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
// 连接管理线程
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
可以看到,start 方法主要生成或启动上述的 accept 线程、selector 线程、worker 线程和连接管理线程。
accept 线程的 run() 方法如下:
public void run() {
try {
//判断是否需要退出
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
//监听连接事件,并建立连接
select();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
} finally {
//关闭 selector
closeSelector();
if (!reconfiguring) {
//唤醒 selector 线程并通知 worker 线程关闭
NIOServerCnxnFactory.this.stop();
}
LOG.info("accept thread exitted run method");
}
}
accept 线程主要监听连接事件,并建立连接,并分派给 selector。在退出时,关闭它自身的 selector,然后唤醒用来进行 socket I/O 的 selector 线程,最后通知 worker 线程退出。
accept 线程在 select 方法中监听连接事件,然后进入 doAccept() 方法建立连接,分派给 selector 线程,doAccept() 方法如下所示:
private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null;
try {
//建立连接
sc = acceptSocket.accept();
accepted = true;
//防止来自一个 IP 的连接是否过多
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
throw new IOException("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
}
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
//使用轮询来将连接分派给某个 selector 线程
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException(
"Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
acceptErrorLogger.flush();
} catch (IOException e) {
acceptErrorLogger.rateLimitLog(
"Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
}
return accepted;
}
如代码注释所示,doAccept 方法主要做了两件事:
selector 线程的 run 方法如下所示:
public void run() {
try {
while (!stopped) {
try {
//监听读写事件并处理
select();
//处理 accept 线程新分派的连接
processAcceptedConnections();
//更新连接监听事件
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
//......
} finally {
closeSelector();
// 唤醒 accept 线程及其他线程,并通知 worker 线程退出
NIOServerCnxnFactory.this.stop();
LOG.info("selector thread exitted run method");
}
}
可以看到,selector 线程主要做三件事:
其中在 select() 方法中,selector 线程会把有事件发生的连接封装成 IOWorkRequest 对象,然后调用 workerPool.schedule(workRequest) 来交给 worker 线程来处理。
worker 线程的核心处理逻辑在 IOWorkRequest 的 doWork() 中,如下所示:
public void doWork() throws InterruptedException {
//如果 Channel 已经关闭则清理该 SelectionKey
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
//如果可读或可写,则调用 NIOServerCnxn.doIO 方法,通知 NIOServerCnxn 连接对象进行 IO 读写及处理
if (key.isReadable() || key.isWritable()) {
cnxn.doIO(key);
//如果已经 shutdown 则关闭连接
if (stopped) {
cnxn.close();
return;
}
//如果 Channel 已经关闭则清理该 SelectionKey
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
//更新该会话的过期时间
touchCnxn(cnxn);
}
//已经处理完读写,重新标记该连接已准备好新的 select 事件监听
cnxn.enableSelectable();
//把该连接重新放到 selectThread 的 updateQueue 中,selectThread 会在处理处理完所有 Channel 的读写和新连接后,更新此 Channel 的注册监听事件
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close();
}
}
具体逻辑见代码注释。

