07-Netty原理篇

网络通信之Java IO模型详解

网络通信之Socket

image-20221017153946010
  • Socket起源于Unix,而Unix/Linux基本思想之一就是”一切皆文件”,也称为文件描述符

  • Socket是对“open—write/read—close”模式的一种实现

  • Socket是对TCP/IP协议的封装,Socket本身不是协议,通过Socket才能使用TCP/IP协议

基础概念

阻塞和非阻塞

当线程访问资源时,该资源是否准备就绪的一种处理方式。

若线程访问时,资源未准备就绪,线程什么也不做,就一直等待着资源就绪,这种处理方法就叫阻塞

但如果资源是不一直等待该资源,而是去做其他事情,那就是非阻塞

同步和异步

同步和异步是访问数据的机制

同步:调用者一旦开始调用方法,则必须等待调用方法的结果返回后,才能去做其他事。

异步:调用更像一个消息传递,调用一开始,方法调用马上就会返回,让调用者可以继续后续的操作。而异步方法通常会在另外一个线程中,“真实”地执行着。整个过程,不会阻碍调用者的工作,异步方法完成后,再通知调用者。(一般用回调函数实现)

简单例子:

你去商城买东西,你看上了一款手机,能和店家说你一个这款手机,他就去仓库拿货,你得在店里等着,不能离开,这叫做同步。

现在你买手机赶时髦直接去京东下单,下单完成后你就可用做其他时间(追剧、打王者、lol)等货到了去签收就ok了,这就叫异步。

阻塞、非阻塞与同步、异步

它们之间的区别

阻塞/非阻塞:关注的是程序(线程)等待消息通知时的状态

同步/异步:关注的是消息通知的机制

同步阻塞

小明一直盯着下载进度条,到100%的时候完成。

**同步体现在:**小明关注下载进度条并等待完成通知。(可以看成同步是我主动关注任务完成的通知)

**阻塞体现在:**在等待过程中,小明不去做别的东西。(可以看成异步是被动的,任务完成后再通知我)

同步非阻塞

小明提交下载任务后,就去干别的事了,但每过一段时间就去瞄一眼进度条,看到100%就完成。

同步体现在:小明关注下载进度条并等待完成通知。

非阻塞体现在:等待下载完成通知过程中,去干别的任务了,只是时不时会瞄一眼进度条;【小明必须要在两个任务间切换,关注下载进度】

这种方式是效率低下的,因为程序需要在不同任务的线程中频繁切换。

异步阻塞

小明换了个有下载完成通知功能的软件,下载完成就“叮”一声,

**异步体现在:**小明不用时刻关注进度条,在下载完成后,消息通知机制是由“叮”一声去通知小明的。

**阻塞体现在:**小明在等待“叮”的时候,不能去做其他事情。

异步非阻塞

小明仍然使用那个下载完会“叮”一声的软件,小明在提交下载任务后,就不管了,转而去做其他事情。而当下载完成后,下载软件会通过“叮”去主动通知小明。

异步体现在:小明不用时刻关注下载任务,而是让下载软件下载完成之后通过“叮”来通知他。

非阻塞体现在:小明在下载过程中,并非什么都不做,而是去做其他事情了。【软件处理下载任务,小明处理其他任务,不需关注进度,只需接收软件“叮”声通知,即可】

Java五种IO模型

阻塞IO模型(BIO)

概述

同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。这里使用那个经典的烧开水例子,这里假设一个烧开水的场景,有一排水壶在烧开水,BIO的工作模式就是, 叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶。但是实际上线程在等待水壶烧开的时间段什么都没有做。

如下图:

img

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,BIO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了

socket.accept()socket.read()socket.write()三个主要函数都是同步阻塞的

在这里插入图片描述

socket交互整体流程

image-20221017160645483
image-20221017160709032

连接四要素

怎么知道哪个连接属于哪个socket:

  • 连接四要素:[源IP、目标IP、源端口、目标端口]

socket底层数据结构

image-20221017162452394

实现

服务端

客户端

打开两个客户端,进行输入,观察,发现只有一个客户端被接收。

方法改进

通过使用线程池/多线程,每一个线程管理一个客户端

非阻塞IO模型(NIO)

概述

img

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。

从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,NIO特点是用户进程需要不断的主动询问内核数据准备好了吗?

在非阻塞式 I/O 模型中,应用程序把一个套接口设置为非阻塞,就是告诉内核,当所请求的I/O操作无法完成时,不要将进程睡眠而是返回一个“错误”,应用程序基于 I/O 操作函数将不断的轮询数据是否已经准备好,如果没有准备好,继续轮询,直到数据准备好为止。

在NIO模式中,一切都是非阻塞的

  • accept()方法是非阻塞的,如果没有客户端连接,就返回error

  • read()方法是非阻塞的,如果read()方法读取不到数据就返回error,如果读取到数据时只阻塞read()方法读数据的时间

在NIO模式中,只有一个线程:当一个客户端与服务端进行连接,这个socket就会加入到一个数组中,隔一段时间遍历一次,看这个socket的read()方法能否读到数据,这样一个线程就能处理多个客户端的连接和读取了

解决的问题

NIO成功的解决了BIO需要开启多线程的问题,NIO中一个线程就能解决多个socket,但是还存在2个问题。

