Apache Mina2实现类似QQ、飞秋的聊天功能

至于MINA2是什么,这里就不介绍了,有兴趣的可以自己去问一下度娘!

首先使用MINA2的时候,你最起码的知道怎么下载jar包和在工程中引入哪些jar包。

附上下载地址:http://mirrors.cnnic.cn/apache/mina/mina/2.0.7/dist/apache-mina-2.0.7-bin.zip 和必须引入工程的jar包:mina-core-2.0.7.jar;slf4j-api-1.6.6.jar;slf4j-log4j12-1.7.5.jar,不多说直接上代码:

 

第一步:编写通信要用的编码器类、解码器类和编解码工厂类

1、字符编码:

/**
 * @Description: 
 *
 * @Title: CharsetEncoder.java
 * @Package com.joyce.mina.code
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:48:53
 * @version V2.0
 */
package com.joyce.mina.code;

import java.nio.ByteOrder;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

/**
 * @Description: 字符编码
 *
 * @ClassName: CharsetEncoder
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:48:53
 * @version V2.0
 */
public class CharsetEncoder implements ProtocolEncoder {
	
	/**
	 * Log4j日志
	 */
	private static final Logger logger = Logger.getLogger(CharsetEncoder.class);
	
	private static final Charset charset = Charset.forName("UTF-8");

	@Override
	public void dispose(IoSession session) throws Exception {
		logger.info("#####################dispose#########################");
	}

	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		logger.debug("#####################encode#########################");
		IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);
		buff.order(ByteOrder.LITTLE_ENDIAN);
		String smsgbody = message.toString();
		byte[] bytearr = smsgbody.getBytes(charset);
		buff.put(bytearr);
		// 为下一次读取数据做准备
		buff.flip();
		out.write(buff);
	}
}

 

2、字符解码:

/**
 * @Description: 
 *
 * @Title: CharsetDecoder.java
 * @Package com.joyce.mina.code
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:35:49
 * @version V2.0
 */
package com.joyce.mina.code;

import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * @Description:  字符解码
 *
 * @ClassName: CharsetDecoder
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:35:49
 * @version V2.0
 */
public class CharsetDecoder implements ProtocolDecoder {
	
	/**
	 * Log4j日志
	 */
	private final static Logger logger = Logger.getLogger(CharsetDecoder.class);
	
	private static final Charset charset = Charset.forName("UTF-8");
	// 可变的IoBuffer数据缓冲区
	private IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);

	@Override
	public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		logger.debug("#####################decode#########################");
		// 如果有信息
		if (in.hasRemaining()) {
			// 判断消息是否是结束符,不同平台的结束符也不一样;
                        // windows换行符(\r\n)就认为是一个完整消息的结束符了; UNIX 是\n;MAC 是\r
			byte b = in.get();
			if (b == '\n') {
				buff.flip();
				byte[] bytes = new byte[buff.limit()];
				buff.get(bytes);
				String message = new String(bytes, charset);
				buff = IoBuffer.allocate(100).setAutoExpand(true);
				// 如果结束了,就写入转码后的数据
				out.write(message);
			} else {
				buff.put(b);
			}
		}
	}

	@Override
	public void dispose(IoSession session) throws Exception {
		logger.info("#####################dispose#########################");
		logger.info(session.getCurrentWriteMessage());
		
	}

	@Override
	public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
		logger.info("#####################完成解码#########################");
	}
}

上面的decode方法是解码方法,它主要是把读取到数据中的换行符去掉。因为在mina通信协议中以换行符为结束符,如果不定义结束符那么程序会在那里一直等待下一条发送的数据。

这里用到了IoBuffer,MiNa中传输的所有二进制信息都存放在IoBuffer中,IoBuffer是对Java NIO中ByteBuffer的封装(Mina2.0以前版本这个接口也是ByteBuffer),提供了更多操作二进制数据,对象的方法,并且存储空间可以自增长,用起来非常方便;简单理解,它就是个可变长度的byte字节数组!

1). static IoBuffer allocate(int capacity,boolean useDirectBuffer)

创建IoBuffer实例,第一个参数指定初始化容量,第二个参数指定使用直接缓冲区还是JAVA 内存堆的缓存区,默认为false。

