Netty event monitoring and processing (below) [with benefits]

Netty event monitoring and processing (below) [with benefits]

The previous article introduced the basic concepts of event monitoring, responsibility chain model, socket interface and IO model, thread model, and the overall structure of Netty. This article will talk about one of the three core modules of Netty: event monitoring and processing.

As mentioned earlier, Netty is a NIO framework. It abstracts the establishment, readability, and writability of IO channels into events, which are transmitted in the form of a chain of responsibility. A custom Handler can be inserted into the processing chain. Interested events are monitored and processed.

Through the introduction, you will learn:

  • Event monitoring and processing model
  • Event monitoring: EventLoop
  • Event handling: ChannelPipeline and ChannelHandler
  • Use Netty to implement Websocket protocol

There are benefits at the end of the article ~

Event monitoring and processing model

When network programming, the general writing process is like this:

  • Create a server-side Socket and monitor a certain port;
  • When there is a client connection, a new client Socket will be created to monitor the readable and writable status of the data. Each connection request will create a client Socket;
  • Reading and writing data will call the interface provided by Socket. The interface list was mentioned in the previous article;

In the traditional model, each client Socket creates a separate thread to listen to socket events. On the one hand, the number of threads that the system can create is limited, which limits the number of concurrency. On the other hand, there are too many threads and frequent thread switching, which leads to serious performance degradation.

With the development of the operating system IO model, multiplexed IO can be used, one thread monitors multiple Sockets. In addition, the server handles the client connection, and the monitoring of the client Socket can be processed in different threads.

Netty uses multiplexed IO for event monitoring. In addition, it uses different threads to handle client connections and data reading and writing.

The whole processing structure is as shown in the figure below, a brief description:

  • Boss EventLoopGroup mainly handles the client's connect event, including multiple EventLoops, one thread for each EventLoop;
  • Worker EventLoopGroup mainly handles the data read and write events of the client Socket, including multiple EventLoops, each with a thread;
  • Whether it is Boos or Worker, the processing of events is organized through Channel Pipleline, which is the realization of the responsibility chain model and contains one or more Handlers;
  • Listening to a port will only be bound to one Eventloop in the Boss EventLoopGroup;
  • An Eventloop in Worker EventLoopGroup can monitor multiple client Sockets;

EventLoop

An EventLoop is actually bound to a specific thread, and during its life cycle, the bound thread will not be changed.

EventLoop is responsible for two tasks:

  • The first is as an IO thread, performing IO operations related to Channel, including calling select to wait for ready IO events, reading and writing data and data processing, etc.;
  • The second task is used as a task queue to execute tasks in taskQueue. For example, the timing task submitted by the user calling eventLoop.schedule is also executed by this thread;

The first task is easier to understand, and the second is mainly explained: from socket data to data processing, to writing response data, Netty is processed in one thread, mainly for thread safety considerations, to reduce competition and thread switching, Through the task queue, the processing logic can be submitted in the user thread and executed in the Eventloop.

The whole EventLoop does is select -> processIO -> runAllTask, processIO processes the logic related to IO events, runAllTask processes tasks in the task queue, if too many tasks are executed, it will affect the processing of IO events, so it will limit the task processing Time, the whole processing process is as follows:

The run code of EventLoop is as follows:

protected void run() {
     for (; ; ) {
         oldWakenUp = wakenUp.getAndSet(false);
         try {
             if (hasTasks()) { //
                 selectNow();
             } else {
                 select(); //
                 if (wakenUp.get()) {
                     selector.wakeup();
                 }
             }
             cancelledKeys = 0;
             final long ioStartTime = System.nanoTime();
             needsToSelectAgain = false;

             //IO 
             if (selectedKeys != null) {
                 processSelectedKeysOptimized(selectedKeys.flip());
             } else {
                 processSelectedKeysPlain(selector.selectedKeys());
             }

             //IO 
             final long ioTime = System.nanoTime() - ioStartTime;
             final int ioRatio = this.ioRatio; //50

             //
             runAllTasks(ioTime * (100 - ioRatio)/ioRatio);

             if (isShuttingDown()) {
                 closeAll();
                 if (confirmShutdown()) {
                     break;
                 }
             }
         } catch (Throwable t) {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
             }
         }
     }
 }
 

ChannelPipeline and ChannelHandler

