基于Netty实现聊天室

Netty简介

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程。

Netty更多详细介绍

Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package chatroom;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* @Author: Usher
* @Description:
* 聊天室
*/
public class ChatServer {

private int port;

public ChatServer(int port) {
this.port = port;
}

public void start(){
//配置服务端的NIO线程组
//实际上EventLoopGroup就是Reactor线程组
//两个Reactor一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
//ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//Set the EventLoopGroup for the parent (acceptor) and the child (client).
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//回调请求
.childHandler(new ChatServerInitialize())
//.localAddress(new InetSocketAddress(port))
//配置NioServerSocketChannel的TCP参数
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_KEEPALIVE,true);

//绑定监听端口,调用sync同步阻塞方法等待绑定操作完成,完成后返回ChannelFuture类似于JDK中Future
ChannelFuture future = bootstrap.bind(port).sync();

System.out.println("服务器启动:");
//使用sync方法进行阻塞,等待服务端链路关闭之后Main函数才退出
future.channel().closeFuture().sync();
System.out.println("服务器关闭:");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//优雅退出,释放线程池资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new ChatServer(8888).start();
}
}

ChatServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package chatroom;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
* @Author: Usher
* @Description:
* 回调处理类,继承SimpleChannelInboundHandler处理出站入站数据,模板设计模式,让主要的处理逻辑保持不变,让变化的步骤通过接口实现来完成
*/
public class ChatServerHandler extends SimpleChannelInboundHandler <String>{


public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();//获得客户端通道
//通知其他客户端有新人进入
for (Channel channel : channels){
if (channel != inComing)
channel.writeAndFlush("[欢迎: " + inComing.remoteAddress() + "] 进入聊天室!\n");
}

channels.add(inComing);//加入队列
}

/**
* 断开连接
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel outComing = ctx.channel();//获得客户端通道
//通知其他客户端有人离开
for (Channel channel : channels){
if (channel != outComing)
channel.writeAndFlush("[再见: ]" + outComing.remoteAddress() + " 离开聊天室!\n");
}

channels.remove(outComing);
}

/**
* 每当从客户端有消息写入时
* @param channelHandlerContext
* @param s
* @throws Exception
*/
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel inComing = channelHandlerContext.channel();

for (Channel channel : channels){
if (channel != inComing){
channel.writeAndFlush("[用户" + inComing.remoteAddress() + " 说:]" + s + "\n");
}else {
channel.writeAndFlush("[我说:]" + s + "\n");
}
}
}

/**
* 当服务器监听到客户端活动时
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 在线");
}

/**
* 离线
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 离线");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel inComing = ctx.channel();
System.out.println(inComing.remoteAddress() + "通讯异常!");
ctx.close();
}
}

ChatServerInitialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package chatroom;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* @Author: Usher
* @Description:
ChannelInitializer继承于ChannelInboundHandler接口,
ChannelInitializer是一个抽象类,不能直接使用
用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作。ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。

使用场景:

a. 在ServerBootstrap初始化时,为监听端口accept事件的Channel添加ServerBootstrapAcceptor

b. 在有新链接进入时,为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler
*/
public class ChatServerInitialize extends ChannelInitializer<SocketChannel> {

protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("客户端连接:" + socketChannel.remoteAddress());
//用户定义的ChannelInitailizer加入到这个channel的pipeline上面去,这个handler就可以用于处理当前这个channel上面的一些事件
ChannelPipeline pipeline = socketChannel.pipeline();
//ChannelPipeline类似于一个管道,管道中存放的是一系列对读取数据进行业务操作的ChannelHandler。

/**
* 发送的数据在管道里是无缝流动的,在数据量很大时,为了分割数据,采用以下几种方法
* 定长方法
* 固定分隔符
* 将消息分成消息体和消息头,在消息头中用一个数组说明消息体的长度
*/
pipeline.addLast("frame",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decode",new StringDecoder());//解码器
pipeline.addLast("encode",new StringEncoder());
pipeline.addLast("handler",new ChatServerHandler());
}
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package chatroom;

import echo.EchoClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;

/**
* @Author: Usher
* @Description:
*/
public class ChatClient {
private String host;
private int port;

public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
//.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChatClientInitializer());

Channel channel = b.connect(host,port).sync().channel();
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

while (true){
channel.writeAndFlush(input.readLine() + "\n");
}
//ChannelFuture f = b.connect().sync();
//f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully().sync();
}
}

public static void main(String[] args) throws InterruptedException {
new ChatClient("localhost",8888).start();
}
}

ChatClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package chatroom;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
* @Author: Usher
* @Description:
*/
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
//直接输出消息
System.out.println(s);
}
}

ChatClientInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package chatroom;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
* @Author: Usher
* @Description:
*/
public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("frame",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decode",new StringDecoder());//解码器
pipeline.addLast("encode",new StringEncoder());
pipeline.addLast("handler",new ChatClientHandler());
}
}

运行效果

Server端,当有客户端连接时,离开时

Client端,当其他连接发消息,离开时