Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(一)

在前面的文章中,我们分析了在NIO模式下,服务端接收和读取多个客户端数据的情况;

本文,我们看下NIO模式下,使用Selector如何把数据发送给客户端。

一、直接通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写

二、注册OP_WRITE,在写状态满足情况下,通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写

我们看下第一种情况:

服务端在有数据需要发送给客户端的情况下,直接使用客户端对应的SocketChannel写数据即可,不考虑数据是否可以正常发送。

这和BIO下,直接通过输出流写数据是一样的

我们可以基于聊天室的功能,把收到的数据直接写给其他客户端

客户端代码统一使用BIO,如下:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * 基于BIO的TCP网络通信的客户端,接收控制台输入的数据,然后通过字节流发送给服务端
 *
 */
class ChatClient {
    public static void main(String[] args) throws IOException {
        // 连接server
        Socket serverSocket = new Socket("localhost", 9090);
        System.out.println("client connected to server");

        // 读取用户在控制台上的输入,并发送给服务器
        new Thread(new ClientThread(serverSocket)).start();

        // 接收服务端发送过来的数据
        try (InputStream serverSocketInputStream = serverSocket.getInputStream();) {
            byte[] buffer = new byte[1024];
            int len;
            while ((len = serverSocketInputStream.read(buffer)) != -1) {
                String data = new String(buffer, 0, len);
                System.out.println(
                    "client receive data from server" + serverSocketInputStream + " data size:" + len + ": " + data);
            }
        }

    }
}

class ClientThread implements Runnable {
    private Socket serverSocket;

    public ClientThread(Socket serverSocket) {
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        // 读取用户在控制台上的输入,并发送给服务器
        InputStream in = System.in;
        byte[] buffer = new byte[1024];
        int len;
        try (OutputStream outputStream = serverSocket.getOutputStream();) {
            // read操作阻塞,直到有数据可读,由于后面还要接收服务端转发过来的数据,这两个操作都是阻塞的,所以需要两个线程
            while ((len = in.read(buffer)) != -1) {
                String data = new String(buffer, 0, len);
                System.out.println("client receive data from console" + in + " : " + new String(buffer, 0, len));
                if ("exit\n".equals(data)) {
                    // 模拟客户端关闭连接
                    System.out.println("client close :" + serverSocket);
                    // 这里跳出循环后,try-with-resources 会自动关闭outputStream
                    break;
                }
                // 发送数据给服务器端
                outputStream.write(new String(buffer, 0, len).getBytes()); // 此时buffer中是有换行符
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

服务端代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
 * 服务端接收到数据后,直接写回客户端,或者写给其他客户端
 *
 */
class NIOSelectorDirectWriteServer {
    Selector selector;

    public static void main(String[] args) throws IOException {
        NIOSelectorDirectWriteServer server = new NIOSelectorDirectWriteServer();
        server.start(); // 开启监听和事件处理
    }

    public void start() {
        initServer();
        // selector非阻塞轮询有哪些感兴趣的事件到了
        doService();
    }

    private void doService() {
        if (selector == null) {
            System.out.println("server init failed, without doing read/write");
            return;
        }
        try {
            while (true) {
                while (selector.select() > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
                    Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove(); // 删除当前元素,防止重复处理
                        // 下面根据事件进行分别处理
                        if (key.isAcceptable()) {
                            // 客户端连接事件
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            // 读取客户端数据
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    private void initServer() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));

            // 此时在selector上注册感兴趣的事件
            // 这里先注册OP_ACCEPT: 客户端连接事件
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server init success");
        } catch (IOException exception) {
            exception.printStackTrace();
            System.out.println("server init failied");
        }
    }

    public void acceptHandler(SelectionKey key) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false); // 设置client非阻塞
            System.out.println("server receive a client :" + client);
            // 注册OP_READ事件,用于从客户端读取数据
            // 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
            ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
            client.register(key.selector(), SelectionKey.OP_READ, buffer);
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        System.out.println("read handler");
        SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
        ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
        buffer.clear(); // 使用前clear

        // 防止数据分包,需要while循环读取
        try {
            while (true) {
                int readLen = client.read(buffer);
                if (readLen > 0) {
                    // 读取到数据了
                    buffer.flip();
                    byte[] data = new byte[buffer.limit()];
                    buffer.get(data);
                    System.out.println("server read data from " + client + ", data is :" + new String(data));
                    // 转发给其他客户端
                    forwardMsg(client, data);
                } else if (readLen == 0) {
                    // 没读到数据
                    System.out.println(client + " : no data");
                    break;
                } else if (readLen < 0) {
                    // client 关闭连接
                    // 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
                    System.out.println(client + " close");
                    client.close();
                    break;
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
            // client 关闭连接
            System.out.println(client + " disconnect");
            // todo:disconnect 导致一直有read事件,怎么办?
            try {
                client.close();
            } catch (IOException ex) {
                System.out.println("close ex");
            }
        }
    }

    private void forwardMsg(SocketChannel myself, byte[] msg) throws IOException {
        Set<SelectionKey> keys = selector.keys();
        // read/write 对应同一个key,同一个client不会发送两遍
        for (SelectionKey key : keys) {
            SelectableChannel channel = key.channel();
            if (channel instanceof SocketChannel && channel != myself) {
                // 发送数据
                SocketChannel client = (SocketChannel) channel;
                System.out.println("forward msg to " + client);
                ByteBuffer buff = ByteBuffer.wrap(msg);
                while (buff.hasRemaining()) {
                    client.write(buff);
                }
            }
        }
    }
}

可以看到,这里我们在forwardMsg()方法中,直接通过channel 把数据转发出去了

测试:

先启动服务器,然后启动两个客户端

客户端1发送消息client1,服务端接收到后转发给客户端2

客户端2发送消息client2,服务端接收到后转发给客户端1

服务端日志:

server init success
server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
read handler
server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536], data is :client1

forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] : no data
read handler
server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549], data is :client2

forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] : no data

客户端1日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1

client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client2

客户端2日志:

client connected to server
client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client1

client2
client receive data from consolejava.io.BufferedInputStream@72cdfe9a : client2

测试:客户端1 正常退出关闭

客户端1日志:

exit
client receive data from consolejava.io.BufferedInputStream@65231a33 : exit

client close :Socket[addr=localhost/127.0.0.1,port=9090,localport=23536]
Exception in thread "main" java.net.SocketException: Socket closed
	at java.base/sun.nio.ch.NioSocketImpl.endRead(NioSocketImpl.java:248)
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:327)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.InputStream.read(InputStream.java:218)
	at com.huawei.io.chatroom.bio.ChatClient.main(ChatClient.java:29)