问题一:

这个模型在客户端少的时候十分好用,但是客户端如果很多,比如有1万个客户端进行连接,那么每次循环就要遍历1万个socket,如果一万个socket中只有10个socket有数据,也会遍历一万个socket,就会做很多无用功,每次遍历遇到 read 返回 -1 时仍然是一次浪费资源的系统调用。

问题二:

而且这个遍历过程是在用户态进行的,用户态判断socket是否有数据还是调用内核的read()方法实现的,这就涉及到用户态和内核态的切换,每遍历一个就要切换一次,开销很大因为这些问题的存在。

  • 优点:不会阻塞在内核的等待数据过程,每次发起的 I/O 请求可以立即返回,不用阻塞等待,实时性较好。

  • 缺点:轮询将会不断地询问内核,这将占用大量的 CPU 时间,系统资源利用率较低,所以一般 Web 服务器不使用这种 I/O 模型。

结论:让Linux内核搞定上述需求,我们将一批文件描述符通过一次系统调用传给内核由内核层去遍历,才能真正解决这个问题。

IO多路复用因此应运而生,也即将上述工作直接放进Linux内核,不再两态转换而是直接从内核获得结果,因为内核是非阻塞的。

  1. 数据量大,轮询遍历浪费资源

  2. 需要用户态用read()方法去和内核态,沟通,成本高

让上面两步,直接交给linux的内核提供

  • 内核态是不会有阻塞的

  • 直接内核态处理完成后,直接给用户最终结果,直接交付,不涉及用户态和内核态的切换

实现

服务端

客户端

两个客户端

多路复用IO模型(IO Multiplexing)

概述

image-20221018164502831
  • channel:介于字节缓冲区和套接字之间,可以同时读写,支持异步IO

  • buffer:字节缓冲区,是应用程序和通道之间进行IO数据传输的中转

  • selector:多路复用器,监听服务端和客户端的管道上注册的事件

Java NIO(New IO) 不是IO模型中的NIO模型,而是另外的一种模型,叫做IO多路复用模型( IO multiplexing )。

实现

客户端

服务端

什么是多路复用

IO多路复用模型,就是通过一种新的系统调用,一个进程可以监视多个文件描述符(如socket),一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核kernel能够通知程序进行相应的IO系统调用。

目前支持IO多路复用的系统调用,有 select,epoll等等。select系统调用,是目前几乎在所有的操作系统上都有支持,具有良好跨平台特性。epoll是在linux 2.6内核中提出的,是select系统调用的linux增强版本。而Java NIO库中的 selector 底层就是IO多用复用技术。

多路复用与NIO的区别

NIO需要在用户程序的循环语句中不停地检查各个socket是否有数据读入,而IO多路复用在用户程序层面则不需要循环语句,虽然IO多路复用也是轮询,但是IO多路复用是交给内核进行各个socket的监控的。

其次,由于NIO多次调用read这种系统调用,因此会频繁造成用户态和内核态的转换,而IO多路复用则是先调用select这个系统调用去查询是否有数据就绪的socket,然后有数据就绪,才调用read这个系统调用来读。所以从性能上来说,IO多路复用会比NIO好。

在一定程度上来说,IO多路复用算是同步阻塞的一种,因为select会阻塞到有socket数据就绪为止。所以在应用上,一般会开一条程序来专门给select查询。

如下图为IO对路复用的过程:

(1)进行select/epoll系统调用,查询可以读的连接。kernel会查询所有select的可查询socket列表,当任何一个socket中的数据准备好了,select就会返回。

当用户进程调用了select,那么整个线程会被block(阻塞掉)。

(2)用户线程获得了目标连接后,发起read系统调用,用户线程阻塞。内核开始复制数据。它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存),然后kernel返回结果。

(3)用户线程才解除block的状态,用户线程终于真正读取到数据,继续执行。

img

IO多路复用的特点

IO多路复用模型,建立在操作系统kernel内核能够提供的多路分离系统调用select/epoll基础之上的。多路复用IO需要用到两个系统调用(system call), 一个select/epoll查询调用,一个是IO的读取调用。

和NIO模型相似,多路复用IO需要轮询。负责select/epoll查询调用的线程,需要不断的进行select/epoll轮询,查找出可以进行IO操作的连接。

另外,多路复用IO模型与前面的NIO模型,是有关系的。对于每一个可以查询的socket,一般都设置成为non-blocking模型。只是这一点,对于用户程序是透明的(不感知。因为是在内核处理的)。

**优点:**用select/epoll的优势在于,它可以同时处理成千上万个连接(connection)。与一条线程维护一个连接相比,I/O多路复用技术的最大优势是:系统不必创建线程,也不必维护这些线程,从而大大减小了系统的开销。

**缺点:**本质上,select/epoll系统调用,属于同步IO,也是阻塞IO。都需要在读写事件就绪后,自己负责进行读写,也就是说这个读写过程是阻塞的。

异步IO模型(AIO)

异步非阻塞无需一个线程去轮询所有IO操作的状态改变,在相应的状态改变后,系统会通知对应的线程来处理。

对应到烧开水中就是,为每个水壶上面装了一个开关,水烧开之后,水壶会自动通知我水烧开了。

