Netty的使用

转载请标注原文地址:https://blog.csdn.net/lilyssh/article/details/84306563
项目源码地址:https://gitee.com/lilyssh/lilyssh-rpc

项目需求

用netty实现两个项目之间的通讯。

一、Server端:

1. 在pom.xml中添加以下依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>

2. 接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package cn.lilyssh.receiver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class Receiver {
public void start(){
//根据给定的字符串内容创建一个ByteBuf。
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));

//(1)、初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
//NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。默认的线程数目是 CPU 核数 × 2。
EventLoopGroup group=new NioEventLoopGroup();
try {
//(2)、ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求。
ServerBootstrap serverBootstrap = new ServerBootstrap();
//(3)、通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池";
serverBootstrap.group(group)
//(4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(2222))
//(5)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) {
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
}
});
}
}
);
//(6)、 绑定并侦听某个端口
ChannelFuture f = serverBootstrap.bind().sync();
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}
}
}

3. 调用、启动:

1
2
3
4
5
6
7
8
9
10
package cn.lilyssh;

import cn.lilyssh.receiver.Receiver;

public class MainReceiver {
public static void main(String[] args) {
Receiver receiver = new Receiver();
receiver.start();
}
}

二、Client端:

1. 在pom.xml中添加以下依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>

2. 发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package cn.lilyssh.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class Sender {
public void start(){
//worker负责读写数据
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//设置线程池
b.group(worker);
//设置socket工厂
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
//设置管道
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//获取管道
ChannelPipeline pipeline = channel.pipeline();
//字符串解码器
pipeline.addLast(new StringDecoder());
//字符串编码器
pipeline.addLast(new StringEncoder());
//处理类
pipeline.addLast(new ClientHandler4());
}
});
//发起异步连接操作
ChannelFuture futrue = b.connect(new InetSocketAddress("127.0.0.1",2222)).sync();
//等待客户端链路关闭
futrue.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放NIO线程组
worker.shutdownGracefully();
}
}
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> {

//接受服务端发来的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server response : "+msg);
}

//与服务器建立连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//给服务器发消息
ctx.channel().writeAndFlush("i am client !");

System.out.println("channelActive");
}

//与服务器断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}

//异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭管道
ctx.channel().close();
//打印异常信息
cause.printStackTrace();
}

@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {

}
}

3. 调用、启动:

1
2
3
4
5
6
7
8
package cn.lilyssh.rpc;

public class MainSender {
public static void main(String[] args) {
Sender sender = new Sender();
sender.start();
}
}

会看到客户端regisry控制台打印了:channelActive,服务端lrpc控制台打印了:i am client !。
大功告成!

本文由 lilyssh创作。可自由转载、引用,但需署名作者且注明文章出处。


当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器