handler处理如下
package com.amarky.websocket;
import org.apache.log4j.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
/**
*
* @author AMARKY
* @date 2016年9月8日
* @Desc 处理WEBSOCKET的请求
*/
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger logger = Logger.getLogger(WebsocketServerHandler.class.getName());
private WebSocketServerHandshaker handsharker;
/**
* 接受数据
*/
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果是HTTP请求
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
}
// WEBSOCKET接入
else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
/**
* 数据处理完成后,刷新出去
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 处理HTTP请求
*
* @param ctx
* @param msg
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// 如果解码失败,返回异常
if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
// 构造握手相应
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:8080/websocket", null, Boolean.FALSE);
handsharker = wsFactory.newHandshaker(req);
if (handsharker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handsharker.handshake(ctx.channel(), req);
}
}
/**
* 处理WEBSOCKET请求
*
* @param ctx
* @param frame
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断链路是否关闭
if (frame instanceof CloseWebSocketFrame) {
handsharker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
new PongWebSocketFrame(frame.content().retain());
return;
}
// 只支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(
String.format("%s frame type not supported", frame.getClass().getName()));
}
String request = ((TextWebSocketFrame) frame).text();
logger.info(String.format("%s received %s", ctx.channel(), request));
System.out.println(String.format("%s received %s", ctx.channel(), request));
// ctx.channel().write(new TextWebSocketFrame(request + " , 欢迎使用netty
// websocket 服务,现在时刻是: ")
// + new java.util.Date().toString());
ctx.channel().writeAndFlush("欢迎使用netty websocket 服务,现在时刻是: " + new java.util.Date().toString());
}
/**
* 返回请求
*
* @param ctx
* @param req
* @param defaultFullHttpResponse
*/
private void sendHttpResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse res) {
// 返回应答的消息
if (res.getStatus().code() != 200) {
ByteBuf byteBuf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(byteBuf);
byteBuf.release();
HttpHeaders.setContentLength(res, res.content().readableBytes());
}
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(res) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
websocket如下:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>netty 测试</title>
</head>
<body>
<script type="text/javascript">
var socket;
alert(window.WebSocket);
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8080/websocket");
socket.onopen = function(event){
var ta = document.getElementById('responseText');
ta.value = "打开websocket服务正常,支持websocket服务";
};
socket.onmessage = function(event){
var ta = document.getElementById('responseText');
ta.value = "";
ta.value = event.data;
};
socket.onclose = function(event){
var ta = document.getElementById('responseText');
ta.value = "";
ta.value = "websocket关闭";
};
}
else{
alert("不支持websocket");
}
function send(message){
if(!window.WebSocket){return;}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("链接没成功");
}
}
</script>
<form onsubmit="return false;">
<input type = "text" name = "message" value = "netty实践" />
<br><br>
<input type = "button" value = "发送" onclick="send(this.form.message.value)" />
<hr color="bule" />
<h3>服务端应答消息</h3>
<textarea id = "responseText" style = "width: 500px;height: 300px"></textarea>
</form>
</body>
</html>
socket.onmessage方法接受不到writeAndFlush写回的数据,不知道是什么原因