img

(1)当用户线程调用了read系统调用,立刻就可以开始去做其它的事,用户线程不阻塞。

(2)内核(kernel)就开始了IO的第一个阶段:准备数据。当kernel一直等到数据准备好了,它就会将数据从kernel内核缓冲区,拷贝到用户缓冲区(用户内存)。

(3)kernel会给用户线程发送一个信号(signal),或者回调用户线程注册的回调接口,告诉用户线程read操作完成了。

(4)用户线程读取用户缓冲区的数据,完成后续的业务操作。

信号驱动IO模型

在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。这个一般用于UDP中,对TCP套接口几乎是没用的,原因是该信号产生得过于频繁,并且该信号的出现并没有告诉我们发生了什么事情。

Reactor模型和Proactor模型详解

概述

Reactor模型是针对同步IO的,而Proactor是针对异步IO的。

Reactor模型

概述

image-20221020200611860

**Reactor:**Reactor主要是用来监听事件的,无论accept事件还是read事件,至于它到底监听什么事件根据模型来决定,假设我们的服务器是一个餐厅,那么Reactor就相当于大门的迎宾工作人员

**Acceptor:**就是Reactor接收到事件后交给acceptor,然后接收到把得到的read事件负责交个handler处理, 那么在餐厅中就相当于把客人交给服务员。

**Handlers:**用来处理网络IO事件,处理IO操作。在餐厅中就相当于服务员。

单Reactor单线程模型

概述

来看下这张图,了解下,在单Reactor单线程模型中,他们的作用以及实现逻辑,首先客户端访问服务 端,在服务端这边首先是使用Reactor监听accept事件和read事件,当有连接过来,就交给acceptor处理 accept事件,当触发read事件,同时accept或把read事件交给handler处理。所有动作都是由一个线程完成 的,有人可能说一个线程完成看不出问题,那么我们在对应到餐厅中来看。

image-20221020200949058

在下图中,也就是单reactor单线程模型,迎宾负责看大门,负责迎接客人,负责给接收客人的要求,负责给客人做菜。负责给客人上菜。

image-20221020201018851

实现

SingleMain

在这个里面我们可以看到在主线程中启动了一个线程,那么这个线程要做什么呢,是不是就是我们之前NIO 中的每个步骤要在这个里面实现,所以我们来写下这个run方法的实现,我们定义一个Reactor的类,然后把监听的8080端口传入进去。

Reactor

我们来看下这个Reactor类如何实现,在他的构造函数中,我们去获取selector的对象,然后得到一个 ServerSocketChannel,同时绑定8080端口,设置为非阻塞,然后把serverSocketChannel注册到该 selector上,并且注册一个Accept事件。

在runnable中都会有一个run方法,然后在run方法中进行分发任务。当客户端连接过来之后,就会得到一个 SocketChannel,然后就可以处理客户端的IO事件的。接下来通过NIO中有一个attement的功能传递对象,然后去做这个事情。

Acceptor

Handler

单Reactor多线程模型

概述

我们既然已经知道了单线程Reactor模型的缺点,所以我们就可以去使用单Reactor多线程模型去解决这个问题,我们来看下这个图,在这个图中就看到使用了多线程的方式让线程不阻塞,也有点BIO的解决问题的思想了,但是还是有很大区别的。我们再来对比下餐厅。

image-20221020201744177

如下图,就相当于原来是一个迎宾在做,现在弄成多个服务员去处理客人的请求,那这样是不是就 已经很好了呢?我们先把代码实现了,我们再来看。

image-20221020201826335

实现

MutilMain

MutilReactor

MutilAcceptor

MutilDispatchHandler

主从Reactor多线程模型

概述

那么我们先来分析下,主从该如何实现,来看这张图,这个图中,我们可以看到安排了一个主Reactor去接收客户端的请求单,然后把接收到的请求交给Acceptor,在Acceptor的构造函数中初始化了多个SubReactor的子Reactor,然后封装成线程启动,所有的SubReactor线程都监听Read事件。

image-20221020202423180

当客户端的连接事件过来的时候,我们可以获取到SocketChannel,但是现在我们的Sub是阻塞的,所以我们需要唤醒selector,然后注册read事件,最后交给handler去处理。

image-20221020202446317

那么就相当于,现在不仅迎宾有一个铃,每个服务员都有一个铃,进来客人如果想要点餐,只需要在服务员的铃按一下就好了,就不用在迎宾的铃上按。

实现

MainReactorThread

MainReactor

SubReactor

MainAcceptor

WorkerHandler

Proactor模型

概述

Proactor模型,这个和Reactor模型不同的是他是基于异步IO的。

image-20221020203916532

先来看下整个图的逻辑,这张图总共是两个步骤:

  • 启动初始化

    • Proactorinitiator创建Proactor

    • Proactorinitiator创建handler

    • Proactor通过内核提供的Asynchronous Operation processor把Proactor和handler注册到内核

  • 客户端调用

    • 当IO操作完成是内核通知Proactor

    • Proactor根据不同的IO事件回调不同的handler完成业务处理

实现

ProactorMain

AIOProactor

AIOAcceptorHandler

ReadHandler

Reactor模型和Proactor模型区别

