绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
netty案例,netty4.1拓展篇七《Netty请求响应同步通信》
2019-12-04 11:30:20

前言介绍

在我们实现开发RPC框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们RPC框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。这里我们选择netty作为我们的socket框架,采用future方式进行通信。

Dubbo:国内早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。 Motan:微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。 Tars:腾讯内部使用的 RPC 框架,于 2017 年对外开源,仅支持 C++ 语言。 Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC 框架,仅支持 Java 语言 gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。 Thrift:初是由 Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为 Apache 开源项目之一,支持多种语言。 hprose:一个MIT开源许可的新型轻量级跨语言跨平台的面向对象的高性能远程动态通讯中间件。它支持众多语言:nodeJs, C++, .NET, Java, Delphi, Objective-C, ActionScript, JavaScript, ASP, PHP, Python, Ruby, Perl, Golang 。

环境准备

  1. jdk1.8【jdk1.7以下只能部分支持netty】
  2. Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

代码示例

itstack-demo-rpc-2-07
└── src
    └── main
    │    └── java
    │        └── org.itstack.demo.netty
    │             ├── client
    │             │   ├── ClientSocket.java
    │             │   └── MyClientHandler.java  
    │             ├── codec
    │             │   ├── RpcDecoder.java
    │             │   └── RpcEncoder.java  
    │             ├── future
    │             │   ├── SyncWrite.java    
    │             │   ├── SyncWriteFuture.java  
    │             │   ├── SyncWriteMap.java 
    │             │   └── WriteFuture.java  
    │             ├── msg
    │             │   ├── Request.java
    │             │   └── Response.java 
    │             ├── server
    │             │   ├── MyServerHandler.java
    │             │   └── ServerSocket.java     
    │             └── util
    │                 └── SerializationUtil.java    
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── StartClient.java
                 └── StartServer.java    
复制代码

** 展示部分重要代码块,完整代码可以关注公众号获取;bugstack虫洞栈 **

MyClientHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
复制代码

SyncWrite.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {

        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
        SyncWriteMap.syncKey.put(request.getRequestId(), future);

        Response response = doWriteAndSync(channel, request, timeout, future);

        SyncWriteMap.syncKey.remove(request.getRequestId());
        return response;
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                //失败移除
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.isTimeout()) {
                throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }

}
复制代码

MyServerHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj){
        Request msg = (Request) obj;
        //反馈
        Response request = new Response();
        request.setRequestId(msg.getRequestId());
        request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。");
        ctx.writeAndFlush(request);
        //释放
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}
复制代码

StartClient.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                //获取future,线程有等待处理时间
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                //构建发送参数
                Request request = new Request();
                request.setResult("查询{bugstack虫洞栈}用户信息");
                SyncWrite s = new SyncWrite();
                Response response = s.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

复制代码

StartServer.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartServer {

    public static void main(String[] args) {
        new Thread(new ServerSocket()).start();
        System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
    }

}
复制代码

测试结果

启动StartServer

itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}
复制代码

启动StartClient

调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"188ba00e-3a0d-4094-9475-c7ee93104011"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"bc9d63d4-9d37-406a-9c0f-a68211ac466f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"cceb510b-8179-46ab-abc6-eb7d5b6c0ac2"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f65aafd0-07b7-4bdb-be80-a57b4c58ad2d"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"e6700074-380b-441f-ae0d-f71dcd7f84c9"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"25a0a9d5-46d5-4da6-ad3f-1496ca20bb17"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"052dce21-dfb9-42d7-bbcf-46137b9933df"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4fd968fa-3171-4e4f-838d-4a215a90da00"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"97f6eb6e-8c5d-49f4-beba-2ba7e1ff953f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"ffc48115-8e62-43a8-b3f7-035390427d37"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4bafefc9-4beb-49f6-8126-29e0e03a55d1"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"10367786-24fd-4f11-9465-bcd2c87b4027"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"82293ad3-a694-405c-ab03-01624f38b1ad"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0c163cb8-be5f-4697-931f-61f5bf487bae"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0f1e7611-4fd0-43b4-86dc-fce09965046e"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f22ac0dc-974c-4de1-bcdf-1566ca0b2305"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"1907456b-2507-4ddd-8c1e-a86c990e3e72"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"6a99b1f2-5859-4ed7-9d17-98229c13250f"}

Process finished with exit code -1

复制代码

微信公众号:bugstack虫洞栈


微信公众号:bugstack虫洞栈,欢迎关注&获取源码

分享好友

分享这个小栈给你的朋友们,一起进步吧。

重学 Java 设计模式
创建时间:2020-06-03 14:48:59
技术好就一定能写出好代码吗?不能!再漂亮的马桶放到厨房都略显尴尬!无论是家里装修还是上道开车,只有通过实战才能快速将理论转变为技能。毕竟设计模式也是源于 克里斯托佛·亚历山大 的著作 《建筑模式语言》。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • 小傅哥
    栈主

小栈成员

查看更多
  • 小雨滴
  • 马延龙
  • wojiuzhuai
  • 水煮陈福
戳我,来吐槽~