As covered in the previous post, NettyServerCnxnFactory handles client connections and requests. It bootstraps the Netty server as follows:
new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NettyUtils.nioOrEpollServerSocketChannel())
// parent channel options
.option(ChannelOption.SO_REUSEADDR, true)
// child channels options
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, -1)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
...
// Important: the handler that processes client traffic
pipeline.addLast("servercnxnfactory", channelHandler);
}
});
Here, channelHandler is the handler that processes client requests.
CnxnChannelHandler.java
The methods to focus on are channelActive() and channelRead().
@Sharable
class CnxnChannelHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel active {}", ctx.channel());
}
final Channel channel = ctx.channel();
if (limitTotalNumberOfCnxns()) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
channel.close();
return;
}
InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
channel.close();
return;
}
// Create a NettyServerCnxn (roughly one session per connection) and store it in CONNECTION_ATTRIBUTE
NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
...
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...
allChannels.remove(ctx.channel());
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
...
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
....
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("message received called {}", msg);
}
try {
LOG.debug("New message {} from {}", msg, ctx.channel());
// Fetch the NettyServerCnxn that channelActive() stored on the channel
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn == null) {
LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
// Process the network packet
cnxn.processMessage((ByteBuf) msg);
}
} catch (Exception ex) {
LOG.error("Unexpected exception in receive", ex);
throw ex;
}
} finally {
ReferenceCountUtil.release(msg);
}
}
// Write the response back to the client
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
promise.addListener(onWriteCompletedTracer);
}
super.write(ctx, msg, promise);
}
}
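In the channelHandler, channelActive() creates a NettyServerCnxn, which acts as the server-side session for one connection and is used to receive and send protocol messages. channelRead() then calls cnxn.processMessage((ByteBuf) msg) to handle the incoming data.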
NettyServerCnxn.java
NettyServerCnxn#processMessage(ByteBuf buf)
void processMessage(ByteBuf buf) {
...
if (throttled.get()) {
...
} else {
LOG.debug("not throttled");
if (queuedBuffer != null) {
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
// Receive the message
receiveMessage(buf);
// Have to check !closingChannel, because an error in
// receiveMessage() could have led to close() being called.
...
}
}
}
NettyServerCnxn#receiveMessage(ByteBuf message)
private void receiveMessage(ByteBuf message) {
checkIsInEventLoop("receiveMessage");
try {
while (message.isReadable() && !throttled.get()) {
if (bb != null) {
...
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
message.readBytes(bb);
bb.limit(bb.capacity());
...
if (bb.remaining() == 0) {
bb.flip();
packetReceived(4 + bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
// TODO: if zks.processPacket() is changed to take a ByteBuffer[],
// we could implement zero-copy queueing.
// Process the message
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
zks.processConnectRequest(this, bb);
initialized = true;
}
bb = null;
}
} else {
...
}
}
} catch (IOException e) {
...
}
}
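This step copies the bytes out of Netty's ByteBuf into an NIO ByteBuffer. Once the ByteBuffer holds a complete message, it is handed to ZooKeeperServer: the first message on a connection goes to processConnectRequest(this, bb), and every later one goes to processPacket(this, bb).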
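The buffering described above can be sketched with plain java.nio. This is a minimal illustration that assumes a frame layout of a 4-byte big-endian length followed by the payload; FrameAssembler and feed() are hypothetical names, not ZooKeeper classes, and the sketch leaves out throttling and the connect-request special case.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of ZooKeeper): reassembles
// [4-byte length][payload] frames from arbitrarily split network chunks.
final class FrameAssembler {
    private final ByteBuffer lenBuf = ByteBuffer.allocate(4);
    private ByteBuffer body; // null while the length prefix is still incomplete

    /** Feeds one chunk and returns every frame that this chunk completed. */
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (body == null) {
                // Collect the length prefix, which may span several chunks.
                while (chunk.hasRemaining() && lenBuf.hasRemaining()) {
                    lenBuf.put(chunk.get());
                }
                if (lenBuf.hasRemaining()) {
                    break; // prefix still incomplete, wait for more bytes
                }
                lenBuf.flip();
                int len = lenBuf.getInt();
                lenBuf.clear();
                if (len < 0) {
                    throw new IllegalArgumentException("negative frame length: " + len);
                }
                body = ByteBuffer.allocate(len);
            }
            // Copy no more than the payload still needs, so the next
            // frame's bytes stay in the chunk for the next loop iteration.
            int n = Math.min(body.remaining(), chunk.remaining());
            ByteBuffer part = chunk.slice();
            part.limit(n);
            body.put(part);
            chunk.position(chunk.position() + n);
            if (!body.hasRemaining()) {
                frames.add(body.array()); // frame complete
                body = null;              // next bytes start a new length prefix
            }
        }
        return frames;
    }
}
```

Capping the copy at the number of bytes the payload still needs plays the same role as the bb.limit(newLimit) adjustment in receiveMessage(): a single chunk may contain the tail of one message and the head of the next.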
ZooKeeperServer.java
ZooKeeperServer#processPacket()
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
// Deserialize the request header
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
...
return;
} else if (h.getType() == OpCode.sasl) {
processSasl(incomingBuffer, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
// Authentication enforcement is failed
// Already sent response to user about failure and closed the session, lets return
return;
} else {
// This is the usual path for handling a message:
// wrap it in a Request object
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
}
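This step deserializes the request header from the ByteBuffer, wraps the header fields and the remaining bytes in a Request object, and submits that Request to the next stage.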
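The header-then-slice pattern in processPacket() can be shown in isolation. The sketch below assumes the header is two big-endian ints, xid followed by type, which is what the h.getXid() and h.getType() calls suggest; HeaderSketch and Parsed are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration (not ZooKeeper code) of reading a fixed
// header and then slicing off the rest of the buffer as the payload.
final class HeaderSketch {
    /** Holder for the decoded pieces; an assumption of this sketch. */
    static final class Parsed {
        final int xid;
        final int type;
        final ByteBuffer payload;

        Parsed(int xid, int type, ByteBuffer payload) {
            this.xid = xid;
            this.type = type;
            this.payload = payload;
        }
    }

    static Parsed parse(ByteBuffer incoming) {
        // Reading the header advances the buffer's position by 8 bytes.
        int xid = incoming.getInt();
        int type = incoming.getInt();
        // slice() creates a view that starts at the current position, so
        // the payload buffer begins right after the header, at position 0.
        ByteBuffer payload = incoming.slice();
        return new Parsed(xid, type, payload);
    }
}
```

Because slice() shares the underlying bytes, no copy is made here; the Request simply carries a view of the bytes that follow the header.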
ZooKeeperServer#submitRequest()
public void submitRequest(Request si) {
enqueueRequest(si);
}
ZooKeeperServer#enqueueRequest()
public void enqueueRequest(Request si) {
if (requestThrottler == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (requestThrottler == null) {
throw new RuntimeException("Not started");
}
}
}
// Submit the request to the throttler
requestThrottler.submitRequest(si);
}
RequestThrottler#submitRequest()
public void submitRequest(Request request) {
if (stopping) {
LOG.debug("Shutdown in progress. Request cannot be processed");
dropRequest(request);
} else {
request.requestThrottleQueueTime = Time.currentElapsedTime();
submittedRequests.add(request);
}
}
This step puts the request on the submittedRequests queue, where it waits until the run() method of RequestThrottler picks it up.
RequestThrottler#run()
@Override
public void run() {
try {
while (true) {
if (killed) {
break;
}
Request request = submittedRequests.take();
...
// Throttling is disabled when maxRequests = 0
...
// A dropped stale request will be null
if (request != null) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
if (shouldThrottleOp(request, elapsedTime)) {
request.setIsThrottled(true);
ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
}
// Actually execute the request
zks.submitRequestNow(request);
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected interruption", e);
}
int dropped = drainQueue();
LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}
This step take()s a request off the submittedRequests queue and passes it to submitRequestNow(request) in ZooKeeperServer, which executes it immediately.
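The handoff between submitRequest() and run() is a producer/consumer queue. Here is a minimal sketch of that pattern using a LinkedBlockingQueue; QueueHandoffSketch, its STOP sentinel, and the demo() helper are assumptions of this example and do not reproduce how RequestThrottler shuts down or throttles.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch (not ZooKeeper code): I/O threads enqueue requests,
// and a single worker thread drains the queue in arrival order.
final class QueueHandoffSketch extends Thread {
    // Sentinel compared by identity, so a real request "stop" is not confused with it.
    private static final String STOP = new String("stop");
    private final BlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();

    /** Producer side: returns immediately after enqueueing. */
    void submit(String request) {
        submitted.add(request);
    }

    void shutdown() {
        submitted.add(STOP);
    }

    /** Consumer side: blocks in take() until a request is available. */
    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take();
                if (request == STOP) {
                    break;
                }
                processed.add(request); // stands in for zks.submitRequestNow(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the requests through a worker and returns them in processing order. */
    static List<String> demo(String... requests) {
        QueueHandoffSketch worker = new QueueHandoffSketch();
        worker.start();
        for (String r : requests) {
            worker.submit(r);
        }
        worker.shutdown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.processed;
    }
}
```

The queue decouples the network threads from request execution: the thread that read the packet never waits for the request to be processed.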
ZooKeeperServer#submitRequestNow()
public void submitRequestNow(Request si) {
...
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
// Start processing at the first processor in the chain
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
LOG.debug("Dropping request.", e);
// Update request accounting/throttling limits
requestFinished(si);
} catch (RequestProcessorException e) {
LOG.error("Unable to process request", e);
// Update request accounting/throttling limits
requestFinished(si);
}
}
This step hands the request directly to the first processor, which starts the chain mentioned in the previous post:
PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
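The idea of a processor chain can be illustrated with a small chain-of-responsibility sketch. ChainSketch, Processor, and Stage are hypothetical names, and the stages here run synchronously on one thread for simplicity; the real ZooKeeper processors may hand requests to each other asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not ZooKeeper code) of a request passing through
// a fixed chain of processors, each of which forwards to the next one.
final class ChainSketch {
    interface Processor {
        void process(String request);
    }

    static final class Stage implements Processor {
        private final String name;
        private final Processor next; // null marks the end of the chain
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void process(String request) {
            trace.add(name + ":" + request); // this stage's own work
            if (next != null) {
                next.process(request);       // forward down the chain
            }
        }
    }

    /** Builds prep -> sync -> final and records the order of the stages. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // The chain is wired back to front so each stage knows its successor.
        Processor fin = new Stage("final", null, trace);
        Processor sync = new Stage("sync", fin, trace);
        Processor first = new Stage("prep", sync, trace);
        first.process(request);
        return trace;
    }
}
```

The caller only needs a reference to the first stage, which mirrors how submitRequestNow() only calls firstProcessor.processRequest(si).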
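To summarize, this post followed one network packet from a raw byte stream, through deserialization, into a Request object, and finally to PrepRequestProcessor for processing.
The next post continues with the details of how the request is processed.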