Process finished with exit code 1

服务端日志:

read handler
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] close

测试:客户端2异常断开

服务端日志:

read handler
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] disconnect
java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
	at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.readHandler(NIOSelectorDirectWriteServer.java:107)
	at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.doService(NIOSelectorDirectWriteServer.java:56)
	at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.start(NIOSelectorDirectWriteServer.java:34)
	at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.main(NIOSelectorDirectWriteServer.java:28)

二、注册OP_WRITE写数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579874.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot的考勤管理系统

文章目录 项目介绍主要功能截图&#xff1a;部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &…

Zynq 7000 系列中成功执行BootROM的条件

Zynq 7000设备的启动需要正确的电压序列和I/O引脚控制。BootROM的流程由复位类型、启动模式引脚设置以及启动映像来控制。BootROM对所选启动设备的引脚连接有特定的要求。 Zynq 7000 SoC设备具有电源、时钟和复位要求&#xff0c;这些要求必须得到满足&#xff0c;才能成功执行…

java:SpringBootWeb请求响应

Servlet 用java编写的服务器端程序 客户端发送请求至服务器 服务器启动并调用Servlet,Servlet根据客户端请求生成响应内容并将其传给服务器 服务器将响应返回给客户端 javaweb的工作原理 在SpringBoot进行web程序开发时,内置了一个核心的Servlet程序DispatcherServlet,称之…

RocketMQ快速入门:namesrv、broker、dashboard的作用及消息发送、消费流程(三)

0. 引言 接触rocketmq之后&#xff0c;大家首当其冲的就会发现需要安装3个组件&#xff1a;namesrv, broker, dashboard&#xff0c;其中dashboard也叫console&#xff0c;为选装。而这几个组件之前的关系是什么呢&#xff0c;消息发送和接收的过程是如何传递的呢&#xff0c;…

应用实战 | 别踩白块小游戏,邀请大家来PK挑战~

“踩白块会输”是一个简单的微信小程序游戏&#xff0c;灵感来自当年火热的别踩白块游戏&#xff0c;程序内分成三个模块&#xff1a;手残模式、经典模式和极速模式&#xff0c;分别对应由易到难的三种玩法&#xff0c;可以查看游戏排名。动画效果采用JS实现&#xff0c;小程序…

Spark-机器学习(6)分类学习之支持向量机

在之前的文章中&#xff0c;我们学习了分类学习之朴素贝叶斯算法&#xff0c;并带来简单案例&#xff0c;学习用法。想了解的朋友可以查看这篇文章。同时&#xff0c;希望我的文章能帮助到你&#xff0c;如果觉得我的文章写的不错&#xff0c;请留下你宝贵的点赞&#xff0c;谢…

基于YOLOV8+Pyqt5无人机航拍太阳能电池板检测系统

1.YOLOv8的基本原理 YOLOv8是一种前沿的目标检测技术&#xff0c;它基于先前YOLO版本在目标检测任务上的成功&#xff0c;进一步提升了性能和灵活性&#xff0c;在精度和速度方面都具有尖端性能。在之前YOLO 版本的基础上&#xff0c;YOLOv8 引入了新的功能和优化&#xff0c;…

SpringBoot 常用注解总结超详细(面试)