2).IoBuffer setAutoExpand(boolean autoExpand)

这个方法设置IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那么可以看出长度可变这个特性默认是不开启的。

3). IoBuffer flip()

limit=position, position=0,重置mask,为了读取做好准备,一般是结束buffer操作,将buffer写入输出流时调用;这个必须要调用,否则极有可能position!=limit,导致position后面没有数据;每次写入数据到输出流时,必须确保position=limit。

4). IoBuffer clear()与IoBuffer reset()

clear:limit=capacity , position=0,重置mark;它是不清空数据,但从头开始存放数据做准备—相当于覆盖老数据。

reset就是清空数据

5). int remaining()与boolean hasRemaining()

这两个方法一般是在调用了flip方法后使用的,remaining()是返回limt-position的值!hasRemaining()则是判断当前是否有数据,返回position < limit的boolean值!

3、编解码过滤工厂:

/**
 * @Description: 
 *
 * @Title: CharsetCodecFactory.java
 * @Package com.joyce.mina.code.factory
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:34:14
 * @version V2.0
 */
package com.joyce.mina.code.factory;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

import com.joyce.mina.code.CharsetDecoder;
import com.joyce.mina.code.CharsetEncoder;

/**
 * @Description: 字符编码、解码工厂类,编码过滤工厂
 *
 * @ClassName: CharsetCodecFactory
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:34:14
 * @version V2.0
 */
public class CharsetCodecFactory implements ProtocolCodecFactory {

	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return new CharsetDecoder();
	}

	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return new CharsetEncoder();
	}
}

 

 

第二步:编写IoHandler实现类代码

IoHander是Io读写的事件驱动类,这里的Io操作都会触发里面的事件。所有的业务逻辑都应当在这个类中完成,个人建议如果能封装出来的尽量封装出来,否则业务代码会很多很大!我这里就实现了封装

/**
 * @Description: 
 *
 * @Title: ServerMessageReceived.java
 * @Package com.joyce.mina.message
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-21 上午08:54:13
 * @version V2.0
 */
package com.joyce.mina.message;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;

import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;

/**
 * @Description: 消息处理
 *
 * @ClassName: ServerMessageReceived
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-21 上午08:54:13
 * @version V2.0
 */
public class ServerMessageUtil {
	
	/**
	 * Log4j日志对象
	 */
	private static final Logger logger = Logger.getLogger(ServerMessageUtil.class);
	
	/**
	 * @Description: 服务器接收信息处理
	 *
	 * @param session
	 * @param message
	 *
	 * @Title: ServerMessageReceived.java
	 * @Copyright: Copyright (c) 2014
	 *
	 * @author Comsys-LZP
	 * @date 2014-3-21 上午08:55:53
	 * @version V2.0
	 */
	public static void messageReceived(IoSession session, Object message){
		String content = message.toString();
		String datetime = ServerMessageUtil.getNowDate();
		// 拿到所有的客户端Session
		Collection<IoSession> sessions = ServerMessageUtil.getAllClient(session);
		// 向所有客户端发送数据
		if(content.trim().equals("1")){
			logger.info("在线用户列表:");
			session.write("在线用户列表:\r\n");
			for (IoSession sess : sessions) {
				logger.info("Client" + sess.getId() + ":\t" + sess.getRemoteAddress());
				session.write("Client" + sess.getId() + ":\t" + sess.getRemoteAddress() + "\r\n");
			}
		} else if("2".equals(content.trim())){
			logger.info("说明:");
			StringBuffer sb = new StringBuffer();
			sb.append("说明:");
			sb.append("\r\n");
			sb.append("1、使用@符号进行私聊(@127.0.0.1#你好!-- @ip#你好!)");
			sb.append("\r\n");
			sb.append("2、无@符号属于群聊");
			sb.append("\r\n");
			sb.append("3、新增quit和exit命令退出");
			sb.append("\r\n");
			session.write(sb);
		} else if("quit".equalsIgnoreCase(content.trim()) || "exit".equalsIgnoreCase(content.trim())){
			session.close(true);
		} else if(content.trim().indexOf("@") != -1 && content.trim().indexOf("#") != -1){
			String ip = content.substring(content.indexOf("@") + 1,content.indexOf("#"));
			for (IoSession sess : sessions) {
				logger.info(sess.getRemoteAddress() + "-->IP:" + ip);
				if(sess.getRemoteAddress().toString().indexOf(ip) != -1){
					logger.info("Clinet:" + session.getRemoteAddress() + "即将对" + sess.getRemoteAddress() + "私聊");
					session.write("您对客户端:" + sess.getRemoteAddress() + "说:" + content.substring(content.indexOf("#")+1) + "\n");
					sess.write("客户端:" + session.getRemoteAddress() + "对您说:" + content.substring(content.indexOf("#")+1) + "\n");
				}
			}
		} else if(!"\r".equals(content)) {
			for (IoSession sess : sessions) {
				logger.info("转发给:" + session.getRemoteAddress() + "客户端   messageReceived: " + datetime + "\t" + content);
				sess.write("客户端:" + session.getRemoteAddress() + "在" + datetime + "对所有人说:\t" + content + "\n");
			}
		} else {
			session.write("\r");
		}
	}
	