Reactor 是在事件发生时就通知事先注册的事件。

Proactor 是在事件发生时基于异步 I/O 完成读写操作(由内核完成),待 I/O 操作完成后才回调应用程序的处理器来进行业务处理。

深入浅出Netty编解码底层原理

Netty概述

什么是Netty?

Netty是一个异步的、基于事件驱动的网络应用框架,用于快速的开发可维护、高性能的网络服务器和客户端。

Netty底层使用的是NIO。

Netty与NOI的区别

  • NIO编写代码工作量大,bug多

  • NIO中需要自己解决粘包拆包问题,Netty提供了相应的API

  • epoll模型空轮询问题导致CPU 100%,Netty解决了该问题

  • 性能优化,ByteBuffer优化成ByteBuf,ThreadLocal优化成了FastThreadLocal

Netty与Mina的对比

  • Mina是apach维护,而且3.x会有很大的重构,API会不兼容

  • Netty的开发迭代更新迅速,API更加简洁与优秀(Netty自己说的)

  • Mina 将内核和一些特性的联系过于紧密,使得用户在不需要这些特性的时候无法脱离,相比之下性能会有所下降

  • Netty 比 Mina 使用起来更简单,如果上手只需要掌握模板代码 + 自定义 Handler 即可

Netty历史

Netty版本:

  • 3.x版本

  • 4.x版本(稳定版本,4.1.67.final)

  • 5.x版本(不稳定,已废弃)

简单示例

环境准备

服务端

NettyServer

SimpleServerHandler

客户端

NettyClient

SimpleClientHandler

ByteBuf

ByteBuf数据结构

image-20221021173422852

通过代码打印查看bytebuf的信息

image-20221022131604140

可以看出,当数据为空的时候

image-20221022131655330

当向ByteBuf中写入数据时,byteBuf.writeBytes(new byte[]{1,2,3,4});

image-20221022131754595

当读出数据的时候,指针也随之改变,byte a = byteBuf.readByte();,在打印看下

image-20221022131848934

在读取数据后变为废弃字节了,但是如果读取之后,又还想读取呢?

image-20221022131912554

ByteBuf扩容

如果写入的数据越来越多,Buf已经装不下了怎么办,之前看到每个buffer都有自己的容量,默认是256,所以Netty的Buf是可以自动扩容的。

image-20221022132033666

创建ByteBuf的方式

创建ByteBuf的方式有哪些?

image-20221022132254768

通过这个图可以看到,其实堆内的意思就是java虚拟机里面的堆空间,而堆外的意思是java进程中系统为它分配的堆空间。

  • jvm堆中的数据如果想要写入磁盘,就会进行write系统调用,调用过程为:jvm堆->系统堆->PageCache->磁盘

  • 如果数据是放在系统heap(进程的堆)中,调用过程为:系统堆->PageCache->磁盘

可以看到,使用堆外内存写入数据会少一次的拷贝次数

堆外内存和堆内内存的对比:

  • 堆内内存由JVM GC自动回收管理,但是GC需要时间开销成本;堆外内存不受JVM管理,一定程度上可以降低GC对应用运行带来的影响。

  • 堆外内存需要手动释放,稍有不慎会造成应用程序内存泄漏,当出现内存泄漏时问题排查相对困难

  • 当进行网络I/O操作、文件读写时,堆内内存需要转换为堆外内存,然后再与底层设备进行交互。直接使用堆外内存可以减少一次内存拷贝。

  • 堆外内存可以进行进程之间、JVM多实例之间的数据共享。

拆包与粘包

什么是粘包

粘包:数据从客户端传输到服务端,可能会把多个小的数据包封装成一个大包发送

image-20221022133022811

什么是拆包

拆包:数据从客户端传输到服务端,可能会把一个大的数据包拆成多个小数据包发送

image-20221022133037728

Netty编码与解码

一次编码和解码器

Netty中有4种解码器都是继承了ByteToMessageDecoder类,我们把这个类称之为一次解码器,他的作用是通过ByteBuf去传递数据,我们把这种把ByteBuf(原始数据流) -> ByteBuf(用户数据) 称之为一次解码器, Netty中对于这个解码器提供了4种。

Netty的4种解码器

FixedLengthFrameDecoder: 固定长度解码器

客户端和服务端都使用FixedLengthFrameDecoder,然后设置每次传输数据的长度,所以这种方式就规定我们的数据的长度要是固定的,可以很容易的就分辨出这种解码器不太好用。

LineBasedFrameDecoder: 发送消息加上换行符

既然上面说的需要固定长度,那么肯定有可以不用固定长度的,我们只需要告诉服务端,你告诉我你的数据是从什么时候结束就可以了,所以可以通过换行符来表示数据的结束,在发送数据的结尾添加“\n”换行符,这就是换行符解码器。

DelimiterBasedFrameDecoder: 自定义分隔符解码器

因为既然可以用换行符来表示结束,那么是不是应该也可以用其他的来表示结束了,所以Netty提供了一种自定义分隔符的方式表示数据结束,只需要在发送数据的时候在数据后面加上分隔的符号就行了,只是分隔的符号需要包装成一个ByteBuf。

LengthFieldBasedFrameDecoder: 长度域解码器

