博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
vertx的HttpServer模块
阅读量:4429 次
发布时间:2019-06-07

本文共 11117 字,大约阅读时间需要 37 分钟。

Start HttpServer

/**   * 启动 HttpServer   * multi instances 采用 synchronized防止线程安全问题   * addHandlers 方法是actor模式的实现(EventLoopPoolSize >= instances):  *               1 instances : 1 verticle(actor) : 1 VertxThread(Eventloop)   */ public synchronized HttpServer listen(int port, String host, Handler
> listenHandler) { //是否有配置requestHandler或webscoket if (requestStream.handler() == null && wsStream.handler() == null) { throw new IllegalStateException("Set request or websocket handler first"); } if (listening) { throw new IllegalStateException("Already listening"); } listenContext = vertx.getOrCreateContext(); //根据currentThread 获取Context,获取null则create serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;//判断是否启用ssl List
applicationProtocols = options.getAlpnVersions();//获取协议版本,默认支持1.1和2.0 if (listenContext.isWorkerContext()) {
//是否使用 Worker Verticles ,不予许使用HTTP2.0 applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList()); } sslHelper.setApplicationProtocols(applicationProtocols);//应用协议 synchronized (vertx.sharedHttpServers()) {
// 监听多个不同网络接口(ip:port) Httpserver 防止并发 this.actualPort = port; id = new ServerID(port, host);//生成服务id HttpServerImpl shared = vertx.sharedHttpServers().get(id); if (shared == null || port == 0) {
// mutil instances 的情况,利用 mutli core cpu /** * frist instances */ serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); ServerBootstrap bootstrap = new ServerBootstrap(); //定义两个线程组,accept size 1, 重写的VertxEventLoopGroup bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers); applyConnectionOptions(bootstrap);//添加Connection Accept之后的附属选项 sslHelper.validate(vertx);//验证ssl相关参数 bootstrap.childHandler(new ChannelInitializer
() { @Override /** * connection accept 调度切换线程后触发 */ protected void initChannel(Channel ch) throws Exception {  //限流策略,读大于写,导致内存无限扩大,最终 OOM if (requestStream.isPaused() || wsStream.isPaused()) { ch.close(); //超过服务承载能力,关闭连接 return; } ChannelPipeline pipeline = ch.pipeline(); if (sslHelper.isSSL()) {
//是否启用ssl io.netty.util.concurrent.Future
handshakeFuture; if (options.isSni()) {
//是否启用sni,单服务多证书情况 VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx); pipeline.addLast(sniHandler); handshakeFuture = sniHandler.handshakeFuture(); } else { SslHandler handler = new SslHandler(sslHelper.createEngine(vertx)); pipeline.addLast("ssl", handler); handshakeFuture = handler.handshakeFuture(); } //侦听 TLS handshake handshakeFuture.addListener(future -> { if (future.isSuccess()) {
// 握手成功 if (options.isUseAlpn()) {
//是否启用alpn,协调使用的protocol //获取使用的协议 SslHandler sslHandler = pipeline.get(SslHandler.class); String protocol = sslHandler.applicationProtocol(); if ("h2".equals(protocol)) {
//是否是http2.0 handleHttp2(ch); } else { handleHttp1(ch); } } else { handleHttp1(ch); } } else {
//握手失败 HandlerHolder
handler = httpHandlerMgr.chooseHandler(ch.eventLoop()); handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(future.cause())); } }); } else { //是否是启用http2,通过VM Options: -Dvertx.disableH2c 设置;默认false if (DISABLE_H2C) { handleHttp1(ch); } else { IdleStateHandler idle; if (options.getIdleTimeout() > 0) { //是否定义最大空闲时间 pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout())); } else { idle = null; } /**直接使用明文的http2.0或1.1处理*/ pipeline.addLast(new Http1xOrH2CHandler() { @Override protected void configure(ChannelHandlerContext ctx, boolean h2c) { if (idle != null) { //移除idleHandler,重新添加,不用注意次序 pipeline.remove(idle); } if (h2c) { //判断协议,如果定义idle则会重新添加 idleHandler handleHttp2(ctx.channel()); } else { handleHttp1(ch); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) { ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); //根据eventloop选中对应的handler进行异常传播 HandlerHolder
handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop()); handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(cause)); } }); } } } }); addHandlers(this, listenContext);////添加一个instaces(verticle的HttpHandlers)到httpHandlerMgr中 try {  //listen ip:port bindFuture = AsyncResolveConnectHelper.doBind(vertx, SocketAddress.inetSocketAddress(port, host), bootstrap); bindFuture.addListener(res -> { if (res.failed()) { vertx.sharedHttpServers().remove(id); } else { Channel serverChannel = res.result(); HttpServerImpl.this.actualPort = ((InetSocketAddress) serverChannel.localAddress()).getPort(); serverChannelGroup.add(serverChannel);//添加当前的ServerSocketChannel //初始化metrcis指标 VertxMetrics metrics = vertx.metricsSPI(); this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; } }); } catch (final Throwable t) { if (listenHandler != null) { vertx.runOnContext(v -> listenHandler.handle(Future.failedFuture(t))); } else { log.error(t); } listening = false; return this; } vertx.sharedHttpServers().put(id, this);//启动的HttpServer服务(verticle)添加到Vertx.sharedHttpMap中 actualServer = this; } else { //other instances actualServer = shared; this.actualPort = shared.actualPort; //在actualServer基础上添加一个instaces(verticle的HttpHandlers)到httpHandlerMgr中 addHandlers(actualServer, listenContext); //初始化metrics VertxMetrics metrics = vertx.metricsSPI(); this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; } //服务 bind 状态 actualServer.bindFuture.addListener(future -> { if (listenHandler != null) { final AsyncResult
res; if (future.succeeded()) { res = Future.succeededFuture(HttpServerImpl.this); } else { res = Future.failedFuture(future.cause()); listening = false; } listenContext.runOnContext((v) -> listenHandler.handle(res));//回调处理 } else if (future.failed()) { listening = false; log.error(future.cause()); } }); } return this;}

 