	/**
	 * @Description: 获取连接所有客户端 
	 *
	 * @param session
	 * @return
	 *
	 * @Title: ServerMessageUtil.java
	 * @Copyright: Copyright (c) 2014
	 *
	 * @author Comsys-LZP
	 * @date 2014-3-21 上午11:26:47
	 * @version V2.0
	 */
	public static Collection<IoSession> getAllClient(IoSession session){
		// 拿到所有的客户端Session
        	return session.getService().getManagedSessions().values();
	}
	
	/**
	 * @Description: 获取现在时间,并且格式为yyyy-MM-dd hh:mm:ss
	 *
	 * @return
	 *
	 * @Title: ServerMessageUtil.java
	 * @Copyright: Copyright (c) 2014
	 *
	 * @author Comsys-LZP
	 * @date 2014-3-21 上午11:34:09
	 * @version V2.0
	 */
	public static String getNowDate(){
		 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
		 return sdf.format(new Date());
	}
}

 

上面的就是封装的一些常用的方法和业务处理等。接下来就是IoHandler实现类了

/**
 * @Description: 
 *
 * @Title: ServerMessageHandler.java
 * @Package com.joyce.mina.server.message
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:54:31
 * @version V2.0
 */
package com.joyce.mina.server.message;

import java.util.Collection;

import org.apache.log4j.Logger;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import com.joyce.mina.message.ServerMessageUtil;

/**
 * @Description: 处理服务器端消息
 *
 * @ClassName: ServerMessageHandler
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 上午11:54:31
 * @version V2.0
 */
public class ServerMessageHandler implements IoHandler {
	
	private static final Logger logger = Logger.getLogger(ServerMessageHandler.class);

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		logger.info("服务器发生异常:" + cause.getMessage());
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		logger.info("服务器接收到数据: " + message);
		ServerMessageUtil.messageReceived(session, message);
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		logger.info("服务器发送消息:" + message);
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		logger.info("关闭当前session:" + session.getId() + "#" + session.getRemoteAddress());
        	CloseFuture closeFuture = session.close(true);
        	closeFuture.addListener(new IoFutureListener<IoFuture>() {
			@Override
			public void operationComplete(IoFuture future) {
				if (future instanceof CloseFuture) {
					((CloseFuture) future).setClosed();
					logger.info("sessionClosed CloseFuture setClosed--> " + future.getSession().getId());
				}
			}
		});
        
        	// 获取所有客户端
        	Collection<IoSession> sessions = ServerMessageUtil.getAllClient(session);
        	// 给每个客户端发送离开信息
        	for (IoSession sess : sessions) {
			sess.write("客户端:" + session.getRemoteAddress() + "在" + ServerMessageUtil.getNowDate() + "离开");
		}
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		logger.info("创建一个新连接:" + session.getRemoteAddress());
		StringBuffer info = new StringBuffer();
		info.append("welcome to the joyce chat room !");
		info.append("\r\n\r\n");
		session.write(info);
		