长度域解码器,该解码器比较特殊,是自定义协议的一种解码器,可以自己按照规定的方式去灵活的定义报文的格式,但是基本报文的格式就是如下几种,要想自定义协议,其实我们自己就知道,必须要知道数据从什么时候开始,从什么时候开始结束,这时我们最关心的,所以在定义报文的格式时,也的有相应的数据来表示数据开始,数据结束,我们先来看下这种比较灵活的解码器他的参数。

第一种,报文格式由length和content组成

image-20221022143759887

第二种,报文格式由length和content组成

image-20221022144326981

第三种,报文格式由type和leng的长度和content组成

image-20221022144401236

第四种,报文格式由leng的长度和type和content组成

image-20221022144423009

第五种,报文格式由type1和leng的长度和type2和content组成

image-20221022144500440
  • 开始的1个字节是长度域,所以需要设置长度域偏移为1

  • 长度域4个字节

  • 然后需要把type2+body当做body处理,所以数据长度需要加1

  • 接收数据不包括type1和长度域,所以需要跳过5个字节。

二次编码和解码器

将数据帧 转为正确的byte数组我们称之为一次解码,将byte数据流转换为java对象 , 我们称之为二次解码,那么在netty中的二次加码器都要去继承MessageToMessageDecoder类。

一次解码是将数据正确的拆分或者拼合 , 二次解码是将数据正确的序列化或者反序列化 。

我们可以来通过这个例子来了解netty中的二次解码器的使用,在这里我们需要传递的是一个序列化的对象,客户端发送到服务端后,服务端接收的不再是一个ByteBuf,直接接收的是一个对象。

首先定义一个对象,该对象要进行序列化

首先编写服务端代码

编写ServerHandler类

然后在通过客户端去发送数据,代码如下

然后看ClientHandler如何发送数据的

实现原理,首先客户端建立连接后,客户端就会调用channelActive方法去发送数据,在调用ctx.writeAndFlush(person)的时候会去调用到之前配置的new ObjectEncoder()编码器,因为客户端先编码再发送到服务端,然后服务端再解码。

自定义消息协议实战

协议的报文格式

概述

比如,以RocketMQ的报文的格式就是Header+body。

那么这个一定会有一个总长度,length,那么报文就是 length | Header| Body 但是如果只有一个总长度的 length,是无法知道Body的长度的,有人可能说加一个body的长度,但是在这里,我们可以用另外一种方式,就是把Header的长度算出来,然后用总的Length-Header Length-HeaderData,就能得到body的长度了。

为什么这样定义报文呢?

首先我们一定要知道我们的整体的报文长度,不然我们根本就不知道报文什么时候开始什么时候结束,那么这个长度能不能定义在结尾呢?肯定不行,因为报文如果在发送到长度字段时发了一半断了怎么办,就根本无法识别报文的长度。

为什么需要头部呢?

image-20221025141731561

头部中存放的数据是头的长度和头的数据,在HeadData中存储一些非正文数据,比如序列化方式,请求方式等。HeadData主要是为了记录报文的一些额外信息。当然头部并不是必须的。每个字段含义如下:

image-20221125102031970

消息协议

尝试自定义一个消息协议类。首先定义一个消息传输的对象MessageRecord,通过消息长度+消息头+消息体构成。

然后对消息头进行如下定义:

其中消息头包括消息头的长度以及消息头内容。对消息头内容进行封装成一个对象HeaderData,里面记录该消息的非正文数据,比如版本号,语言,序列化方式、请求类型等。

为了方便进行后续使用,对请求类型、序列化方式、语言进行枚举封装。

  • 请求类型

  • 语言

  • 序列化方式

消息客户端

  • NettyClient

  • ClientHandler

  • MyEncode:定义序列化方式

  • User:实体类,用于测试

消息服务端

  • NettyServer

  • ServerHandler

  • MyDecode

Netty实现长连接

短连接

短连接:短连接意味着每一次的数据传输都需要建立一个新的连接,用完再马上关闭它。下次再用的时候重新建立一个新的连接。

image-20221126172738774

优点:每次使用的连接都是新建的,所以基本上只要能够建立连接,数据就大概率能送达到对方

缺点:1. 每个连接都需要经过三次握手和四次握手的过程,耗时大大增;2. 系统端口使用过快,不仅影响当前进程,还会影响其他进程。

长连接

长连接:长连接意味着进行一次数据传输后,不关闭连接,长期保持连通状态。如果两个应用程序之间有新的数据需要传输,则直接复用这个连接,无需再建立一个新的连接。

image-20221126172912909

TCP长连接实现原理

image-20221126172939828

长连接是TCP底层实现的,TCP的长连接机制主要是看对方是否存活,而Http基于TCP实现的长连接,其实主要目的是为了复用连接。

Netty长连接实现原理

image-20221126172952759

在Netty中,提供了一个handler,用来发送心跳包,可以借助这个心跳来确定连接时候还正常,然后在做一些逻辑处理,首先在客户端添加心跳handler。

然后把客户端代码稍微的改下

然后在common项目下定义handler包,建立一个公共的handler类

然后修改clientHandler

最后修改下serverHandler

主流序列化和反序列化技术原理