目录 一、组件相关&#x1f381; Controller Service Repository Component 二、依赖注入相关&#x1f349; Autowired Resource 根据类型注入&#xff08;By Type&#xff09; 根据名称注入&#xff08;By Name&#xff09; 区别 Qualifier Resource 和 Qualifie…

C语言浮点型数据在内存中的存储及取出等的介绍

文章目录 前言一、浮点型在内存中的存储二、浮点数存储规则三、浮点数在内存中的存储&#xff08;32位&#xff09;float类型四、浮点数在内存中的存储&#xff08;64位&#xff09;double类型五、指数E从内存中取出分成三种情况1. E不全为0或不全为12. E全为03. E全为1 六、有…

设计模式之工厂模式FactoryPattern(二)

一、简单工厂 package com.xu.demo.factoryPattern;/*** 简单工厂模式类*/ public class SimpleFactoryPattern {public static Phone create(String name) {//根据输入对象名称判断返回相匹配的对象if("IPhone".equals(name)) {//返回对象return new IPhone();}else…

Java算法--队列

队列 队列介绍 队列是一个有序列表&#xff0c;可以用数组或是链表来实现。遵循先入先出的原则。即&#xff1a;先存入队列的数据&#xff0c;要先取出。后存入的要后取出 数组模拟队列思路 队列本身是有序列表&#xff0c;若使用数组的结构来存储队列的数据&#xff0c;则…

自动驾驶新书“五一”节马上上市了

我和杨子江教授合写的《自动驾驶系统开发》终于在清华大学出版社三校稿之后即将在五一节后出版。 清华大学汽车学院的李克强教授和工程院院士撰写了序言。 该书得到了唯一华人图灵奖获得者姚期智院士、西安交大管晓宏教授和科学院院士以及杨强教授和院士等的推荐&#xff0c;…

git变更远端仓库名之后如何修改本地仓库配置的另一种方法?(删remote指针、添加、绑定master)

背景 如果某个远端的仓库地址变化后&#xff0c;本地仓库可以修改对应的remote。 之前谈过几种方法&#xff0c;比如重新设置一个新的remote的指针&#xff0c;绑定到新地址。然后删除origin&#xff0c;然后把新指针mv到origin。比如直接seturl修改&#xff08;git remote se…

基于HTML+CSS+JavaScript的表白网页

基于HTMLCSSJavaScript的表白网页 前言效果截图&#xff08;为GIF格式&#xff09;部分代码领取源码下期更新预报 前言 大部分人都有喜欢的人&#xff0c;学会这个表白代码&#xff0c;下次表白你肯定会成功。 效果截图&#xff08;为GIF格式&#xff09; 部分代码 index.htm…

使用 Python 和 DirectShow 从相机捕获图像

在 Python 中使用 OpenCV 是视觉应用程序原型的一个非常好的解决方案,它允许您快速起草和测试算法。处理从文件中读取的图像非常容易,如果要处理从相机捕获的图像,则不那么容易。OpenCV 提供了一些基本方法来访问链接到 PC 的相机(通过对象),但大多数时候,即使对于简单的…

在no branch上commit后,再切换到其他分支,找不到no branch分支的修改怎么办?

解决办法 通过git reflog我们可以查看历史提交记录&#xff0c;这里的第二条提交&#xff08;fbd3ea8&#xff09;就是我在no branch上的提交。 再通过git checkout -b backup fbd3ea8&#xff0c;恢复到上次提交的状态&#xff0c;并且为其创建个分支backup&#xff0c;此时…

B+tree - B+树深度解析+C语言实现+opencv绘图助解

Btree - B树深度解析C语言实现opencv绘图助解 1. 概述2. Btree介绍3. Btree算法实现3.1 插入分裂 3.2 删除向右借位&#xff08;左旋&#xff09;向左借位&#xff08;右旋&#xff09;合并 3.3 查询和遍历3.3.1 查询3.3.2 遍历 3.4 优化优化1(匀key)优化2(升级key)优化3(拓展兄…

池化整合多元数据库,zData X 一体机助力证券公司IT基础架构革新

引言 近期&#xff0c;云和恩墨 zData X 多元数据库一体机&#xff08;以下简称 zData X&#xff09;在某证券公司的OA、短信和CRM业务系统中成功上线&#xff0c;标志着其IT基础架构完成从集中式存储向池化高性能分布式存储的转变。zData X 成功整合了该证券公司使用的达梦、O…

SEO之链接原理(三)

初创企业需要建站的朋友看这篇文章&#xff0c;谢谢支持&#xff1a; 我给不会敲代码又想搭建网站的人建议 &#xff08;接上一篇&#xff09; 4、 Google PR PR是 PageRank 的缩写。Google PR理论是所有基于链接的搜索引擎理论中最有名的。 PR是Google创始人之一拉里佩奇发明…

二维数组打印菱形(C语言)

一、N-S流程图&#xff1b; 二、运行结果&#xff1b; 三、源代码&#xff1b; # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>int main() {//初始化变量值&#xff1b;char arr[5][5] { { , , *, , }, { , *, *, *, },{*, *, *, *, *}, { , *, *, *, …