本文共 6431 字,大约阅读时间需要 21 分钟。
网络编程基本模型Client/Server模型,即两个进程之间的互相通信,服务端提供位置信息(binding IP & Port) ,客户端通过连接操作向服务端监听地址发起连接, 三次握手建立连接,若成功,双方基于Socket通信。
传统BIO模型:
ServerSocket:绑定IP,启动监听port; Socket:发起连接。成功后,双方通过输入输出流同步阻塞通信。BIO通信模型问题:
每来一个新的客户端请求,服务端必须创建一个新线程处理。一个请求对应一个线程,不能满足高性能的要求。// 服务端public class TimeServer { public static void main(String[] args) { /*设默认端口*/ int port = 8080; if (args != null && args.length > 0) { port = Integer.parseInt(args[0]); } ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println(" time server start in port " + port); Socket socket; /*监听客户端连接,若无,主线程阻塞在accpet上*/ while (true) { socket = serverSocket.accept(); /*每来一个新客户端请求都创建一个新线程处理*/ new Thread(new TimeServerHandler(socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { /* 关闭资源*/ if (serverSocket != null) { System.out.println(" time server close "); try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }}// 处理socket的线程@AllArgsConstructorpublic class TimeServerHandler implements Runnable { private Socket socket; @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); while (true) { String line = in.readLine(); if (Objects.isNull(line)) { break; } System.out.println(" time sever get order : " + line); final String currentTime = "query time order".equalsIgnoreCase(line) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; out.println(currentTime); } } catch (IOException e) { e.printStackTrace(); }finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); } if (this.socket != null) { try { this.socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }}// 客户端public class TimeClient { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { port = Integer.valueOf(args[0]); } Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket("127.0.0.1", port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("query time order"); System.out.println(" send order 2 server succeed"); String response = in.readLine(); System.out.println("NOW is :" + response); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
为改善这种一个连接一个线程的模型,引入线程池或消息队列实现1个或多个线程N个客户端的模型,但底层通信使用同步阻塞IO,称“伪异步”
为解决同步阻塞IO面临的一个链路需要一个线程处理的问题,可能用线程池模型进行优化—-请求接入后扔到线程池中处理,形成客户端数M: 线程池最大线程数N的比例关系,M可远大于N。通过线程池可灵活调配线程资源,设置maxPoolSize,防止高并发导致线程资源用尽。
模型:
// 伪异步时间服务器public class FakeAsyncTimeServer { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { port = Integer.parseInt(args[0]); } try(ServerSocket serverSocket = new ServerSocket(port)) { System.out.println(" time server starts in port :" + port); /*请求处理线程池:JDK线程池维护一个消息队列和N个活跃线程,对消息队列中任务进行处理*/ TimeServerHandlerThreadPool threadPool = new TimeServerHandlerThreadPool(50, 10000); Socket socket; while (true) { socket = serverSocket.accept(); /*客户端socket封装成了一个Task,实现Runnable接口*/ threadPool.execute(new TimeServerHandler(socket)); } } catch (IOException e) { e.printStackTrace(); } finally { } }}// 线程池public class TimeServerHandlerThreadPool { private ExecutorService executorService; public TimeServerHandlerThreadPool(int maxPoolSize, int qSize) { executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(qSize)); } public void execute(Runnable task) { executorService.execute(task); }}
伪异步存在着弊端:
1) 读socket,read()会阻塞,除非* 有数据可读,或可用数据已读完,或发生异常*InputStream# read public int read(byte b[]) throws IOException { return read(b, 0, b.length); }
就是说:假如发请求或应答慢,或网络慢,读的一方的通信线程会被长时间阻塞,对方要60s完成,读的一方也阻塞60s,在此期间,其他接入消息只能在队列中排队。
2)写socket,write()会阻塞,除非所有要写的字节写完,或发生异常。
public void write(byte b[]) throws IOException { write(b, 0, b.length); }
根据TCP、IP,消息的接收方处理缓慢时,将不能及时从TCP缓冲区读,导致发送方的TCP window size 减小,直到0,双方处在keep-alive状态,发送方不会再向TCP缓冲写入消息,
此时若采用同步阻塞IO,write会无限阻塞,直到TCP window size > 0,或IO异常。可见:即使是伪异步,读、写都是同步阻塞的,阻塞时间取决于对方IO线程处理速度和网络IO传输速度。实际生产中,网络可靠性恶劣,问题可能爆发。
伪异步是BIO简单优化,并不能从根本上解决同步IO导致通信线程阻塞的问题。常见一种通信对方返回应答时间过长而导致的级联故障:
1)服务端处理慢,返回应答要60s,平时10ms 2)伪异步IO线程正在读故障服务节点响应,由于读取输入流是阻塞的,该线程会被阻塞60s 3)若所有可用线程被故障服务器阻塞,后续所有IO消息都在队列排队 4)线程池的阻塞队列打满,后续入队列操作被阻塞 5)前端的Acceptor线程接收客户端接入,被阻塞在线程池的同步阻塞队列之后,新的客户端请求被拒绝,客户端发生大面积超时 6)几乎所有连接都超时,调用者认为系统崩溃,不能接受新请求。一言以蔽之:60s的应答时间太长,线程池被打满,新请求不能被处理,caller认为服务器奔溃。(跟DB 连接池被打满,新请求拿不到connection,等待超时后,一片告警很类似)