		// 获取所有在线客户端
		Collection<IoSession> sessions = ServerMessageUtil.getAllClient(session);
		for (IoSession sess : sessions) {
			StringBuffer welcome = new StringBuffer();
			welcome.append("客户端:" + session.getRemoteAddress() + "在" + ServerMessageUtil.getNowDate() + "加入!");
			welcome.append("\r\n\r\n");
			sess.write(welcome);
		}
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		logger.info("当前连接" + session.getRemoteAddress() + "处于空闲状态:" + status);
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		logger.info("打开一个session:" + session.getId() + "#" + session.getBothIdleCount());
		StringBuffer info = new StringBuffer();
		info.append("1:获取所有在线用户\t2:说明");
		info.append("\r\n");
		session.write(info);
	}
}

sessionCreated:当一个新的连接建立时,由I/O processor thread调用;

sessionOpened:当连接打开是调用;

messageReceived: 当接收了一个消息时调用;

messageSent:当一个消息被(IoSession#write)发送出去后调用;

sessionIdle:当连接进入空闲状态时调用;

sessionClosed:当连接关闭时调用;

exceptionCaught:实现IoHandler的类抛出异常时调用;

一般情况下,我们最关心的只有messageReceived方法,接收消息并处理,然后调用IoSession的write方法发送出消息!(注意:这里接收到的消息都是Java对象,在IoFilter中所有二进制数据都被解码)一般情况下很少有人实现IoHandler接口,而是继承它的一个实现类IoHandlerAdapter,这样不用覆盖它的7个方法,只需要根据具体需求覆盖其中的几个方法就可以!

Iohandler的7个方法其实是根据session的4个状态值间变化来调用的:

 Connected:会话被创建并使用;

 Idle:会话在一段时间(可配置)内没有任何请求到达,进入空闲状态;

 Closing:会话将被关闭(剩余message将被强制flush);

 Closed:会话被关闭;

 

第三步:编写Server启动类,bind端口,设置编码过程和核心业务处理器

/**
 * @Description: 
 *
 * @Title: MinaServer.java
 * @Package com.joyce.mina.server
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 下午12:35:48
 * @version V2.0
 */
package com.joyce.mina.server;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.joyce.mina.code.factory.CharsetCodecFactory;
import com.joyce.mina.server.message.ServerMessageHandler;

/**
 * @Description: 服务器启动类 
 *
 * @ClassName: MinaServer
 * @Copyright: Copyright (c) 2014
 *
 * @author Comsys-LZP
 * @date 2014-3-19 下午12:35:48
 * @version V2.0
 */
public class MinaServer {
	
	private SocketAcceptor acceptor;

	/**
	 * 
	 */
	public MinaServer() {
		// 创建非阻塞的server端的Socket连接
        acceptor = new NioSocketAcceptor();
	}
	
	public boolean start(){
		// 获取过滤器链
		DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
		// 添加编码过滤器 处理乱码、编码问题
		filterChain.addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
		
		// 添加日志过滤器
		LoggingFilter loggingFilter = new LoggingFilter();
        	loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
        	loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
        	filterChain.addLast("logger", loggingFilter);
        
		// 设置核心消息业务处理器
        	acceptor.setHandler(new ServerMessageHandler());
		// 设置session配置,30秒内无操作进入空闲状态
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
		try {
			// 绑定端口3456
            		acceptor.bind(new InetSocketAddress(3456));
        	} catch (IOException e) {
            		return false;
        	}
		return true;
	}
	
	public static void main(String[] args) {
		MinaServer server = new MinaServer();
        	server.start();
	}
}

这里只是一个简单的MINA2服务器的启动,框架很成熟很强大,可以通过Spring配置!这方面的先不扯开,有机会再分享!

此时就可以通过输入http://localhost:3456 来访问了,在控制台会有大量数据输出,绝大部分是浏览器的相关信息和服务器发出去的信息!如果懂telnet的程序猿可以去通过telnet连接上去,效果会更佳!

就说到这里了!

 

来附上完整项目的地址:http://download.csdn.net/download/luo201227/7101013

 

转载至:http://blog.csdn.net/luo201227/article/details/22150043

此条目发表在Java分类目录。将固定链接加入收藏夹。

发表评论