1. Model
Netty is an implementation of the Reactor model, so let's take a look at the Reactor model first.
Single Reactor
I'll use a real-life example to explain the model; I believe it will make it easy to understand.
Suppose you open a new hotpot restaurant. Money is tight in the early days, so you cannot afford to hire staff; it is just you and your wife (a mom-and-pop shop). To make things easier for your wife, you don't ask her to do any of the actual work. Her only responsibility is to listen for customers' requests and pass them on to you (in effect, your wife is the selector). Simplified, what you need to do is:
1. Receive customers
2. Serve customers' requests
3. Other tasks such as cleaning
The normal workflow is as follows:
Your wife sits at the door. When she sees someone come in, she immediately calls you over to receive the customer, and you seat the customer at a table. At this point you can consider a connection established. You then tell the customer to pass any further requests to your wife, because she will relay them to you and you will deal with them.
Then the customer finishes choosing dishes and tells your wife that the order is ready, so the soup base and the dishes can be prepared. Your wife relays the request to you: table 1 has finished ordering, go and get it ready. You bring the soup base and the dishes to the customer. Without noticing it, you have just completed one request between client and server.
Next …
A new customer arrives… your wife asks you to receive them
Table 1 needs more water… your wife tells you to handle it
…
There are also some scheduled tasks, such as cleaning up after closing at night.
The scene above can be understood as the single Reactor model: you do everything on your own. If you are busy adding water for table 1, you cannot attend to anything else in time. This way of working is only good enough for a small shop; once business gets hot, you cannot keep up. To solve this problem we need the multiple Reactor model below.
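The single Reactor scene can be sketched with plain JDK NIO. This is only a minimal illustration, not Netty code: in-process pipes stand in for the tables, the Selector plays the wife, and the one thread running the loop plays you. The class and method names (SingleReactorSketch, serve) are made up for this example, and requests are assumed to be short ASCII strings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-threaded reactor: one selector, one loop, one thread does all the work. */
final class SingleReactorSketch {

    /** Opens one pipe ("table") per request and handles every request on the calling thread. */
    static List<String> serve(String... requests) {
        List<String> handled = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (String request : requests) {
                Pipe table = Pipe.open();
                table.source().configureBlocking(false);
                // The attachment collects what this table has said so far.
                table.source().register(selector, SelectionKey.OP_READ, new StringBuilder());
                try (Pipe.SinkChannel customer = table.sink()) {
                    ByteBuffer bytes = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
                    while (bytes.hasRemaining()) {
                        customer.write(bytes);
                    }
                } // closing the sink signals "nothing more to say" (end of stream)
            }
            ByteBuffer buffer = ByteBuffer.allocate(256);
            while (handled.size() < requests.length) {
                selector.select(); // the "wife" reports which tables need attention
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    Pipe.SourceChannel source = (Pipe.SourceChannel) key.channel();
                    StringBuilder text = (StringBuilder) key.attachment();
                    buffer.clear();
                    int n = source.read(buffer);
                    if (n > 0) {
                        buffer.flip();
                        text.append(StandardCharsets.US_ASCII.decode(buffer));
                    } else if (n < 0) {
                        handled.add(text.toString()); // request complete: "serve" it
                        source.close();               // also cancels the key
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return handled;
    }
}
```

Because a single thread does the accepting, reading, and handling, any slow request delays every other table, which is exactly the limitation described above. The order in which ready tables are served is not guaranteed.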
Multiple Reactors
After two years of hard work you have made some money. You want to open a bigger shop and earn more. You can no longer rely on yourself alone, so you hire five waiters, A, B, C, D, and E, and arrange them as follows:
A and B are responsible for receiving customers; C, D, and E are responsible for serving customers' requests. Now you are the boss and don't have to do anything yourself.
The normal workflow looks like this:
Your wife's job has not changed; she still plays the role of the selector.
When a new customer arrives, your wife notifies one of A and B to receive the customer (for example: "A, a guest has arrived, please receive them"), and that waiter brings the customer to a table. At this point you can consider a connection established (the client has connected to the server). Having done that, the waiter returns to the original position and waits for other new customers.
Then table 1 finishes ordering and tells your wife that the order is complete. Your wife assigns one of C, D, and E to prepare the soup base and the dishes (note that in Netty this same person will handle all later requests of this table, which avoids context switching).
A new customer arrives… your wife notifies A or B to handle it
Table 1 needs more water… your wife notifies the waiter assigned to that table to handle it
…
The process above helps us understand the multiple Reactor model:
A, B: the acceptor threads that receive new connections
C, D, E: the threads that handle I/O and business processing
Since then, this hot pot restaurant has been getting bigger and bigger…
2. The thread model in brief
The example above shows that once a waiter is assigned to a table, all later requests from that table are handled by that waiter. In other words, when a connection between client and server is established in Netty, an EventLoop (for now, think of it as bound to one thread) is assigned to it, and all operations on that channel (think of it as the connection) are executed by that EventLoop's thread. This avoids the performance overhead of thread context switching and keeps messages processed in order. We will see this in detail in the source code.
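The "one table, one waiter" rule can be sketched with plain JDK executors. This is an illustration under assumptions rather than Netty's implementation: LoopGroup and serveOneTable are hypothetical names, and each single-threaded ExecutorService stands in for an EventLoop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PinnedLoopSketch {

    /** Hypothetical stand-in for an event loop group: N single-threaded executors. */
    static final class LoopGroup {
        private final ExecutorService[] loops;
        private final AtomicInteger counter = new AtomicInteger();

        LoopGroup(int size) {
            loops = new ExecutorService[size];
            for (int i = 0; i < size; i++) {
                loops[i] = Executors.newSingleThreadExecutor();
            }
        }

        /** Round-robin choice, made once per connection. */
        ExecutorService next() {
            return loops[Math.floorMod(counter.getAndIncrement(), loops.length)];
        }

        void shutdownAndWait() {
            for (ExecutorService loop : loops) {
                loop.shutdown();
            }
            try {
                for (ExecutorService loop : loops) {
                    loop.awaitTermination(5, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sends requestCount requests for one table and records "request@thread" for each. */
    static List<String> serveOneTable(int requestCount) {
        LoopGroup waiters = new LoopGroup(3);
        ExecutorService assigned = waiters.next(); // chosen when the "connection" is established
        List<String> log = new CopyOnWriteArrayList<>();
        for (int i = 0; i < requestCount; i++) {
            final int request = i;
            // Every later operation of this table goes to the same loop.
            assigned.execute(() -> log.add(request + "@" + Thread.currentThread().getName()));
        }
        waiters.shutdownAndWait();
        return log;
    }
}
```

Since all requests of the table run on one thread, they are processed in submission order and need no locking among themselves.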
3. The relationship between the channel, the pipeline, the context, and the handler
The relationship between the channel, the pipeline, the context, and the handler is as follows:
We can see it from the source code below:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
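The constructor above wires a head and a tail context together. A minimal sketch of how handlers could then be linked between the two follows; PipelineSketch and its methods are hypothetical names for illustration, and only handler names are stored instead of real handlers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified pipeline: a doubly linked list of contexts between two sentinels. */
final class PipelineSketch {

    /** A context wraps one handler (only its name here) and links to its neighbours. */
    static final class Context {
        final String handlerName;
        Context prev;
        Context next;

        Context(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private final Context head = new Context("head");
    private final Context tail = new Context("tail");

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a new context just before the tail sentinel. */
    PipelineSketch addLast(String handlerName) {
        Context ctx = new Context(handlerName);
        Context last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
        return this;
    }

    /** Inbound events (for example reads) travel from head to tail. */
    List<String> inboundOrder() {
        List<String> names = new ArrayList<>();
        for (Context c = head; c != null; c = c.next) {
            names.add(c.handlerName);
        }
        return names;
    }

    /** Outbound events (for example writes) travel from tail to head. */
    List<String> outboundOrder() {
        List<String> names = new ArrayList<>();
        for (Context c = tail; c != null; c = c.prev) {
            names.add(c.handlerName);
        }
        return names;
    }
}
```

With the handlers "frameDecoder", "messageDecoder", and "businessHandler" added in that order, an inbound event would visit head, the three handlers in order, and finally tail.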
4. A brief source code analysis
Netty's server and client code are largely the same, so only the main server code is analyzed here. Netty version: 4.1.45.Final
Starting from the example code, the entry point is ServerBootstrap.
Here are the Netty server and client code snippets first; the two are very similar.
Server:
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
final RegisterCentorHandler registerCentorHandler = new RegisterCentorHandler();
serverBootstrap.group(boss, worker)
        // TCP_NODELAY applies to accepted connections, so it is set as a child option
        .childOption(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_REUSEADDR, true)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                ch.pipeline().addLast("MessageDecoder", new KryoDecoder());
                ch.pipeline().addLast("MessageEncoder", new KryoEncoder());
                ch.pipeline().addLast(registerCentorHandler);
            }
        });
ChannelFuture channelFuture = serverBootstrap.bind(Configuration.REGISTER_CENTOR_PORT);
channelFuture.syncUninterruptibly();
Client:
EventLoopGroup boss = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
final DiscoveryHandler discoveryHandler = new DiscoveryHandler();
bootstrap.group(boss)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                ch.pipeline().addLast("MessageDecoder", new KryoDecoder());
                ch.pipeline().addLast("MessageEncoder", new KryoEncoder());
                ch.pipeline().addLast(discoveryHandler);
            }
        });
ChannelFuture channelFuture = bootstrap.connect(Configuration.HOST, Configuration.REGISTER_CENTOR_PORT);
// wait at most 3 seconds, matching CONNECT_TIMEOUT_MILLIS (the unit was MINUTES, presumably by mistake)
boolean ret = channelFuture.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
Preliminary analysis
1. ServerBootstrap#group()
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}
There is not much to say about this code; it only sets fields, and the same goes for ServerBootstrap.handler().
Let's start with the server's bind() method, which is the key point:
public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}
Binding an IP address and port is routine in network programming. The SocketAddress was set earlier.
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
Focus on two methods:
initAndRegister(): channel initialization and registration
doBind0(): binding
2. initAndRegister()
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
(1) Create the channel
The channel type is set in the server code via
ServerBootstrap.channel(NioServerSocketChannel.class). Let's see how the channel is created:
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    this.channelFactory = channelFactory;
    return self();
}
Back in initAndRegister(), the first step calls channelFactory.newChannel():
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
As you can see, the channel is created by reflection. Remember that the channel created here is a NioServerSocketChannel.
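The reflective creation can be illustrated in isolation with the JDK alone. The sketch below is an assumption-laden miniature, not Netty's ReflectiveChannelFactory: ReflectiveFactorySketch is a hypothetical name, and it simply looks up the public no-argument constructor once and invokes it for each new instance.

```java
import java.lang.reflect.Constructor;

/** Hypothetical miniature of a reflective factory: resolve the constructor once, reuse it. */
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> type) {
        try {
            // Requires a public no-argument constructor on the target class.
            this.constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + type.getName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}
```

The design point is that the class is chosen by configuration (here a Class object) while the creation code stays generic, which is why passing a different channel class to channel() is enough to switch transports.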
Next we look at the init() call, the second step of initAndRegister().
(2) init(channel)
This method follows the template method design pattern. It is declared in the parent class AbstractBootstrap, whose two subclasses, ServerBootstrap and Bootstrap, handle the server and the client respectively. Here we look at ServerBootstrap#init(Channel):
void init(Channel channel) {
    setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
The method above is important but not difficult. Let's go through it step by step.
(1) Channel and pipeline
We have already looked at the channel, the pipeline, the context, and the handler in detail:
A channel has one pipeline
The pipeline links multiple handlers in a doubly linked list, but each handler is wrapped in a context, so what the pipeline actually links is contexts
When an event occurs, it is passed along the pipeline from one handler to the next
Note that the channel here is a NioServerSocketChannel, which is used specifically to accept connections. After a connection is accepted, another channel (a NioSocketChannel) will be created.
The handler of the NioServerSocketChannel is set by ServerBootstrap.handler()
The NioSocketChannel is used for the I/O processing of the connection; its handler is set by ServerBootstrap.childHandler()
Only the server needs to make this distinction, because it has two different kinds of channel; the client does not, since it only has a NioSocketChannel
(2) Add handlers
p.addLast(...): let's look at the logic inside it
1) Add the handler that we set via ServerBootstrap.handler() to the pipeline
This step is optional; it only happens if we actually set one
2) Add a very important handler: ServerBootstrapAcceptor
We focus on the ServerBootstrapAcceptor handler. When a client connects, this handler takes care of it. Let's see how it does that.
As for why it is added by submitting a task instead of being added directly, that will be analyzed later.
For the server, a connect event arrives as a read event on this handler, so let's take a look at its channelRead() method.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
When this method is called, a client has triggered a connection event on the server. ServerBootstrapAcceptor handles the logic of an incoming client connection: it obtains the child channel (a NioSocketChannel), sets its handler, options, and attributes, and finally registers it. Registration works the same way as for the parent channel (NioServerSocketChannel), so we will cover it when we get to registration.
(3) Registration
Above we saw the initialization of the NioServerSocketChannel. Now let's look at registration: config().group().register(channel);
group() returns a NioEventLoopGroup:
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
The next() method returns an EventLoop (a NioEventLoop). Recall the thread model described above: the channel will be handled by this EventLoop for its whole life. Let's look at the register() method.
To make this easier to understand, first note what a NioEventLoop is: it has the characteristics of a single-threaded thread pool. Keep that in mind for now.
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
This in turn calls the register() method of AbstractUnsafe, an inner class of AbstractChannel.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
The code above binds the Channel to the EventLoop. Each EventLoop owns one thread, and the method checks whether the current thread is that thread. If it is, registration runs directly; if not, the registration is wrapped in a task and put into the EventLoop's task queue, to be executed later by the EventLoop's thread. Why do it this way? This is the Netty thread model analyzed above: a channel is always served by the same thread, the one owned by its EventLoop, which avoids the performance loss of context switching, reduces coding complexity, and preserves the order in which events are processed.
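The "run directly if already on the loop thread, otherwise enqueue" decision can be sketched with a JDK single-threaded executor. This is an illustrative model only: InLoopDispatchSketch and its members are hypothetical names, and the loop thread is recorded when the first task runs rather than by Netty's real mechanism.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the inEventLoop() check used when registering a channel. */
final class InLoopDispatchSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile Thread loopThread;
    final List<String> trace = new CopyOnWriteArrayList<>();

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Registers directly when called on the loop thread, otherwise hands a task to the loop. */
    void register(String channel, Runnable afterRegistered) {
        if (inEventLoop()) {
            register0("direct", channel, afterRegistered);
        } else {
            executor.execute(() -> {
                loopThread = Thread.currentThread();
                register0("queued", channel, afterRegistered);
            });
        }
    }

    private void register0(String path, String channel, Runnable afterRegistered) {
        trace.add(path + ":" + channel + "@" + (inEventLoop() ? "loop" : "other"));
        afterRegistered.run();
    }

    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Registers "parent" from the caller's thread, then "child" from inside the loop. */
    static List<String> demo() {
        InLoopDispatchSketch loop = new InLoopDispatchSketch();
        loop.register("parent", () -> loop.register("child", () -> { }));
        loop.shutdownAndWait();
        return loop.trace;
    }
}
```

In demo(), the first registration comes from an outside thread and is queued, while the nested one is issued from the loop thread and runs directly; either way the actual registration work happens on the loop thread.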
Next we read the registration logic:
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
Let's look at doRegister() first:
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
This registers the underlying Java channel with the Selector, but without declaring any interest yet (0 means not interested in any event). Let's see where the interest is set: it happens in beginRead(), which appears in register0() above.
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        // exception handling omitted in this excerpt
    }
}

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    // add the read interest only if it is not already set
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
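doBeginRead() is abstract in AbstractChannel; the implementation shown here adds readInterestOp to the key's interest set. That value is passed in through the constructor, and for NioServerSocketChannel it is OP_ACCEPT: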
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
I think everything is clear at this point…
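The two-step pattern above (register with interest 0, add OP_ACCEPT later) can be reproduced with the JDK alone. The following is a minimal sketch, not Netty code; InterestOpsSketch and registerThenBeginRead are hypothetical names, and the server channel is deliberately left unbound because only the selection key is of interest here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical demo: register without interest first, then switch on OP_ACCEPT. */
final class InterestOpsSketch {

    /** Returns the interest ops right after registration and after the "begin read" step. */
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1 (like doRegister): attach to the selector, interested in nothing yet.
            SelectionKey key = server.register(selector, 0);
            int afterRegister = key.interestOps();

            // Step 2 (like doBeginRead): add the read interest if it is missing.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Separating the two steps means the channel can be fully set up (pipeline initialized, listeners notified) before the selector starts reporting events for it.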
pipeline.invokeHandlerAddedIfNeeded() will eventually call ChannelInitializer#handlerAdded():
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}
This echoes what we said earlier: this is where the ChannelInitializer's initChannel() method gets called.
The subsequent channelActive() and read events are likewise passed through the handlers in the pipeline one by one.
We still owe one explanation. In the code above we often see eventLoop.execute(new Runnable() {...}). Let's see how it is implemented.
We start with NioEventLoop#execute(), which is inherited from SingleThreadEventExecutor:
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
            }
            if (reject) {
                reject();
            }
        }
    }
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } finally {
                // error handling, shutdown, and cleanup logic omitted in this excerpt
            }
        }
    });
}
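The idea behind execute() and startThread(), namely queue the task first and start the loop thread lazily and exactly once via compare-and-set, can be modeled in a few lines of JDK code. This is a simplified sketch under assumptions: LazyStartLoopSketch, the STOP marker task, and demo() are hypothetical and are not part of Netty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical mini event loop whose thread is created on the first submitted task. */
final class LazyStartLoopSketch {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;
    /** Marker task (an assumption of this sketch) that tells the loop to exit. */
    private static final Runnable STOP = () -> { };

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    void execute(Runnable task) {
        tasks.add(task); // always enqueue first
        if (Thread.currentThread() != thread) {
            startThread(); // a no-op once the loop is running
        }
    }

    private void startThread() {
        // The CAS guarantees that only one caller ever creates the thread.
        if (state.get() == NOT_STARTED && state.compareAndSet(NOT_STARTED, STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(this::run, "mini-loop");
            thread = t;
            t.start();
        }
    }

    private void run() {
        try {
            for (;;) {
                Runnable task = tasks.take();
                if (task == STOP) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {threads before first task, threads after all tasks, tasks that ran}. */
    static int[] demo() {
        LazyStartLoopSketch loop = new LazyStartLoopSketch();
        AtomicInteger ran = new AtomicInteger();
        int before = loop.threadsStarted.get();
        for (int i = 0; i < 3; i++) {
            loop.execute(ran::incrementAndGet);
        }
        loop.execute(STOP);
        try {
            loop.thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {before, loop.threadsStarted.get(), ran.get()};
    }
}
```

No thread exists until the first task arrives, and later submissions reuse the same thread, which matches the "single-threaded thread pool" impression mentioned earlier.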
Finally, SingleThreadEventExecutor#run() is executed; NioEventLoop provides the implementation. This method is very important. If you are familiar with NIO programming, this code is easy to follow. This is where the event loop actually starts working.
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                // selector recovery omitted in this excerpt; the loop then starts over
                continue;
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0);
            }
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
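One detail in run() worth working through is the time budget given to queued tasks, ioTime * (100 - ioRatio) / ioRatio. The helper below only restates that expression so the arithmetic can be checked; IoRatioSketch and taskBudgetNanos are hypothetical names, and the range check is an assumption of this sketch (the value 100 is handled by a separate branch in the code above).

```java
/** Hypothetical helper that restates the task time budget formula from run(). */
final class IoRatioSketch {

    /**
     * @param ioTimeNanos time just spent processing selected keys
     * @param ioRatio     percentage of loop time intended for I/O, 1 to 99 in this sketch
     * @return time allowed for running queued tasks, in nanoseconds
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be between 1 and 99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

For example, if I/O took 8 ms, an ioRatio of 50 gives tasks another 8 ms, an ioRatio of 80 gives them 2 ms, and an ioRatio of 20 gives them 32 ms. A higher ioRatio therefore favors I/O over queued tasks.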
Finally, look at the processSelectedKey() method.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop == this) {
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }
    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
The method above needs no further explanation; it is almost the same as what we write with plain NIO.
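For readers less familiar with plain NIO, the bitmask dispatch in processSelectedKey() can be isolated as a pure function. This is a sketch for illustration: ReadyOpsSketch and actionsFor are hypothetical names, and the returned strings merely label which call in the method above would be made.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder that mirrors the order of checks in processSelectedKey(). */
final class ReadyOpsSketch {

    /** Returns the actions taken for the given ready-ops bitmask, in order. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // client side: the connection attempt completed
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");    // the socket can take more data: flush pending writes
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");          // data to read, or a new connection to accept
        }
        return actions;
    }
}
```

Note that OP_READ and OP_ACCEPT share one branch: for a server channel, "reading" means accepting a new connection, which is why the acceptor handler receives the child channel in channelRead().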
5. Development precautions
Netty has wrapped everything up for us, so it is very simple to use, and the client and server bootstrap code is boilerplate. What should we pay attention to when writing a handler?
1. Don't forget to release memory
When we write a handler, we need to release the ByteBuf, otherwise memory leaks. However, Netty's tail handler releases it for us automatically if the message reaches it.
So we have two approaches:
(1) Always pass the event on to the tail handler
We can use ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter. After our business logic, we call the super method (for example super.channelRead(ctx, msg)) to pass the event on toward the tail handler.
You can also use SimpleChannelInboundHandler: the finally block of its channelRead() calls ReferenceCountUtil.release() to release the memory. We only need to implement the channelRead0() method.
(2) Release it manually
Netty provides a utility class; just call it to release:
ReferenceCountUtil.release()
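The "release in a finally block" pattern can be shown without Netty on the classpath. Everything in the sketch below is a hypothetical stand-in: CountedBuffer imitates a reference-counted buffer, and channelRead imitates a handler that runs the business logic and always releases the message afterwards.

```java
import java.util.function.Consumer;

/** Hypothetical illustration of releasing a reference-counted message in finally. */
final class ReleaseSketch {

    /** Stand-in for a reference-counted buffer; starts with a count of 1. */
    static final class CountedBuffer {
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /** Runs the business logic, then releases the message whether or not the logic failed. */
    static void channelRead(CountedBuffer msg, Consumer<CountedBuffer> businessLogic) {
        try {
            businessLogic.accept(msg);
        } finally {
            msg.release();
        }
    }
}
```

The finally block is the important part: if the business logic throws, the buffer is still released, so an error path cannot leak memory.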
2. Stopping event propagation
When a handler does not pass an event on to the next handler, we need to pay attention to the following two things:
(1) If we use ChannelInboundHandlerAdapter or ChannelOutboundHandlerAdapter, do not call the super method
(2) Be sure to release the memory manually