为什么需要序列化

  • 序列化:把对象转换为字节序列的过程称为对象的序列化。

  • 反序列化:把字节序列恢复为对象的过程称为对象的反序列化。

通俗易懂点,就是下面两种情况:

  • 第一种情况:一般情况下Java对象的声明周期都比Java虚拟机的要短,实际应用中我们希望在JVM停止运行之后能够持久化指定的对象,这时候就需要把对象进行序列化之后保存到磁盘

  • 第二种情况:需要把Java对象通过网络进行传输的时候。因为数据只能够以二进制的形式在网络中进行传输,因此当把对象通过网络发送出去之前需要先序列化成二进制数据,在接收端读到二进制数据之后反序列化成Java对象

序列化的方式

概述

常见序列化方式如下:

  • Java序列化

  • XML序列化

  • Json序列化

  • Hessian序列化

  • Avro序列化

  • Protobuf序列化

  • jute序列化

  • messagepack序列化

  • Thrift序列化

  • Kyro序列化

Java序列化

在java序列化中,序列化的对象必须实现Serializable接口, 才能去进行序列化和反序列化,这里先来看下java的序列化

把序列化的代码提取出来,定义一个Serializer接口类,然后在里面可以写默认的方法,默认使用Java序列化方式。

现在自定义一个Serializer的接口,定义两个方法,一个是序列化,一个是反序列化。

然后新建JavaSerializer类,实现刚才的定义的接口,并通过EnumSerializer中定义的序列化类型通过其代表的一个字节进行表示,然后可以根据这个传递的类型决定使用什么样的序列化方式。

现在枚举中添加一个方法,这个方法就是会根据序列化类型执行返回相应的对象,知道枚举类定义的属性就是一个对象。

然后定义一个工具类,根据序列化类型来选择其中一种序列化方式,建立类SerializerUtil,在这个里面会根据枚举的类型判断使用那种序列化的类型。

然后修改自定义协议中关于序列化的方式,在MyEncode中有三处需要修改:

还有一个MyDecode中也修改下,但是在MyDecode收到的是byteBuffer对象,这个对象已经序列化了,所

以无法获取到序列化的类型了,所以需要做一点改变,也就是把序列化的类型放入到头长度的前8位里面保存起来。

markProtocolType方法如下:

然后在解码器中要重新去获取header长度值,但是这个值是包含了序列化类型的,要想真的拿到header长度的值,只需要&0xFFFFFF 24个1,就可以得到长度的值。(因为前8位用于存储的是类型)

序列化的类型通过到oriHeaderLen中去获取,只需要向右移动24位,然后&FF 8位1,那么就可以得到序列

化。

然后启动客户端和服务端。就发现可以实现这种序列化的判断了。

Json序列化

首先需要导入json相关包,这里使用fastjson

然后创建JsonSerializer类定义Json序列化方式

之后修改下SerializerUtil类

一般情况下,在序列化的时候耗费的时间,fastjson >=jackjson>Gson;在反序列化的时候,Gson稍微好一点。

  • Gson

XML序列化

Hessian序列化

dubbo默认就是使用Hassian序列化。

Protobuf序列化

Protobuf是由谷歌提供的一个序列化方案,它是一个灵活高效的用于序列化数据的协议,相比较XML和JSON格式,protobuf更小、更快、更便捷。google protobuf是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以编译成Java、python、C++、C#、Go等代码,然后就可以直接使用,不需要再写其他代码,自带有解析的代码。

首先在idea安装protobuf插件。

image-20221129203438795

然后再建立proto文件夹,

image-20221129203549031

之后make生成如图所示

image-20221129220225394

引入protobuf的jar包

首先定义实体类,在proto文件夹下添加MessageRecords.proto,(注意一定不要在中文路径下,不然会报错)

然后重新maven,clean一下,选中项目,然后查看plugins,选择protobuf-compile,然后把生成再target目录下的文件拷贝到相应的位置上。

然后把刚刚的序列化替换成Protobuf,首先无论是服务端还是客户端都需要去配置相应的handler。

  • ClientHandler

  • ServerHandler

  • ProtoToJsonUtil工具

Protobuf原理

由于protobuf的重要性,因此需要深入学习了解。

protobuf的二进制流结构

image-20221130100647320

通过StudentProtos进行生成StudentProtos.Student类,如下

实例:

之后进行测试打印查看

image-20221130101450024

其中,name表示为如下三个字段构成:

  • 第一位tag=10 = 1(属性定义时的位置) << 3 | 2(编码类型)= 1<<3 | 2 = 10;

  • 第二位length是长度,表示有多少个字符;

  • 第三位value是字符编码数组

age表示为如下两个字段构成:

  • 第一位tag = 16 = 2(属性定义时的位置) << 3 | 2(编码类型)= 2<<3 | 0 = 16;

  • 第二位value是36,是根据protobuf的算法设计的(正数varint算法和负数ZigZag算法)。

protobuf的正数varint算法

image-20221130102611004

其中300的二进制为 0000 0001 0010 1100。

Protobuf 负数ZigZag算法

image-20221130103009981对于32位整数,(n<<1)^(n>>31),即能实现zigzag编码。之后再使用varint算法进行压缩。

对于32位整数,(n>>1)^-(n&1),即能实现zigzag解码。

Protobuf为什么性能好