如何实现隔离(actor模型)

/**  * 添加一个verticle instances handlers  * @param server  First Actual Server(multi instances)  *    mutil instances 情况下第一个instance启动成功,other instances 仅仅是  *    利用multi core cpu,所以以 first instances actual Server为主,后续在  *    Current HttpServerImpl instance 添加handlers(verticle)  * @param context current Thread context  *    multi instances 下EventLoopGroup.next 方法挑选(choose)出一个Eventloop  *    与Context 映射. netty EventExecutor调度DefaultEventExecutorChooserFactory类  *    两种实现:  *           ①求余取模   *           ②位运算取模(2的幂)  *    所以防止实例数量大于EventloopGroup数量,Default : 2 * CpuCoreSensor.availableProcessors()  *    ,linux下以读取/proc/self/status 文件为主,而不是Runtime.getRuntime().availableProcessors()  */private void addHandlers(HttpServerImpl server, ContextImpl context) {    server.httpHandlerMgr.addHandler(      new HttpHandlers(        requestStream.handler(),        wsStream.handler(),        connectionHandler,        exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler)      , context);}public class HttpHandlers {  final Handler
requestHandler; final Handler
wsHandler; final Handler
connectionHandler; final Handler
exceptionHandler; /** * @param requestHandler Http Request Handler * @param wsHandler WebScoket Handler * @param connectionHander TCP Connection Handler * @param exceptionHander Exception Handlet */ public HttpHandlers( Handler
requestHandler, Handler
wsHandler, Handler
connectionHandler, Handler
exceptionHandler) { this.requestHandler = requestHandler; this.wsHandler = wsHandler; this.connectionHandler = connectionHandler; this.exceptionHandler = exceptionHandler; }}public class HandlerManager
{ public synchronized void addHandler(T handler, ContextImpl context) { /** * 添加一个eventloop(Thread)到 VertxEventLoopGroup 集合中. * accept状态后的read/write事件,线程调度在VertxEventLoopGroup类的next方法, * vertx重写choose策略 */ EventLoop worker = context.nettyEventLoop(); availableWorkers.addWorker(worker); /** * 添加handlers,并且绑定handler和context映射关系. * 注意部署的instances size不要超过EventLoopPoolSize, * 否则出现 1 EventLoop : N handler(verticle) * 导致一个eventloop上执行 N 个verticle */ Handlers
handlers = new Handlers<>(); Handlers
prev = handlerMap.putIfAbsent(worker, handlers); if (prev != null) { handlers = prev; } handlers.addHandler(new HandlerHolder<>(context, handler)); hasHandlers = true; }}

 

