Netty的复习【2】AIO

水平触发和边缘触发

水平触发Level_triggered

当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读完(如读写缓冲区太小),那么下次调用epoll_wait()时,它还会通知你在上没读写万的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。

边缘触发Edge_triggered

当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,知道该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统中不会充斥大量你不关心的就绪文件描述符!!
select(),poll()模型都是水平触发模式,信号驱动IO是边缘触发模式,epoll()模型即支持水平触发,也支持边缘触发,默认是水平触发。

原生的JDK网络编程

BIO编程

BIO服务端编写

package run.runnable.xiangxue;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BioServer {

    private static ServerSocket server ;
    //线程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(5);

    private static void start() throws IOException {
        try{
            server = new ServerSocket(8000);
            System.out.println("服务已经启动,端口号:"+8000);
            while (true){
                Socket socket = server.accept();
                System.out.println("有新的客户端连接");
                executorService.execute(new BioServerhandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (server!=null){
                server.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        start();
    }
}

BIO服务端业务处理

package run.runnable.xiangxue;

import java.io.*;
import java.net.Socket;
import java.util.Date;

public class BioServerhandler extends Thread {

    Socket socket ;

    public BioServerhandler(Socket socket ) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try(BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream()))){
            PrintWriter out = new PrintWriter(socket.getOutputStream(),true);
            String msg ;
            String result;
            while ((msg = in.readLine()) != null){
                System.out.println("Server accept msg :"+msg);
                result = "Hello ,"+msg+",Now is "+new Date(System.currentTimeMillis());
                out.println(result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                socket = null;
            }
        }

    }
}

BIO客户端

package run.runnable.xiangxue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

public class BioClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1",8000);
        System.out.println("请输入请求消息");
        new ReadMsg(socket).start();
        PrintWriter pw = null;
        while (true){
            pw = new PrintWriter(socket.getOutputStream());
            pw.println(new Scanner(System.in).next());
            pw.flush();
        }
    }

    private static class ReadMsg extends Thread{
        Socket socket;

        public ReadMsg(Socket socket){
            this.socket = socket;
        }

        @Override
        public void run() {
            try(BufferedReader br = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            )) {
                String line = null;
                while ((line=br.readLine())!=null){
                    System.out.printf("%S\n",line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if (socket!=null){
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                socket=null;
            }

        }
    }

}

image.png

原生网络编程AIO

image.png
异步IO采用"订阅-通知"模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情,当操作系统发生IO事件,并且准备好数据后,再主动通知应用程序,触发相应的函数。

当应用程序订阅操作系统的网络数据时,操作系统什么时候把数据传输过来,我们是不知道的,所以这里存在一个问题:应用程序怎么知道操作系统传输数据过来?
JDK中提供了一个接口CompletionHandler,这个接口提供了2个方法

/**
     * 在操作完成时调用
     *
     * @param   result  表示这个操作返回的结果对象
				比如接收服务器请求,那么返回的就是一个socket
				如果是读操作,那么就是在网络上读取到的字节数
     *          The result of the I/O operation.
     * @param   attachment  发起IO操作时附加的参数
     *          The object attached to the I/O operation when it was initiated.
     */
    void completed(V result, A attachment);

    /**
     * 在操作失败时调用
     *
     * @param   exc
     *          The exception to indicate why the I/O operation failed
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     */
    void failed(Throwable exc, A attachment);
}

方法的作用已经在注释上标注出来。那这和我们之前提到的问题有什么关系呢?关于就在于,当应用程序调用操作系统的数据时,应用程序对应的处理器一定要实现这个接口。
比如:两个Socket进行连接,当服务器端进行连接后,要新起一个socket处理客户端请求,如果是读数据,同样的在处理器中实现的completed方法中进行业务处理。
而falied就是当操作系统发现这个IO事件失败的时候,通知应用程序应该怎么处理,比如重新请求。

实现一个AIO处理

在实现之前我们要清楚应该实现哪几部分
image.png

首先从客户端开始

客户端的属性存在一个客户端处理器AioClientHandler,打开客户端的时候,新建连接,然后

package run.runnable.AIO;

import java.io.IOException;
import java.util.Scanner;

public class AioClient {

    private static AioClientHandler aioClientHandler;

    public static void start() throws IOException {
        if (aioClientHandler!=null){
            return;
        }
        aioClientHandler = new AioClientHandler("127.0.0.1",8000);
        new Thread(aioClientHandler,"client").start();
    }

    public static boolean sendMsg(String msg){
        if (msg.equals("exit")){
            return false;
        }
        aioClientHandler.sendMsg(msg);
        return true;
    }

    public static void main(String[] args) throws IOException {
        AioClient.start();
        System.out.println("请输入请求消息:");
        Scanner scanner = new Scanner(System.in);
        while (AioClient.sendMsg(scanner.nextLine()));
    }

}

AIO客户端处理器

package run.runnable.AIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class AioClientHandler implements CompletionHandler<Void,AioClientHandler>,Runnable {

    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AioClientHandler(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        //创建一个实际的客户端通道
        clientChannel = AsynchronousSocketChannel.open();
    }

    /**
     * 这个是当操作系统有数据后会执行的方法,通知我们已经连接到了服务器端
     * @param result
     * @param attachment
     */
    @Override
    public void completed(Void result, AioClientHandler attachment) {
        System.out.println("已经连接到客户端。");
    }

    @Override
    public void failed(Throwable exc, AioClientHandler attachment) {
        System.out.println("连接失败");
        latch.countDown();
        try {
            clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void run() {
        latch = new CountDownLatch(1);
        clientChannel.connect(new InetSocketAddress(host,port),this,this);

        try {
            latch.await();
            clientChannel.close();
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }

    }

    public void sendMsg(String msg){
        //把msg变成在网络上可以传递的格式
        byte[] bytes = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();

        //异步写,也就是写操作也是异步的,写操作也需要传入一个实现了CompletionHandler接口的处理器
        clientChannel.write(writeBuffer,writeBuffer,new AioClientWriteHandler(clientChannel,latch));
    }
}

AIO客户端读处理器

package run.runnable.AIO;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AioClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String msg;
        try {
            msg = new String(bytes,"UTF-8");
            System.out.println("accept message:"+msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.err.println("数据读取失败");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

AIO客户端写处理器

package run.runnable.AIO;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AioClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        if(attachment.hasRemaining()){
            clientChannel.write(attachment,attachment,this);
        }else {
            //读取服务端传回的数据
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            //异步读
            clientChannel.read(readBuffer,readBuffer,new AioClientReadHandler(clientChannel,latch));
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("发送数据失败。。。");
        try{
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

# Netty 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×