第一个Netty应用
Table of Contents
首先要确保有一个可以工作的开发环境,并通过构建一个简单的客户端和服务器来进行测试。虽然在开始下一章节前,还不会开始学习的 Netty 框架的细节,但在这里我们将会仔细观察我们所引入的 API 方面的内容,即通过 ChannelHandler 来实现应用的逻辑
设置开发环境
如果你已经有了 Maven 的开发环境,那你可以跳过本节
客户端/服务器 总览
我们将构建一个完整的的 Netty客 户端和服务器。虽然你可能集中在写客户端是浏览器的基于 Web 的服务,接下来你将会获得更完整了解 Netty 的 API 是如何实现客户端和服务器的:
图中显示了连接到服务器的多个并发的客户端
echo(回声)客户端和服务器之间的交互是很简单的;客户端启动后,建立一个连接发送一个或多个消息发送到服务器,其中每相呼应消息返回给客户端。诚然,这个应用程序并不是非常有用。但这项工作是为了更好的理解请求 - 响应交互本身,这是一个基本的模式的客户端/服务器系统
echo 服务器
Netty 实现的 echo 服务器都需要下面这些:
- 一个服务器 handler:这个组件实现了服务器的业务逻辑,决定了连接创建后和接收到信息后该如何处理
- Bootstrapping: 这个是配置服务器的启动代码。最少需要设置服务器绑定的端口,用来监听连接请求
通过 ChannelHandler 来实现服务器的逻辑
Echo Server 将会将接受到的数据的拷贝发送给客户端。因此,需要实现 ChannelInboundHandler 接口,用来定义处理入站事件的方法。由于应用很简单,只需要继承 ChannelInboundHandlerAdapter 就行了。这个类 提供了默认 ChannelInboundHandler 的实现,所以只需要覆盖下面的方法:
- channelRead(): 每个信息入站都会调用
- channelReadComplete(): 通知处理器最后的 channelread() 是当前批处理中的最后一条消息时调用
- exceptionCaught(): 读操作时捕获到异常时调用
EchoServerHandler
@Sharable //1 标识这类的实例之间可以在 channel 里面共享 public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); //2 日志消息输出到控制台 ctx.write(in); //3 将所接收的消息返回给发送者。注意,这还没有冲刷数据 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) //4 冲刷所有待审消息到远程节点。关闭通道后,操作完成 .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); //5 打印异常堆栈跟踪 ctx.close(); //6 关闭通道 } }
这种使用 ChannelHandler 的方式体现了 关注点分离 的设计原则,并简化业务逻辑的迭代开发的要求。处理程序很简单, 它的每一个方法可以覆盖到“hook”在活动周期适当的点。很显然,我们覆盖 channelRead因为我们需要处理所有接收到的数据
覆盖 exceptionCaught 使我们能够应对任何 Throwable 的子类型。在这种情况下我们记录,并关闭所有可能处于未知状态的连接。它通常是难以 从连接错误中恢复,所以干脆关闭远程连接。当然,也有可能的情况是可以从错误中恢复的,所以可以用一个更复杂的措施来尝试识别和处理 这样的情况
如果异常没有被捕获,会发生什么?
每个 Channel 都有一个关联的 ChannelPipeline,它代表了 ChannelHandler 实例的链。适配器处理的实现只是将一个处理方法调用转发到链中的下一个处理器。因此,如果一个 Netty 应用程序不覆盖exceptionCaught ,那么这些错误将最终到达 ChannelPipeline,并且结束警告将被记录。出于这个原因,你应该提供至少一个实现 exceptionCaught 的 ChannelHandler
关键点要牢记:
- ChannelHandler 是给不同类型的事件调用
- 应用程序实现或扩展 ChannelHandler 挂接到事件生命周期和提供自定义应用逻辑
引导服务器
了解完业务核心处理逻辑 EchoServerHandler 后,下面要引导服务器自身了:
- 监听和接收进来的连接请求
- 配置 Channel 来通知一个关于入站消息的 EchoServerHandler 实例
Transport
在网络的多层视图协议里面,传输层提供了用于端至端或主机到主机的通信服务。互联网通信的基础是 TCP 传输。当我们使用术语“NIO transport”我们指的是一个传输的实现,它是大多等同于 TCP ,除了一些由 Java NIO 的实现提供了服务器端的性能增强
EchoServer
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]); //1 设置端口值 new EchoServer(port).start(); //2 启动服务器 } public void start() throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(); //3 创建 EventLoopGroup try { ServerBootstrap b = new ServerBootstrap(); b.group(group) //4 创建 ServerBootstrap .channel(NioServerSocketChannel.class) //5 指定使用 NIO 的传输 Channel .localAddress(new InetSocketAddress(port)) //6 设置 socket 地址使用所选的端口 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //7 添加 EchoServerHandler 到 Channel 的 ChannelPipeline new EchoServerHandler()); } }); ChannelFuture f = b.bind().sync(); //8 绑定的服务器sync: 等待服务器关闭 System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress()); f.channel().closeFuture().sync(); //9 关闭 channel 和块,直到它被关闭 } finally { group.shutdownGracefully().sync(); //10 关闭 EventLoopGroup,释放所有资源 } } }
在这个例子中,代码创建 ServerBootstrap 实例(步骤4)。由于我们使用在 NIO 传输,我们已指定 NioEventLoopGroup(3)接受和处理新连接,指定 NioServerSocketChannel(5)为信道类型。在此之后,我们设置本地地址是 InetSocketAddress 与所选择的端口(6)。服务器将绑定到此地址来监听新的连接请求。
第7步是关键:在这里我们使用一个特殊的类,ChannelInitializer 。当一个新的连接被接受,一个新的子 Channel 将被创建, ChannelInitializer 会添加我们EchoServerHandler 的实例到 Channel 的 ChannelPipeline。正如我们如前所述,如果有入站信息,这个处理器将被通知
虽然 NIO 是可扩展性,但它的正确配置是不简单的。特别是多线程,要正确处理也非易事。幸运的是,Netty 的设计封装了大部分复杂性,尤其是通过抽象,例如 EventLoopGroup,SocketChannel 和 ChannelInitializer,其中每一个以后会更详细地讨论
在步骤8,我们绑定的服务器,等待绑定完成,调用 sync() 的原因是当前线程阻塞。在第9步的应用程序将等待服务器 Channel 关闭,因为我们 在 Channel 的 CloseFuture 上调用 sync()。现在,我们可以关闭下 EventLoopGroup 并释放所有资源,包括所有创建的线程(10)
总结
- EchoServerHandler 实现了的业务逻辑
- 在 EchoServer.main() 方法,引导了服务器:
- 创建 ServerBootstrap 实例来引导服务器并随后绑定
- 创建并分配一个 NioEventLoopGroup 实例来处理事件的处理,如接受新的连接和读/写数据
- 指定本地 InetSocketAddress 给服务器绑定
- 通过 EchoServerHandler 实例给每一个新的 Channel 初始化
- 最后调用 ServerBootstrap.bind() 绑定服务器
这样服务器初始化完成,可以被使用了
echo 客户端
客户端要做的是:
- 连接服务器
- 发送信息
- 发送的每个信息,等待和接收从服务器返回的同样的信息
- 关闭连接
用 ChannelHandler 实现客户端逻辑
跟写服务器一样,需要提供 ChannelInboundHandler 来处理数据。下面例子,我们用 SimpleChannelInboundHandler 来处理所有的任务,需要覆盖三个方法:
- channelActive(): 服务器的连接被建立后调用
- channelRead0(): 数据后从服务器接收到调用
- exceptionCaught(): 捕获一个异常时调用
EchoClientHandler
@Sharable //1 标记这个类的实例可以在 channel 里共享 public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", //2 当被通知该 channel 是活动的时候就发送信息 CharsetUtil.UTF_8)); } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); //3 记录接收到的消息 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //4 记录日志错误并关闭 channel cause.printStackTrace(); ctx.close(); } }
建立连接后该 channelActive() 方法被调用一次。逻辑很简单:一旦建立了连接,字节序列被发送到服务器。该消息的内容并不重要,在这里,我们使用了 Netty 编码字符串 “Netty rocks!” 通过覆盖这种方法,确保东西被尽快写入到服务器
接下来,覆盖方法 channelRead0()。这种方法会在接收到数据时被调用。注意:由服务器所发送的消息可以以块的形式被接收,当服务器发送 5 个字节是不是保证所有的 5 个字节会立刻收到 (即使是只有 5 个字节,channelRead0() 方法可被调用两次,第一次用一个ByteBuf(Netty的字节容器)装载3个字节和第二次一个 ByteBuf 装载 2 个字节)。唯一要保证的是, 该字节将按照它们发送的顺序分别被接收
第三个方法重写是 exceptionCaught()。就像所述的记录 Throwable 并且关闭通道,在这种情况下终止 连接到服务器
SimpleChannelInboundHandler vs. ChannelInboundHandler
何时用这两个要看具体业务的需要。在客户端,当 channelRead0() 完成,我们已经拿到的入站的信息。当方法返回时,SimpleChannelInboundHandler 会小心的释放对 ByteBuf(保存信息) 的引用。而在 EchoServerHandler,我们需要将入站的信息返回给发送者,由于 write() 是异步的,在 channelRead() 返回时,可能还没有完成。所以,我们使用 ChannelInboundHandlerAdapter,无需释放信息。最后在 channelReadComplete() 会调用 ctxWriteAndFlush() 来释放信息
引导服务器
客户端引导需要 host 、port 两个参数连接服务器
EchoClientServer
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); //1 创建 Bootstrap b.group(group) //2 指定 EventLoopGroup 来处理客户端事件。由于使用 NIO 传输,所以用到了 NioEventLoopGroup 的实现 .channel(NioSocketChannel.class) //3 使用的 channel 类型是一个用于 NIO 传输 .remoteAddress(new InetSocketAddress(host, port)) //4 设置服务器的 InetSocketAddress .handler(new ChannelInitializer<SocketChannel>() { //5 当建立一个连接和一个新的通道时,创建添加到 EchoClientHandler 实例 到 channel pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); //6 连接到远程,等待连接完成 f.channel().closeFuture().sync(); //7 阻塞直到 Channel 关闭 } finally { group.shutdownGracefully().sync(); //8 调用 shutdownGracefully() 来关闭线程池和释放所有资源 } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } final String host = args[0]; final int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
与以前一样,在这里使用了 NIO 传输。请注意,您可以在 客户端和服务器 使用不同的传输 ,例如 NIO 在服务器端和 OIO 客户端
总结
- 一个 Bootstrap 被创建来初始化客户端
- 一个 NioEventLoopGroup 实例被分配给处理该事件的处理,这包括创建新的连接和处理入站和出站数据
- 一个 InetSocketAddress 为连接到服务器而创建
- 一个 EchoClientHandler 将被安装在 pipeline 当连接完成时
- Bootstrap.connect() 被调用连接到远程的 echo(回声) 服务器