序列化速度快:编码 / 解码 方式简单,采用 Protocol Buffer 自身的框架代码 和 编译器 共同完成。

序列化后的数据量体积小:独特的编码方式,如Varint、Zigzag编码;采用T - L - V 的数据存储方式。

序列化选型建议

  • 对性能要求不高的场景,可以采用基于XML的SOAP协议

  • 对性能和间接性有比较高要求的场景,那么Hessian、Protobuf都可以。

  • 基于前后端分离,或者独立的对外的api服务,选用JSON是比较好的,对于调试、可读性都很不错

基于Netty手写实现RPC框架

什么是RPC

image-20221130135303630
  • RPC:Remote Procedure Call,远程过程调用

  • RPC目的:像调用本地一样调用远程的服务

RPC实现原理

image-20221130140353553

通过上述图可以知道,说白了RPC的整个过程就是将一个类的方法名 + 参数 + 类型发送到对方,然后进行反射调用。

常见的RPC协议框架

RPC代码编写

创建父类项目

创建项目名称为netty-rpc的父项目。

image-20221130144056260

导入相关依赖

然后在四个模块中添加父模块,交给父模块进行管理

创建四个子项目模块

RPC 模块划分

  • netty-rpc-api:公共服务定义

  • netty-rpc-protocal:自定义协议

  • netty-rpc-provider:服务提供方

  • netty-rpc-consumer:服务调用方

image-20221130144748254

第一步:构建consumer与provider模块

首先在netty-rpc-api中添加IUserService接口类,然后lifecycle去install一下,把项目发布到本地的mvn中。

然后在netty-rpc-provider项目中进行jar包管理

然后在netty-rpc-provider项目中建立service包,实现该接口

然后重新mvn下,mvn install gp-netty-api,如果打包失败,比如**没有main class,则需要创建一个main类。**如下:

然后在netty-rpc-consumer需要引入api的jar包

然后在netty-rpc-consumer的controller包下创建HelloController类,将来想通过定义注解的方式就能实现远程调用。

第二步:定义注解

由于这里不能采用Autowire注解去注入,因为要生成代理,这个生成代理的对象方法中是发送数据到远程服务,在gp-netty-protocol项目中添加注解。

在com.example.netty.rpc.protocal.annotation包下添加两个注解

  • MyRemoteReference:调用方需要生成远程调用代理的注解

  • MyRemoteService:服务提供方的需要发布的service的注解

然后给netty-rpc-provider和comsumer项目中的service类添加MyRemoteService注解和给userService属性注入,先引入jar包,然后添加注解**@MyRemoteService@MyRemoteReference**注解。

第三步:定义消息类型以及序列化类型

那么,接下来需要完成,从客户端的访问到服务端的service中的say方法,并且返回到网页上。

之前学习的netty,就是用来在网络中传输数据,那么传输数据就要知道传输的是什么数据,其实就是把调用的类,方法名,参数传输到服务端,然后通过反射去调用,这样就达到了远程调用的目的。

那么就看如何在自定义协议模块定义的报文格式以及编码器和解码器。这里规定了下报文格式,看看如何定义的。

报文的格式定为:header | body组成,首先头部有5部分组成,一个magic,用来校验数据,第二个就是序列化的方式,第三个就是请求类型,请求ID,长度字段组成。

body的内容根据是请求类型看是哪一个报文,如果是请求报文,则是发送类名,方法名,参数,参数类型给接收端,如果是响应报文,就是发送数据,消息给到请求方。

image-20221201143750110

在自定义协议模块中定义,首先在gp-netty-protocol项目中的**.protocal.core包下添加Header类。

然后在**.protocol.constant常量包中定义消息类型的枚举类

然后在**.protocol.constant常量包中定义消息类型的枚举类

在**.protocol.constant常量包中定义序列化的枚举类

在**.protocol.constant常量包中定义RpcConstant,其中包含魔数校验报文正确性,以及记录包头长度

然后在core包下定义发送数据包的协议类RpcProtocol

发送报文类RpcRequest

返回报文类

这里使用java和Json两种序列化方式,那么在netty-rpc-protocol包中添加json的jar包

定义序列化的接口,在serial的包下

然后分别实现java序列化方式和Json序列化方式

  • JavaSerializer

  • JsonSerializer

然后对外提供一个序列化管理器,通过key获取到相应的序列化的方式

第四步:编写编码器和解码器

接下来要编写编码器和解码器,现在已经知道数据传输的格式了,那么就处理以下编码器和解码器,现在netty-rpc-protocol下添加依赖

首先在netty-rpc-protocol项目下建立code包**.protocal.code。

首先编码RpcEncoder

解码器RpcDecode

第五步:生成代理类

客户端的请求的接口就要去生成代理,然后通过Netty客户端发送数据,进行编码,序列化等操作发送到Netty服务端,然后Netty服务端,解码,反序列化得到数据然后进行调用,先来看下给接口生成代理如何生成。

然后在netty-rpc-protocol中添加代理类,建立一个**.protocol.spring包,在包下建立reference包和service包,一个是给服务端使用,一个是编写客户端代码,在reference包下添加SpringRpcReferencePostProcessor类,去生成代理类,因为在spring的生命周期中有一个步骤是实现了BeanFactoryPostProcessor接口的类,这个里面的postProcessBeanFactory方法会在对象实例化之前执行,那么我们可以通过在实例化之前,做一些事情。