Connection scheduling process:

 

image

add handler to eventloop structure:

  1. an eventloop corresponds to a handlers
  2. an eventloop corresponds to multiple instances verticles(HandlerHolder)

 

 

 

HttpServer option

public class HttpServerOptions extends NetServerOptions {    //是否启用压缩,默认false  private boolean compressionSupported;    //压缩级别越高cpu负荷越大,默认gzip  private int compressionLevel;    //websocket最大的 Frame 大小,默认65536  private int maxWebsocketFrameSize;    //websocket 最大消息大小,默认65536*4  private int maxWebsocketMessageSize;    //处理WebSocket消息的约定的子协议  private String websocketSubProtocols;    //是否自动处理100-Continue,默认false  private boolean handle100ContinueAutomatically;    //分段传输chunk 大小,默认8192  private int maxChunkSize;    //initial line 最大长度,默认 4096  private int maxInitialLineLength;    //Header 最大大小,默认 8192  private int maxHeaderSize;    //http2.0最大的并发流,默认100  private Http2Settings initialSettings;    //支持alpn的版本,默认Http1.1和Http2.0  private List
alpnVersions; //设置连接的窗口大小,默认无限制 private int http2ConnectionWindowSize; //是否启用压缩解码 private boolean decompressionSupported; //WebSocket Masked位为true。 PerformingUnMasking将是错误的,默认为false private boolean acceptUnmaskedFrames; //默认HttpObjectDecoder的初始缓冲区大小,默认128 private int decoderInitialBufferSize;}

 

备注

1.建立HttpServer,配置最大的idle时间,默认tcpkeepalive配置是false,  网络故障等造成TCP挥手交互失败从而导致epoll的达到FileMax,阻止后续连接,导致  服务器无法提供服务; 或者启用keepalive,依靠内核TCP模块去侦测(默认2小时一次).  可用netstat工具查看应用当前网络状况  2.启用HTTP2,使用jetty开源apln-boot jar包,对JDK版本依赖关系强,需下载对应JDK版本的apln;   或者使用openssl,当前服务环境都需安装,迁移服务麻烦,但是性能稍高.3.具体 Route和其它HttpServer功能在 Web 模块中, core 模块只是实现Tcp相关、TLS、   Choose vertcile.handlers Scheduling 和codec等.

转载于:https://www.cnblogs.com/cmfa/p/10594932.html

你可能感兴趣的文章
桶排序
查看>>
the least common multiplier
查看>>
Metro 风格的浏览和无插件的 HTML5
查看>>
LifecycleControl.cs
查看>>
函数式思维: 利用 Either 和 Option 进行函数式错误处理 类型安全的函数式异常...
查看>>
ICEfaces 3.2.0.BETA1 发布
查看>>
div 旋转
查看>>
【设计模式】4、原型模式
查看>>
进入meta模式关闭背光灯
查看>>
pycharm+PyQt5+python最新开发环境配置
查看>>
走进AngularJS
查看>>
【学习笔记】 唐大仕—Java程序设计 第5讲 深入理解Java语言之5.2 多态及虚方法调用...
查看>>
轻松实现Ecshop商城多语言切换
查看>>
async & await 的前世今生(Updated)
查看>>
iOS开发:用SQLite3存储和读取数据
查看>>
webstorm上svn的安装使用
查看>>
setAdapter(adapter)空指针nullPointer 解决办法 分类: ...
查看>>
【JEECG技术文档】数据权限自定义SQL表达式用法说明
查看>>
使用 Bootstrap Typeahead 组件
查看>>
第一次玩蛇,有点紧张。
查看>>