ChannelPipeline is an interface, which has a default implementation class DefaultChannelPipeline, which has two internal attributes: head and tail, both of which implement the ChannelHandler interface, corresponding to the head and tail of the processing chain.

 protected DefaultChannelPipeline(Channel channel) {
     this.channel = ObjectUtil.checkNotNull(channel, "channel");
     succeededFuture = new SucceededChannelFuture(channel, null);
     voidPromise =  new VoidChannelPromise(channel, true);

     tail = new TailContext(this);
     head = new HeadContext(this);

     head.next = tail;
     tail.prev = head;
}
 

When each Channel is created, a ChannelPipeline object is created to handle various events of the channel, and the ChannelHandler can be dynamically modified at runtime.

Where ChannelHandler carries the business processing logic, the classes we touch the most, we can customize the Handler and add it to the processing chain to implement custom logic.

ChannelHandler can be divided into two categories: ChannelInboundHandler and ChannelOutboundHandle, which correspond to the processing of inbound and outbound messages respectively, and are used for data reading and data writing. They provide interface methods for us to implement to handle various events:

public interface ChannelInboundHandler extends ChannelHandler {
	void channelRegistered(ChannelHandlerContext ctx) throws Exception;
	void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
	void channelActive(ChannelHandlerContext ctx) throws Exception;
	void channelInactive(ChannelHandlerContext ctx) throws Exception;
	void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
	void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
	void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
	void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}
 

When customizing Handler, generally inherit ChannelInboundHandlerAdapter or ChannelOutboundHandlerAdapter.

It should be noted that it is not recommended to directly implement time-consuming or blocking operations in the ChannelHandler, because this may block the Netty worker thread and cause Netty to fail to respond to IO processing in time.

Use Netty to implement Websocket protocol

Websocket protocol

Not the focus of this article, let s briefly explain:

  • It is a long connection protocol, which is supported by most browsers. Through websocket, the server can actively send messages to the client;
  • Websocket protocol, use HTTP protocol in the handshake phase, after the handshake is completed, use Websocket's own protocol;
  • Websocket is a binary protocol;
initialization

Netty provides the ChannelInitializer class to facilitate our initialization, create a WebSocketServerInitializer class, inherit the ChannelInitializer class, and add a ChannelHandler:

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

	@Resource
	private CustomTextFrameHandler customTextFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("codec-http", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        
        pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
        pipeline.addLast("custome-handler", customTextFrameHandler);
    }
}
 

Under analysis, these Handlers are all provided by Netty by default:

  • HttpServerCodec: used to parse Http requests, mainly in the handshake phase;
  • HttpObjectAggregator: used to merge Http request header and request body, mainly processed in the handshake phase;
  • WebSocketServerProtocolHandler: handles the Websocket protocol;
  • CustomTextFrameHandler: Custom Handler, used to add your own business logic.

Is it very convenient? After being processed by the WebSocketServerProtocolHandler, the text data is read out, so you don't need to deal with the problems of data combination and unpacking yourself.

CustomTextFrameHandler

Custom Handler for business processing:

public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        final String content = frame.text();
        System.out.println(" "+content);   
        
        // 
        TextWebSocketFrame respFrame = new TextWebSocketFrame(" ");
        if (ctx.channel().isWritable()) {
		      ChannelFuture future = ctx.writeAndFlush(respFrame);
		  }			        
    }
}
 

Benefit description

Finally, let's talk about the welfare: the F code of the Xiaoai speaker.

Two copies are prepared, mainly to thank the friends of the "WeChat Official Account" and "Nuggets Community". Each copy includes 1 Xiao Ai Speaker F code and 1 Xiao Ai Speaker mini F code.

The F code of Xiaomi mobile phone is derived from the English word "Friend", which is the first right of purchase provided by Xiaomi to core users of Xiaomi and netizens who have contributed to Xiaomi. If you have a Xiaomi F code, you can directly use the Xiaomi F code without waiting. Buy related products!

Simply put, the F code is no need to grab it, you can buy it directly~

Lottery deadline

April 9th at 12 noon

Lottery rules
Nuggets community
  • Need to pay attention to my Nuggets account to be effective, personal homepage ;

  • Use WeChat lottery assistant to randomly draw for the Nuggets community;

WeChat public account
  • Need to follow my WeChat public account to be effective;

  • Use WeChat lottery assistant to randomly draw for WeChat official accounts;