那么先去实现ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor接口。

其实生成代理类用到了BeanFactoryPostProcessor接口和FactoryBean的结合使用

image-20221130144307295

实现postProcessBeanFactory方法,spring容器加载了bean的定义文件之后, 在bean实例化之前执行。

在BeanFactory标准初始化之后调用,这时所有的bean定义已经保存加载到beanFactory,但是bean的实例还未创建

编写SpringRpcReferenceBean代码

然后定义RpcInvokerProxy类

然后在core中定义RequerstHolder类,用来存储请求id和结果的映射关系

回到RpcInvokerProxy类中,因为Netty是异步的,所以在netty中有很多异步操作,其实jdk中的异步操作有future,netty中的promise继承了future,(可以暂时认为是一个不会阻塞可以异步获取结果的工具类),在core中定义一个RpcFuture类

然后回到上述的invoke方法

第六步:定义Netty的客户端和服务端

客户端

在**.protocol.netty下建立netty包,定义NettyClient

然后客户端发送数据

然后回到RpcInvokerProxy类中invoke方法中实例化一个RpcFuture,这个是获取未来客户端发送请求,通过future可以异步的获取到数据

将来调用客户端后,客户端返回数据需要接收数据,建立一个handler包,然后建立ClientHander

把RpcClientHandler添加到pipeline中

然后回到SpringRpcReferencePostProcessor类中的parseRpcReference方法,里面的端口和地址还没有设置把要访问的iP和端口定义在配置文件中,在provider项目下的resources下添加application.properties文件

然后在reference包下定义一个类与application文件中的属性对应的类RpcClientProperties

然后交给spring容器,这个里面的值通过环境变量来设置值,这个环境变量里面会收集比如jdk的版本,操作系统是windows还是linux等信息,这个是系统自带的,那么还有定义在application中的数据也会被解析然后放入到环境变量中。所以之前在SpringRpcReferencePostProcessor中设置了BeanDefinition中的属性中地址和端口都还没有赋值,因此先去实例化RpcClientProperties,并且传递给SpringRpcReferencePostProcessor。

然后在SpringRpcReferencePostProcessor的构造函数中传入对象

parseRpcReference方法中ip和端口还没有设置

最后把生成的BeanDefinition注入到spring容器中,回到SpringRpcReferencePostProcessor中的postProcessBeanFactory方法,后面注入到容器中去

这里客户端就写完了,那么接下来是服务端,现在已经写了客户端大部分代码,还有序列化方式,编解码的代码,接下来就完成服务端的编写

服务端

在netty-rpc-provider中,添加application.properties文件,里面配置需要监听的端口

在service类上添加的注解MyRemoteService,加了这个注解的应该有什么作用,还有服务器也需要去监听端口,所以先编写NettyServer,在**.protocol.netty下创建类

在handler包下创建ServerHandler

添加到pipeline中

来看下方法是如何调用的。service上加了注解,这个类就会单独保存到其他的容器中保存,与spring容器隔离,在这里大家就理解为放入到了一个map中。

定义一个类,可以获取容器的类,然后从容器中获取对象,再调用方法。在service包下添加Mediator。

然后在这个类中,定义一个map,这个map中存放需要远程调用的Bean

在service包下创建BeanMethod类

然后定义方法,该方法就是根据客户端发送的请求报文,报文中的类名,方法名,其实这个map中放入的也是根据类名和方法名作为key,然后BeanMethod中存放的就是实例对象和方法,所以需要定义的这个方法其实就是真正的调用了。

回到ServerHandler中,调用方法获取到该值

那么map中的数据是什么是否存放的呢?之前说过,加了MyRomoteService注解的类都会被放入到容器中去,所以在启动容器的时候,需要定义一个类实现BeanPostProcessor介入到bean的生命周期中去,判断是否有注解,有注解就把对象加入到刚才写的map中。

RpcServerProperties类属性映射,这里通过@ConfigurationProperties(prefix = "rpc")进行属性注入。

通过@Bean注解实例化SpringRpcProviderBean对象。其中,@EnableConfigurationProperties注解的作用是:使使用 @ConfigurationProperties 注解的类生效。

之前写了一个Netty的服务端,在spring容器启动的时候如何启动netty服务器呢?比如springboot中内嵌了tomcat服务器,所以也可以自己手动内嵌一个服务器。那么可以利用之前学的生命周期中的一个接口InitializingBean,在生命周期的初始化步骤去启动Netty服务端。

然后编写服务端启动类NettyRpcProviderApplication和客户端启动类NettyRpcComsumerApplication

  • NettyRpcProviderApplication

  • NettyRpcConsumerApplication

分别启动服务端,再启动客户端,访问连接,看是否完成远程调用。

Netty核心源码分析

NIO的步骤

Netty启动流程

主线程

Boss线程

Work线程

Pipeline实现原理

Pipeline什么时候实例化

Pipeline的数据结构

Pipeline的节点的调用顺序

Pipeline中的handler执行的声明周期

SelectKeys结构优化

NIO中的空轮询bug

最后更新于

这有帮助吗?