V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Joker123456789
V2EX  ›  Java

关于 NIO 非阻塞的问题

  •  
  •   Joker123456789 · 2021-01-27 13:35:56 +08:00 · 2364 次点击
    这是一个创建于 1394 天前的主题,其中的信息可能已经有所发展或是发生改变。

    下面这段代码,在内部循环的 “else if (selectionKey.isReadable()) {” 这个 if 里面, 如果用阻塞的方式进行数据读取,处理业务逻辑,响应等一系列操作,会造成整个循环阻塞,造成 NIO 的非阻塞特性丢失。

    请问有什么办法 可以解决这个问题?

    private void doMonitor(Selector selector) throws Exception {
            while (true) {
                int eventCountTriggered = selector.select();
                if (eventCountTriggered == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    it.remove();
    
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        try {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            log.error("注册 SocketChannel 异常", e);
                        }
                    } else if (selectionKey.isReadable()) {
                        // 读取数据并处理业务逻辑,然后响应数据
                    }
                }
            }
    }
    
    17 条回复    2021-01-27 17:24:18 +08:00
    zhuawadao
        1
    zhuawadao  
       2021-01-27 13:42:59 +08:00
    多线程啊,消息队列啊
    Joker123456789
        2
    Joker123456789  
    OP
       2021-01-27 13:43:30 +08:00
    @zhuawadao 多线程 报错,早试过了。
    cuantianhou
        3
    cuantianhou  
       2021-01-27 13:44:21 +08:00
    else if 里面的数据已经是准备好的了,不会阻塞了,但是你处理的逻辑如果太耗时,就单独起个线程处理
    Joker123456789
        4
    Joker123456789  
    OP
       2021-01-27 13:49:37 +08:00
    @cuantianhou 我刚才试验过,发起一个请求,在 else if 里的东西执行完之前,接着发起第二个请求,就被阻塞了。 不知道什么原因
    oxromantic
        5
    oxromantic  
       2021-01-27 13:57:21 +08:00
    @Joker123456789 “在 else if 里的东西执行完之前” NIO 是单线程模型,吃不消你在他的线程执行业务逻辑的,你只能在他的线程做数据读取操作,并立刻交还权限,把任务丢给其他线程处理
    sagaxu
        6
    sagaxu  
       2021-01-27 14:04:20 +08:00 via Android
    你不能阻塞 else if,也不能在里面消耗过多 CPU,一般不宜超过 10ms
    yamasa
        7
    yamasa  
       2021-01-27 14:10:45 +08:00
    NIO 的 IO 线程只应该负责 poll 各个端口,poll 到 IO 事件要进行业务处理的话肯定得分发切换到业务线程池,你这个写法就不对。Netty 里也有类似你这段代码 while true poll IO event 的核心逻辑,建议看看。
    liian2019
        8
    liian2019  
       2021-01-27 14:10:47 +08:00
    业务处理用多线程处理,另外,业务处理完成手动调用 selector.wakeup(),会立刻唤醒 selector 。可以看看单 Reactor 多线程模型,多 Reactor 多线程模型( netty 就是这么搞的)。
    v2orz
        9
    v2orz  
       2021-01-27 14:12:53 +08:00
    如 3L 所说,正常情况下读取数据并处理应该很快
    如果你的业务逻辑太重导致执行慢,应该考虑使用 Future f = executorService.submit(runnable)的方式(或者其他多线程方式)
    v2orz
        10
    v2orz  
       2021-01-27 14:14:20 +08:00
    这里本身就应该有个业务线程池,不断把读到的数据提交给业务线程
    zhuawadao
        11
    zhuawadao  
       2021-01-27 14:20:07 +08:00
    @Joker123456789 思路没错,多线程报错是多线程的问题。
    assiadamo
        12
    assiadamo  
       2021-01-27 14:20:20 +08:00
    使用 IO 多复用时不应该用来跑会阻塞的任务,你这种情况只能将阻塞的任务改成异步的
    liian2019
        13
    liian2019  
       2021-01-27 14:20:46 +08:00
    接 8L,可以参考下最简单的示例代码
    if (next.isAcceptable()){
    ...
    }
    else if (next.isReadable()){
    SocketChannel readChannel = (SocketChannel) next.channel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    ...
    Response message = new Response(messageByte);
    next.attach(message);
    threadPool.execute(() -> {
    log.info("接收到消息 {} ... 处理中...",message);
    ...
    message.setStatus(true);
    message.setData(...)
    next.interestOps(SelectionKey.OP_WRITE);
    selector.wakeup();

    });
    }
    else {
    String responseMessage = next.attachment().toString();
    SocketChannel writeChannel = (SocketChannel) next.channel();
    writeChannel.write(ByteBuffer.wrap(responseMessage.getBytes()));
    next.interestOps(SelectionKey.OP_READ);
    }
    mazai
        14
    mazai  
       2021-01-27 14:23:02 +08:00
    NIO 是同步非阻塞,非阻塞这个特性只是相对于 OS 层面是非阻塞的,代码层面还是同步的,第一次发起请求,来了一个 OP_READ 事件,单线程开始处理 isReadable 里的代码逻辑,这时候你再发一次请求必然会等待这个线程处理完代码逻辑。

    确实可以参考 Reactor 线程模型,来一个 master 线程,负责接受请求,接受完请求把这些请求分发给多个 worker 来处理业务逻辑,这样就不会阻塞你的请求了。
    wucao219101
        15
    wucao219101  
       2021-01-27 14:54:42 +08:00
    你这个代码相当于单线程来处理所有的 IO 事件,如果处理事件的业务逻辑本身有阻塞耗时的逻辑,那么肯定要另外再维护一个线程池。

    Node.js 能单线程处理的原因是它所有的业务逻辑都是异步的,没有阻塞的任务存在。Java 不一样,你查询 DB 、调用外部接口、读写文件等,都会阻塞当前线程,所以不去弄个线程池性能肯定上不去。就是 Netty 他也是最好用外部线程池 EventExecutorGroup 来处理业务逻辑的。
    Joker123456789
        16
    Joker123456789  
    OP
       2021-01-27 15:39:33 +08:00
    @wucao219101
    @mazai
    @v2orz

    用线程池做了以后,现在的问题是这样的

    ```java
    while (it.hasNext()) {
    SelectionKey selectionKey = it.next();
    it.remove();

    if (selectionKey.isAcceptable()) {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
    try {
    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.register(selector, SelectionKey.OP_READ);
    } catch (IOException e) {
    log.error("注册 SocketChannel 异常", e);
    }
    } else if (selectionKey.isReadable()) {
    log.info("进来了=========================================");
    Request request = new Request();
    request.selectionKey = selectionKey;
    request.selector = selector;

    Executors.newCachedThreadPool().execute(request);

    // selector.wakeup();
    }
    }
    ```

    直发起了一个请求,但是 这句 log 打印了好多次。 控制台还报异常。
    mightofcode
        17
    mightofcode  
       2021-01-27 17:24:18 +08:00
    你有两个选择:
    1,doMonitor 这个线程里面所有操作都是异步的,不存在任何阻塞
    2,另开线程进行处理阻塞任务
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   996 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 20:20 · PVG 04:20 · LAX 12:20 · JFK 15:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.