黄小数

黄小数 查看完整档案

深圳编辑广东海洋大学  |  软件工程 编辑  |  填写所在公司/组织 jevoncode.github.io/ 编辑
编辑

这家伙好像很懂计算机~

个人动态

黄小数 发布了文章 · 2020-07-01

内网穿透工具frp的源码解读之概念流程篇

最近学习go语言,看完基础和高级篇后,果断拿起一个开源项目看看,于是就找到内网穿透工具——frp,它具体干嘛的,我就不多说,可以自己上官网看看,动手试试。

概念

连接,本文用的连接,可以成为socket连接,connection,tcp连接, udp连接等
工作连接通信连接要分开的理解
工作连接是实际用户操作的连接,如ssh通信的流量就走这连接。
通信连接是客户端与服务端的协议通信建立内网穿透的逻辑,里面逻辑就包含工作连接的创建
用户连接是用户发起与服务端的连接
实际业务的连接就是目标程序的连接,如ssh,就是内网22端口的连接,frpc会创建一个tcp连接,连接到22端口

角色

客户端:就是执行frpc程序的机子(也就是内网的机子)
服务端:就是执行frps程序的机子
用户: 就是外网机子,访问frps的机子。

核心流程

  1. 客户端登陆(通信连接
  2. 服务端建立controler
  3. 客户端建立control,发送NewProxy(通信连接
  4. 服务端接收到NewProxy(通信连接),并响应ReqWorkConn,开始建立与用户的监听,handler是HandleUserTcpConnection(server/proxy/proxy.go:235)
  5. 客户端接收ReqWorkConn(通信连接
  6. 客户端创建工作连接(与服务端的新连接)
  7. 发送NewWorkConn给服务端(工作连接
  8. 客户端创建与本地的连接(实际业务的连接)
  9. 客户端join两个连接(工作连接和本地连接)
  10. 服务端在接收NewWorkConn,就将该工作连接放入连接池
  11. 服务端接收用户的连接,会调用HandleUserTcpConnection,该handler就是从上面的连接池获取连接,然后join用户连接工作连接
所谓建立controler也是为了创建消费者模式的,readCh, sendCh, msgHandler, manager
查看原文

赞 0 收藏 0 评论 0

黄小数 发布了文章 · 2020-04-12

Tomcat源码解读——初始化及启动阶段

从startup.sh文件,找到catalina.sh,然后找到启动类:org.apache.catalina.startup.Bootstrap

初始化阶段

1.Bootstrap的main方法,实例化自己,然后初始化一堆classloader,分别是commonLoader, serverLoader, sharedLoader,其中commonLoader在conf的ctalina.properties里配置了读取哪些jar包

common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar

而另外两个classloader是没有指定目录的,也就是没有目录让他们加载。

2.调用Bootstrap的start方法,然后初始化org.apache.catalina.startup.Catalina对象,然后设置其classloader成员变量为sharedLoader。
3.调用Catalina对象的start方法,这里就稍微复杂点。

  public void start() {

        if (getServer() == null) {
            load();
        }

        if (getServer() == null) {
            log.fatal("Cannot start server. Server instance is not configured.");
            return;
        }

        long t1 = System.nanoTime();

        // Start the new server
        try {
            getServer().start();
        } catch (LifecycleException e) {
            log.fatal(sm.getString("catalina.serverStartFail"), e);
            try {
                getServer().destroy();
            } catch (LifecycleException e1) {
                log.debug("destroy() failed for failed Server ", e1);
            }
            return;
        }

        long t2 = System.nanoTime();
        if(log.isInfoEnabled()) {
            log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
        }

        // Register shutdown hook
        if (useShutdownHook) {
            if (shutdownHook == null) {
                shutdownHook = new CatalinaShutdownHook();
            }
            Runtime.getRuntime().addShutdownHook(shutdownHook);

            // If JULI is being used, disable JULI's shutdown hook since
            // shutdown hooks run in parallel and log messages may be lost
            // if JULI's hook completes before the CatalinaShutdownHook()
            LogManager logManager = LogManager.getLogManager();
            if (logManager instanceof ClassLoaderLogManager) {
                ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                        false);
            }
        }

        if (await) {
            await();
            stop();
        }
    }

使用了Digester工具,边读取server.xml,边实例化对象。实例化了org.apache.catalina.core.StandardServer对象,org.apache.catalina.deploy.NamingResources对象等等,Digester这工具可设置实例化默认class或读取xml标签中className属性来实例化对应Catalina对象的成员变量(还可以带层次的,如实例化Server对象里的Service成员变量)。如Server对象,就是默认org.apache.catalina.core.StandardServer对象。然后再实例化Server对象里的Service成员变量org.apache.catalina.core.StandardService。还有就是自定义实例化规则,如ConnectorCreateRule,就会直接将Connector的标签里protocol作为org.apache.catalina.connector.Connector#Connector(java.lang.String)的实例化参数,实例化org.apache.catalina.connector.Connector;

Digester这工具有四种功能,1.读取xml标签,实例化默认配置的class;2.读取xml标签中className属性来实例化;3.可以带层次的设置成员变量;4.自定实例化规则。
//ConnectorCreateRule.java
//ConnectorCreateRule,就会直接将Connector的标签里protocol作为org.apache.catalina.connector.Connector#Connector(java.lang.String)的实例化参数,实例化org.apache.catalina.connector.Connector;
 @Override
    public void begin(String namespace, String name, Attributes attributes)
            throws Exception {
        Service svc = (Service)digester.peek();
        Executor ex = null;
        if ( attributes.getValue("executor")!=null ) {
            ex = svc.getExecutor(attributes.getValue("executor"));
        }
        Connector con = new Connector(attributes.getValue("protocol"));
        if ( ex != null )  _setExecutor(con,ex);

        digester.push(con);
    }

还实例化org.apache.catalina.core.StandardThreadExecutor,这个Exector后续作为Connector处理ServerProcessor使用
第3点,实例化很多对象,当时为了便于理解主干,其他的先忽略。

4.实例化上面一堆对象后,会调用Server对象的init方法,但由于是继承org.apache.catalina.util.LifecycleBase,所以也就是调用LifecycleBase的init方法。

@Override
    protected void initInternal() throws LifecycleException { 
...这里还有一些代码,先忽略
        // Initialize our defined Services
        for (int i = 0; i < services.length; i++) {
            services[i].init();
        }
    }

5.Server对象init也是调用一堆Service的init方法(意思说server.xml的Service标签是可以配置多个),很尴尬,Service的init方法又是调用LifecycleBase的init方法,这一块可能造成理解混乱,因为Server和Service名字很像,而且init方法又是调用LifecycleBase的init模版方法。

 @Override
    protected void initInternal() throws LifecycleException { 
...这里还有一些代码,先忽略
        // Initialize our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    connector.init();
                } catch (Exception e) {
                    String message = sm.getString(
                            "standardService.connector.initFailed", connector);
                    log.error(message, e);

                    if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                        throw new LifecycleException(message);
                }
            }
        }
    }
  1. Service对象调用Connector的init方法。也是一样调用LifecycleBase的init模版方法。

    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Initialize adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() &&
                !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerNoApr",
                            getProtocolHandlerClassName()));
        }

        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }

        // Initialize mapper listener
        mapperListener.init();
    }

这里就调用protocolHandler的init方法。protocolHandler从哪里来的呢?就第3点说的,自定义实例化规则ConnectorCreateRule里,将Connector标签的protocol属性作为Connector构造方法参数

 public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            this.protocolHandler = (ProtocolHandler) clazz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        }

        // Default for Connector depends on this (deprecated) system property
        if (Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "false"))) {
            encodedSolidusHandling = EncodedSolidusHandling.DECODE;
        }
    }

7.protocolHandler的init方法,是在org.apache.coyote.AbstractProtocol#init

@Override
    public void init() throws Exception {
...这里还有一些代码,先忽略
        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));

        try {
            endpoint.init();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.initError",
                    getName()), ex);
            throw ex;
        }
    }

这里我们用org.apache.coyote.http11.Http11NioProtocol这个Protocal继续

public Http11NioProtocol() {
        endpoint=new NioEndpoint();
        cHandler = new Http11ConnectionHandler(this);
        ((NioEndpoint) endpoint).setHandler(cHandler);
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

8.protocolHandler的init方法,调用NioEndpoint的init方法. NioEndpoint继承org.apache.tomcat.util.net.AbstractEndpoint的init模版方法

  public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

9.bind方法就是交给子类实现,我们看org.apache.tomcat.util.net.NioEndpoint#bind的实现

@Override
    public void bind() throws Exception {

        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getBacklog());
        serverSock.configureBlocking(true); //mimic APR behavior
        if (getSocketProperties().getSoTimeout() >= 0) {
            serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
        }

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);

        // Initialize SSL if needed
        if (isSSLEnabled()) {
            SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);

            sslContext = sslUtil.createSSLContext();
            sslContext.init(wrap(sslUtil.getKeyManagers()),
                    sslUtil.getTrustManagers(), null);

            SSLSessionContext sessionContext =
                sslContext.getServerSessionContext();
            if (sessionContext != null) {
                sslUtil.configureSessionContext(sessionContext);
            }
            // Determine which cipher suites and protocols to enable
            enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
            enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
        }

        if (oomParachute>0) reclaimParachute(true);
        selectorPool.open();
    }

这里我们看到使用了实例化了ServerSocketChannel,并设置为阻塞模式。但没看到ServerSocketChannel注册Selector。不过调用了org.apache.tomcat.util.net.NioSelectorPool#open来实例化Selector,但没注册

 protected Selector getSharedSelector() throws IOException {
        if (SHARED && SHARED_SELECTOR == null) {
            synchronized ( NioSelectorPool.class ) {
                if ( SHARED_SELECTOR == null )  {
                    synchronized (Selector.class) {
                        // Selector.open() isn't thread safe
                        // http://bugs.sun.com/view_bug.do?bug_id=6427854
                        // Affects 1.6.0_29, fixed in 1.7.0_01
                        SHARED_SELECTOR = Selector.open();
                    }
                    log.info("Using a shared selector for servlet write/read");
                }
            }
        }
        return  SHARED_SELECTOR;
    }
  public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }

    }

然后将Selector赋值给NioBlockingSelector成员变量.

    public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-"+(threadCounter.getAndIncrement()));
        poller.start();
    }

BlockPoller是个线程对象

 protected static class BlockPoller extends Thread {
       protected volatile boolean run = true;
        protected Selector selector = null;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
 }

维护了一个队列events,用于

NioEndPoint --> NioSelectorPool --> NioBlockingSelector --> BlockPoller

Catalina对象的start方法中初始化部分的中,相关主干已经初始化完毕。

下一步,执行getServer().start();方法

启动阶段

1.Server的start方法,又开始一轮的org.apache.catalina.util.LifecycleBase#start模版方法。

@Override
    protected void startInternal() throws LifecycleException {

        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);

        globalNamingResources.start();

        // Start our defined Services
        synchronized (servicesLock) {
            for (int i = 0; i < services.length; i++) {
                services[i].start();
            }
        }
    }

2.Service的start方法(org.apache.catalina.util.LifecycleBase#start模版方法)

@Override
    protected void startInternal() throws LifecycleException {

...这里还有一些代码,先忽略
        // Start our defined Connectors second
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                try {
                    // If it has already failed, don't try and start it
                    if (connector.getState() != LifecycleState.FAILED) {
                        connector.start();
                    }
                } catch (Exception e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }
        }
    }

3.调用Connector的start方法(org.apache.catalina.util.LifecycleBase#start模版方法)

    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }

        setState(LifecycleState.STARTING);

        try {
            protocolHandler.start();
        } catch (Exception e) {
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }

            throw new LifecycleException
            (errPrefix + " " + sm.getString
                    ("coyoteConnector.protocolHandlerStartFailed"), e);
        }

        mapperListener.start();
    }

4.protocolHandler.start(),但这里不是调用org.apache.catalina.util.LifecycleBase#start模版方法,而是调用org.apache.coyote.AbstractProtocol#start的方法。

 @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    }

5.endpoint.start()调用了org.apache.tomcat.util.net.AbstractEndpoint#start方法


    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

这里的bind已经init阶段调用过,所以这里不会调用,而是继续startInternal方法
6.startInternal方法是交给子类实现,这里是org.apache.tomcat.util.net.NioEndpoint#startInternal

@Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
    

创建org.apache.tomcat.util.net.NioEndpoint.Acceptor来循环接收连接。countUpOrAwaitConnection的方法就是用于判断是否继续从OS队列中获取连接。里面的数量就是由maxConnections配置的,如果在BIO情况下,maxConnections=maxThreads,所以等价于当工作线程都处于繁忙时,则acceptor会等待工作线程空闲才会去获取来连接。而在NIO情况下,maxConnections默认等于100000,则不会等工作线程繁忙,而是继续从OS队列中获取连接,放在events队列中.

@Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }

每接收到连接,也就是socket,会调用org.apache.tomcat.util.net.NioEndpoint#setSocketOptions方法来处理socket。

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);

            NioChannel channel = nioChannels.poll();
            if ( channel == null ) {
                // SSL setup
                if (sslContext != null) {
                    SSLEngine engine = createSSLEngine();
                    int appbufsize = engine.getSession().getApplicationBufferSize();
                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
                                                                       Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
                                                                       socketProperties.getDirectBuffer());
                    channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
                } else {
                    // normal tcp setup
                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
                                                                       socketProperties.getAppWriteBufSize(),
                                                                       socketProperties.getDirectBuffer());

                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                if ( channel instanceof SecureNioChannel ) {
                    SSLEngine engine = createSSLEngine();
                    ((SecureNioChannel)channel).reset(engine);
                } else {
                    channel.reset();
                }
            }
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

注意到,这里socket设置了非阻塞,然后实例化NioChannel,然后注册到Poller里。Poller的注册方法:

  public void register(final NioChannel socket) {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

创建一个监听READ的事件。注册到Poller里的Selector上。是不是看出叻,实际在PollerEvent里完成叻,虽然看去好像OP_REGISTER,NIO好像没有这事件,其实在PollerEvent会将OP_REGISTER转为SelectionKey.OP_READ。

 */
    public static class PollerEvent implements Runnable {

        protected NioChannel socket;
        protected int interestOps;
        protected KeyAttachment key;
        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }

        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won't have been counted down when the socket
                        // closed.
                        socket.getPoller().getEndpoint().countDownConnection();
                    } else {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att!=null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                            att.access();//to prevent timeout
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                    } catch (Exception ignore) {}
                }
            }//end if
        }//run

        @Override
        public String toString() {
            return super.toString()+"[intOps="+this.interestOps+"]";
        }
    }

由于 socket.setPoller(this);设置poller,所以socker可以注册poller里selector。
而Poller是线程,由上面org.apache.tomcat.util.net.NioEndpoint#startInternal实例化了好几个,也就是运行时看到的线程名:http-nio-8080-ClientPoller-1


    /**
     * Poller class.
     */
    public class Poller implements Runnable {

        protected Selector selector;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();

        protected volatile boolean close = false;
        protected long nextExpiration = 0;//optimize expiration handling

        protected AtomicLong wakeupCounter = new AtomicLong(0l);

        protected volatile int keyCount = 0;

        public Poller() throws IOException {
            synchronized (Selector.class) {
                // Selector.open() isn't thread safe
                // http://bugs.sun.com/view_bug.do?bug_id=6427854
                // Affects 1.6.0_29, fixed in 1.7.0_01
                this.selector = Selector.open();
            }
        }
        ...
        public boolean events() {
            boolean result = false;

            Runnable r = null;
            for (int i = 0, size = events.size(); i < size && (r = events.poll()) != null; i++ ) {
                result = true;
                try {
                    r.run();
                    if ( r instanceof PollerEvent ) {
                        ((PollerEvent)r).reset();
                        eventCache.offer((PollerEvent)r);
                    }
                } catch ( Throwable x ) {
                    log.error("",x);
                }
            }

            return result;
        }

        public void register(final NioChannel socket) {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

        public KeyAttachment cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) {
            KeyAttachment ka = null;
            try {
                if ( key == null ) return null;//nothing to do
                ka = (KeyAttachment) key.attachment();
                if (ka != null && ka.isComet() && status != null) {
                    //the comet event takes care of clean up
                    //processSocket(ka.getChannel(), status, dispatch);
                    ka.setComet(false);//to avoid a loop
                    if (status == SocketStatus.TIMEOUT ) {
                        if (processSocket(ka.getChannel(), status, true)) {
                            return null; // don't close on comet timeout
                        }
                    } else {
                        // Don't dispatch if the lines below are cancelling the key
                        processSocket(ka.getChannel(), status, false);
                    }
                }
                ka = (KeyAttachment) key.attach(null);
                if (ka!=null) handler.release(ka);
                else handler.release((SocketChannel)key.channel());
                if (key.isValid()) key.cancel();
                // If it is available, close the NioChannel first which should
                // in turn close the underlying SocketChannel. The NioChannel
                // needs to be closed first, if available, to ensure that TLS
                // connections are shut down cleanly.
                if (ka != null) {
                    try {
                        ka.getSocket().close(true);
                    } catch (Exception e){
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.socketCloseFail"), e);
                        }
                    }
                }
                // The SocketChannel is also available via the SelectionKey. If
                // it hasn't been closed in the block above, close it now.
                if (key.channel().isOpen()) {
                    try {
                        key.channel().close();
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.channelCloseFail"), e);
                        }
                    }
                }
                try {
                    if (ka != null && ka.getSendfileData() != null
                            && ka.getSendfileData().fchannel != null
                            && ka.getSendfileData().fchannel.isOpen()) {
                        ka.getSendfileData().fchannel.close();
                    }
                } catch (Exception ignore) {
                }
                if (ka!=null) {
                    ka.reset();
                    countDownConnection();
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                if (log.isDebugEnabled()) log.error("",e);
            }
            return ka;
        }
        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                try {
                    // Loop if endpoint is paused
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }

                    boolean hasEvents = false;

                    // Time to terminate?
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString(
                                    "endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    } else {
                        hasEvents = events();
                    }
                    try {
                        if ( !close ) {
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                keyCount = selector.selectNow();
                            } else {
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString(
                                        "endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch ( NullPointerException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch ( CancelledKeyException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }//while

            stopLatch.countDown();
        }
    }

Poller的run方法,监听SelectionKey,然后交给org.apache.tomcat.util.net.NioEndpoint.Poller#processKey处理

 protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
            boolean result = true;
            try {
                if ( close ) {
                    cancelledKey(sk, SocketStatus.STOP, attachment.comet);
                } else if ( sk.isValid() && attachment != null ) {
                    attachment.access();//make sure we don't time out valid sockets
                    sk.attach(attachment);//cant remember why this is here
                    NioChannel channel = attachment.getChannel();
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            if ( isWorkerAvailable() ) {
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                }
                            } else {
                                result = false;
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk, SocketStatus.ERROR,false);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk, SocketStatus.ERROR,false);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
            return result;
        }

这里有个关键知识点,processSendfile实现了零拷贝,可参考https://www.ibm.com/developer...

if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        }         
public SendfileState processSendfile(SelectionKey sk, KeyAttachment attachment,
                boolean calledByProcessor) {
            long written = sd.fchannel.transferTo(sd.pos,sd.length,wc);
            ...
        }

如果不是文件处理的socket则会走org.apache.tomcat.util.net.NioEndpoint#processSocket这条路。


    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); //will get reset upon next reg
            SocketProcessor sc = processorCache.poll();
            if ( sc == null ) sc = new SocketProcessor(socket,status);
            else sc.reset(socket,status);
            if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

这里就会实例化SocketProcessor或复用SocketProcessor实例,SocketProcessor是一个Runnable,所以可以交给线程池去处理,也就是最上面init阶段初始化的org.apache.catalina.core.StandardThreadExecutor。
在这里理下Connector的start会启动Acceptor线程,也就是常看到的http-nio-8080-Acceptor-0,一般只有一个,代码里的注释也写着

// Initialize thread count default for acceptor
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }

多个accept线程看不出有更好。accept负责监听连接,当有连接过后,会封装NioChannel,然后增加Event到Poller监听的events队列里。是有多个Poller,每个Poller有自己的events对列,那么accept会将NioChannel注册到哪个Poller呢?是这样的轮训算法:

 /**
     * Return an available poller in true round robin fashion
     */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }

注册到Poller后,这个注册实际就会将socker与该Poller的Selector绑定,监听READ事件。
后续Poller就可以监听SelectionKey来处理了。

Poller获取SelectionKey会生成SocketProcessor交给StandardThreadExecutor线程池来执行。属性是在Catalina里由Digester设置规则org.apache.catalina.startup.SetAllPropertiesRule设置线程池的属性,如maxThreads最大线程数.

此时Catalina大概就算start阶段结束了。其实到accept和poller创建完,start就算结束了

查看原文

赞 0 收藏 0 评论 0

黄小数 发布了文章 · 2020-04-12

Tomcat优化思路

问题

在这些tomcat服务中,如果在很短的时间剧增的流量,会导致这些机子变成CPU饥饿(cpu-starved),并且服务会没响应。会导致这些服务的客户端体验很差,如读超时和连接超时。特别如果将读超时设置得非常高,会导致特别差的体验,客户端会等到好久好久。在SOA架构中,客户端的客户端也会请求超时,从而导致雪崩效应(ripple effect),以至于整个应用的其他服务也一起都变慢或不可用,最后全部服务变慢或不可用。在正常情况,机子都是有大量的CPU资源,服务也不是CPU密集型(cpu intensive)。所以,为什么会导致上述那些异常情况发发生?
在测试环境模拟流量激增时,发现CPU不足(cpu starvation)的原因是tomcat配置不当。当突然增加流量,大量tomcat线程也在繁忙。系统CPU有个巨大的跳跃,是当大部分CPU时间都在处理上下文切换,就没有线程可以做任何有意义的工作时。

解决方法

为了了解为什么tomcat的线程会繁忙,我们需要了解tomcat的线程模型。

高层次的描述Tomcat Http Connectotr的线程模型

Tomcat有一个acceptor线程来接收连接(这里就涉及到对网络的熟悉,熟悉就知道连接不仅仅是socket的表面,更是tcp的三次握手过程)。另外还有线程池来做实际的工作。于是一个请求的过程是:

  1. OS和客户端建立连接的TCP握手。这取决于OS的实现,可能是有一个队列来保存这些连接或多个队列来保存。在多队列的情况,一条队列保存未完成的连接,这些连接都是还没完成三次握手的。一旦完成握手,连接就会被移到保存完成连接的队列,应用就会消费这队列里的连接。“acceptCount”这tomcat参数就用于控制这些队列的长度。(应该是包括未完成握手和完成握手的连接数)
  2. tomcat的acceptor线程接收连接,这些队列都是来自于已完成握手的队列。
  3. 检查工作线程池是否有空闲的线程,如果没有且活动线程数小于maxThreads,则会创建工作线程,否则等待空闲线程。
  4. 一旦有空闲工作线程,acceptor线程就会将连接交给工作线程后,然后继续监听新的连接。
  5. 工作线程做的就是实际的工作,如从连接读取输入,处理请求,然后发送响应给客户端。如果连接不是keep alive则会关闭连接,然后将自己放回线程池。如果是keep aliave连接,继续等待该连接读取输入。如果数据一直没到,那么keep alive情况,会有个keepAliveTimeout,超过该时间,则会关闭连接,然后将自己放回线程池。

考虑这种情况,tomcat的maxThreads和acceptCount设置都很大,突增的流量会填满OS的队列和让tomcat的所有线程都变得繁忙。当更多的请求发送到这台机子,从而超过系统所能处理的数量时,这种请求的“排队”是不可避免的,并会导致繁忙线程的增加,最终导致CPU不足(cpu starvation)。因此,解决方法的关键是避免多个点(OS和tomcat线程)上有太多排队的请求,并在应用程序达到最大容量时快速失败(返回http状态503)。以下是实际操作的一个推荐:

当达到系统容量,应快速失败

预估在峰值负载时繁忙的线程数。如果服务器平均5ms内对请求作出响应,那么单个线程每秒则可处理200个请求(rps)。如果机子是4核CPU,则可以达到800rps。假设4个请求并行发送到机子(假设机子有4核),这会让4个线程繁忙5ms,所以下个5ms,4个或更多的请求让4个线程繁忙。随后的请求会选取一个空闲线程。所以理论上,在800rps时,平均不应该有超过8个线程处理繁忙状态。但实际当中,会有些不同,是因为系统所有资源都是共享的。因此,应该对系统能够维持的总吞吐量进行实验,并计算繁忙线程的期望数量。这将为维持峰值负载所需的线程数量提供一个基线。为了提供一些缓冲区,需要将线程数增加三倍以上,达到30个。这个缓冲区是任意的,如果需要还可以进一步调优。在我们的实验中,我们使用了略多于3倍的缓冲区,效果很好。
跟踪内存中运行中并发请求的数量,并将其用于快速失败。例如,当并发请求的数量接近刚刚预估的繁忙线程数据(8个),则返回一个http状态码503。这将防止太多的工作线程变得繁忙,因为一旦达到峰值吞吐了,任何变得活跃的额外线程都将执行非常轻量级的工作,即返回503。

设置操作系统参数

acceptCount参数用于tomcat表示最长队列,这队列是操作系统级别,用于处理还未完成tcp握手的操作(具体取决于OS)。这是一个很重要的调优参数,否则在建立连接时会出现连接不上,活着导致OS队列中的连接过度排队,从而导致读超时。当然每个OS处理正在握手和完成握手的连接细节是不同,可能会时只有一个连接队列,或多个连接队列用于区分存放未完成握手和完成握手的连接(请阅读相关的文档来获取这些细节)。所以,有一个很好的方法调优这个acceptCount参数,那就是从很小的值开始测试,逐步增大,增达到没有连接错误即可。
太大的acceptCount值意味OS层面可以接收更多的请求,但是,如果rps大于该机子能够处理的能力,所有工作线程会变得繁忙,于是aceeptor线程会等待,直至有worker线程空闲。更多的请求将继续堆积在OS队列中,因为只有当工作线程可用时,acceptor线程才可以使用它们。在最糟糕的情况,这些请求还在OS队列时就已经超时了,但是tomcat的acceptor线程依然会去获取它来交给工作线程处理。这种是完全浪费资源,而客户端也没收到任何响应。
如果acceptCount设置太小,则会在很高的rps时无法有足够的OS空间来接收连接,这样客户端就会手连接超时的错误(connect time out error),实际吞吐会低于服务器能够承载的。

因此可以实验从很小的值如10开始,然后逐步增加acceptCount的值,直至没有连接错误出现。

当完成上面两个改变后,就算最差的情况,所有工作线程都很繁忙,但机子不会cpu不足,依然有能力做更多的工作(最大吞吐)。

其他考虑

如上所述,每个连接最终都被tomcat的一个工作线程处理。假如keep alive被打开,工作线程就会继续监听该连接,从而不会变成空闲而返回到线程池中。所以,如果客户端不够智能从而不会关闭连接,那么服务端很快就会用完它的线程。如果keep alive被打开,那么必须通过记住这种情况,来调节服务器群的大小。
或者,如果keep alive是关闭的,那么就不必担心使用工作线程处理非活动连接的问题。但是,在这种情况,每个调用就要付出打开和关闭连接的代价。此外,这还会创建很多TIME_WAIT状态的socket,这样会给服务器造成压力。

最好根据应用程序的用力进行选择,并通过运行实验来测试性能。

备注

“连接”这两个字做于名词时,实际指的是socket。而当用于动词时,其实就是TCP三次握手。
线程池里的线程在本文,有时称为空闲线程或工作线程,或空闲工作线程。

查看原文

赞 0 收藏 0 评论 0

黄小数 赞了文章 · 2019-12-22

Vuejs之axios获取Http响应头

今天在开始接入后端Api 就遇到了一个问题了

在用 axios 获取 respose headers 时候获取到的只有的

Object {
    cache-control:"private, must-revalidate",
    content-type:"application/json"
}

下面是服务器返回的响应头, 我需要拿到的是 Authorization
clipboard.png

使用 respose.headers 拿到的只用两个默认的headers, 尝试了使用捕获响应头的方法

   axios.interceptors.response.use(function (response) {
        // Do something with response data
        console.log(response);
        return response;
    }, function (error) {
        // Do something with response error
        return Promise.reject(error);
    });

结果打印出来的还是

Object {
    cache-control:"private, must-revalidate",
    content-type:"application/json"
}

找了半天问题, 后面的在一个论坛找到了解决方法

原来在默认的请求上, 浏览器只能访问以下默认的 响应头

  • Cache-Control

  • Content-Language

  • Content-Type

  • Expires

  • Last-Modified

  • Pragma

如果想让浏览器能访问到其他的 响应头的话 需要在服务器上设置 Access-Control-Expose-Headers

Access-Control-Expose-Headers : 'Authorization'

前端成功获取Authorization

clipboard.png

原文地址 :http://stackoverflow.com/ques...

查看原文

赞 37 收藏 28 评论 10

黄小数 赞了文章 · 2019-12-21

axios POST提交数据的三种请求方式写法

1、Content-Type: application/json

import axios from 'axios'
let data = {"code":"1234","name":"yyyy"};
axios.post(`${this.$url}/test/testRequest`,data)
.then(res=>{
    console.log('res=>',res);            
})

clipboard.png

2、Content-Type: multipart/form-data

import axios from 'axios'
let data = new FormData();
data.append('code','1234');
data.append('name','yyyy');
axios.post(`${this.$url}/test/testRequest`,data)
.then(res=>{
    console.log('res=>',res);            
})

clipboard.png

3、Content-Type: application/x-www-form-urlencoded

import axios from 'axios'
import qs from 'Qs'
let data = {"code":"1234","name":"yyyy"};
axios.post(`${this.$url}/test/testRequest`,qs.stringify({
    data
}))
.then(res=>{
    console.log('res=>',res);            
})

clipboard.png

总结:
1、从jquery转到axios最难忘的就是要设置Content-Type,还好现在都搞懂了他们的原理
2、上面三种方式会对应后台的请求方式,这个也要注意,比如java的@RequestBody,HttpSevletRequest等等

查看原文

赞 79 收藏 53 评论 13

黄小数 关注了专栏 · 2019-12-21

全栈工程师进阶

日常学习总结与分享,包括:前端、后台与运维,讲解的知识点包括:javascript、vuejs、reactjs、springboot、springcloud、redis、mongodb、aliyun、linux、bash shell、docker等等

关注 80

黄小数 发布了文章 · 2019-10-20

nodejs-10.16.1源码安装

下载

cd /mydata
wget http://cdn.npm.taobao.org/dist/node/v10.16.1/node-v10.16.1.tar.gz

解压

cd /mydata
tar -zxf node-v10.16.1.tar.gz

创建安装路径

mkdir /usr/local/node/

设置安装路径

sudo  ./configure --prefix=/usr/local/node/

编译安装

sudo make
sudo make install

增加环境变量

export NODEJS_HOME=/usr/local/node
export PATH=$PATH:$NODEJS_HOME/bin

注意,全程都要用sudo权限,避免目录无法安装

查看安装情况

monster@monster-PC:~$ node -v
v10.16.1
monster@monster-PC:~$ npm -v
6.9.0

TroubleShot

monster@monster-PC:~$ npm install -g cnpm --registry=https://registry.npm.taobao.org
npm WARN checkPermissions Missing write access to /usr/local/node/lib/node_modules
npm ERR! path /usr/local/node/lib/node_modules
npm ERR! code EACCES
npm ERR! errno -13
npm ERR! syscall access
npm ERR! Error: EACCES: permission denied, access '/usr/local/node/lib/node_modules'
npm ERR!  { [Error: EACCES: permission denied, access '/usr/local/node/lib/node_modules']
npm ERR!   stack:
npm ERR!    'Error: EACCES: permission denied, access \'/usr/local/node/lib/node_modules\'',
npm ERR!   errno: -13,
npm ERR!   code: 'EACCES',
npm ERR!   syscall: 'access',
npm ERR!   path: '/usr/local/node/lib/node_modules' }
npm ERR! 
npm ERR! The operation was rejected by your operating system.
npm ERR! It is likely you do not have the permissions to access this file as the current user
npm ERR! 
npm ERR! If you believe this might be a permissions issue, please double-check the
npm ERR! permissions of the file and its containing directories, or try running
npm ERR! the command again as root/Administrator (though this is not recommended).

npm ERR! A complete log of this run can be found in:
npm ERR!     /home/monster/.npm/_logs/2019-08-05T09_20_52_374Z-debug.log

将目录授权给当前用户

 sudo chown -R monster:monster  /usr/local/node
查看原文

赞 0 收藏 0 评论 0

黄小数 发布了文章 · 2019-09-08

数据库索引算法——B树与B+树

B-树

数据库索引为什么要使用树结构存储呢?
这个还不简单,树的查询效率高,而且可以保持有序。
既然这样,为什么索引没有使用二叉查找树来实现呢?
这就不明白了,明明二叉查找树时间复杂度是O(logN),性能已经足够高了,难道B树可以比它更快?

其实从算法逻辑来讲,二叉树查找树的查找速度和比较次数都是最小的,但是我们不得不考虑一个现实问题:磁盘IO ;数据索引是存储在磁盘上的,当数据量比较大的时候,索引的大小可能有几个G甚至更多。
当我们利用索引查询的时候,能把整个索引全部加载到内存吗?显然不可能。能做的只有逐一加载每个磁盘页,这里的磁盘页对应索引树的节点。
在二叉查找树里,磁盘的IO次数等于索引树的高度。

既然如此,为了减少磁盘的IO次数,我们就需要把原本“瘦高”的树结构变成“矮胖”,这是B-树的特征之一。
B-树是一种多路平衡查找树,它的每一个节点最多包含k个孩子,k被称为B树的阶。
k的大小取决于磁盘页的大小。

下面来具体介绍以下B-树(Balance Tree),一个m阶的B树具有如下几个特征:

  1. 根节点至少有两个子女;
  2. 每个中间节点都包含k-1个元素和k个孩子,其中m/2<=k<=m;
  3. 每一个叶子节点都包含k-1个元素,其中m/2<=k<=m;
  4. 所有叶子节点都位于同一层;
  5. 每个节点中的元素从小到大排列,节点当中k-1个元素正好是k个孩子包含的元素的值域划分。

下面以3阶的B-树为例,来看看B-树的具体结构。

              【     9    】
           /                  \
          /                    \
     【2  6】                 【12】
   /   |      \            /       \ 
  /    |       \          /        \ 
 /     |        \        /          \
1    【3 5】     8       11       【13 15】

这棵树中,重点看【2 6】节点,该节点有两个元素2和6,又有三个孩子1,【3 5】和8。
其中1小于元素2,6之间,8大于【3 5】,正好符合刚刚所列的几条特征。

演示下B-树的查询过程,假如我们要查询数值为5的节点。
第一次IO,查到 9,比较9
第二次IO,查到 【2 6】 比较2,比较6
第三次IO,查到 【3 5】 比较3, 比较5

虽然 比较次数 并不比二叉查找树少,尤其当单一节点中的元素数量很多时。
但是相比磁盘IO的速度,内存中 比较的操作 耗时几乎可以忽略,所以只要树的高度足够低,IO次数足够少,就可以提高查找性能。
相比之下节点内部元素多一些并没关系,最多就是几次内存交换操作而已,只要不超过磁盘页的大小,这就是B-树的优势之一。

但B-树,插入新节点的过程就比较复杂,而且分成很多种情况。所以我们举个典型的例子,上图中插入4.
自顶下查找4的节点位置,发现4应该插入【3 5】之间
节点【3 5】已经是两个元素节点,根据特征2要求,无法再增加了。父节点【2 6】也是两元素节点,也无法再增加,根节点9是单元素节点,可以升级为两元素节点。于是拆分节点【3 5】和节点【2 6】,让根节点升级为两元素节点【4 9】,节点6独立成为根节点的第二个孩子。

            【    4 9    】
         /         |         \
        /          |          \
       2          6          【12】
   /   |         / \        /      \ 
  /    |        /   \      /        \ 
 /     |       /     \    /          \
1      3      5       8  11       【13 15】

虽然维护树结构麻烦,但也正因为如此,让B-树能够始终维持多路平衡。这就是B-树的一大优势:自平衡。

下面在举例说说B-树的删除,继续上一颗树,我们要删除元素11
自顶向下查找元素11的节点位置。
删除11后,节点12只有一个孩子,不符合特征1和5 (不好意思,我猜的,原文就直说不符合B树规范)。
因此找出12,13,15这个节点的中位数,取代节点12,而节点12自身下已成为第一个孩子。(此过程为左旋

            【    4 9    】
         /         |         \
        /          |          \
       2          6            13
   /   |         / \        /      \ 
  /    |        /   \      /        \ 
 /     |       /     \    /          \
1      3      5       8  12          15

虽然B-树的插入和删除,很复杂,没看懂也没关系,关键是理解B-树的核心思想:B-树主要应用于文件系统以及部分数据库索引,比如著名的非关系型数据库MongoDB。

不过大部分关系型数据库,比如MySql,则使用B+树作为索引。

很多文档里,有时写B-树,有些写B树,但都是指balance tree,而不是balance binary tree

B+树

B+树是基于B-树的一种变体,有着比B-树更高的查询性能。

一个m阶的B+树具有如下几个特征:

  1. 有k个子树的中间节点包含k个元素(B-树中是k-1个元素),每个元素不保存数据,只用来索引,所有数据都保存在叶子节点。
  2. 所有叶子节点中包含了全部元素的信息,及指向含这些元素记录的指针,且叶子节点本身 按关键字的大小自小二大的顺序链接。
  3. 所有的中间节点元素都同时存在于子节点,在子节点中是最大(或最小)元素。



              【    8 15    】
           /                   \
          /                     \
      【2 5 8】               【11 15】
     /    |     \              /      \ 
    /     |      \            /        \ 
   /      |       \          /          \
【1 2】->【3 5】->【6 8】->【9 11】--->【13 15】

在上面棵B+树中,根节点元素8是子节点【2 5 8】的最大元素,也是叶子节点【6 8】的最大元素
根节点元素15也是子节点【11 15】的最大元素,也是叶子节点的【13 15】的最大元素

需要注意的是,根节点的最大元素(这里是15),也就等同于整个B+树的最大元素。以后无论插入删除多少元素,始终要保持最大元素的根节点当中。

至于叶子节点,由于父节点的元素出现在子节点,因此所有叶子节点包含了全量元素信息。
并且每一个叶子节点都带有指向下一个节点的指针,形成了一个有序链表。

B+树还有一个特点,这个特点是在索引之外,确实是至关重要的特点,那就是【卫星数据】的位置。
所谓卫星数据,指的是索引元素所指向的数据记录,比如数据库中的某一行。在B-树中,无论中间节点还是叶子节点都带有卫星数据。

而在B+树中,只有叶子节点带有卫星数据,其余中间节点仅仅是索引,没有任何数据关联。
需要补充的是,在数据库的聚集索引(Clustered Index)中,叶子节点直接包含卫星数据。在非聚集索引(NonClustered Index)中,叶子节点带有指向卫星数据的指针。
B+树的好处主要体现在查询性能上:

  • 在单元素查询的时候,B+树会自顶向下逐层查找节点,最终找到匹配的叶子节点。 这跟B-树不一样的两点是,首先,B+树的中间节点没有卫星数据,所以同样大小的磁盘页可以容纳更多的节点元素。这意味,数据量相同的情况下,B+树的结果比B-树更加”矮胖“,因此查询时IO次数也更少。第二,B+树的查询必须最终查找到叶子节点,而B-树只要找到匹配元素即可,无论匹配元素处于中间节点还是在叶子节点,因此,B-树是查找性能并不稳定(最好情况是只查根节点,最坏情况是查到叶子节点)。而B+树的每一次查找都是稳定的。
  • 在范围查询的时候,B-树只能依靠繁琐的中序遍历。而B+树,很简单,只需要在链表上做遍历即可。

综合起来,B+树相比B-树的优势有三个:

  1. IO次数更少;
  2. 查询性能稳定;
  3. 范围查询简便。

Mysql的阶树计算

 我们可以来算一笔账,以InnoDB存储引擎中默认每个页的大小为16KB来计算,假设以int型的ID作为索引关键字,那么 一个int占用4byte,由上图可以知道还有其他的除主键以外的数据,姑且页当成4byte,那么这里就是8byte,那么16KB=161024byte,那么我们在这种场景下,可以定义这个B-Tree的阶树为 (161024)/8=2048.那么这个树将会有2048-1路,也就是原来平衡二叉树(两路)的1024倍左右,从而大大提高了查找效率与降低IO读写次数。

参考:https://www.cnblogs.com/wuzhe...

Mysql Innodb数据页

其实mysql数据页结构不是单纯16KB都给数据用,请参考https://www.cnblogs.com/bdsir...

看来无论是这里的页,还是操作系统内存的页page的概念,都是类似c语言的structure结构体的概念。

B+树索引的分裂优化

在4阶的B+树中(为了图好看直观,*代表是页面的可用空间)

   【1 3 * *】
     /     \
    /       \
   /         \
【1 2 * *】->【3 4 5 6】

插入记录7时,由于叶节点的页面(下文简称叶页面)中只能存放4条记录,插入记录7时,会导致叶页面分裂,产生一个新的叶页面。

           【1 3 5 *】
         /     |       \
        /      |        \
       /       |         \
      /        |          \
【1 2 * *】->【3 4 * *】->【5 6 7 *】

传统B+树页面裂变操作及分析:

  • 按照原页面中50%的数据量进行分裂,针对当前这个分裂操作,3,4记录保留在原有页面,5,6记录移动到新的页面。最后将新记录7插入到新的页面中;
  • 50%分裂策略的优势:

    • 分裂之后,亮哥页面的空间利用率是一样的;如果新的插入是随机在两个页面中挑选进行,那么下一个分裂的操作就会更晚触发。
  • 50%分裂策略的劣势:

    • 空间利用率不高:按照传统50%的页面分裂策略,索引页面的空间利用率在50%左右;
    • 分裂频率较大:针对如上所示的递增插入(递减插入),每新插入两条记录,就会导致最后的叶页面在此发送分裂。

由于传统50%分裂的策略有不足之处。因此,针对B+树索引的递增/递减插入进行了优化(目前所有的关系型数据库,包括Oracle/InnoDB/PostgreSQL)。经过优化,上述B+树索引,在记录6插入完毕,记录7插入引起分裂之后,新的B+树结构如下:

           【1 3 5 *】
         /     |       \
        /      |        \
       /       |         \
      /        |          \
【1 2 * *】->【3 4 5 6】->【7 * * *】

对比上下两个插入记录7之后,B+树索引的结构图,可以发现二者有很多不同之处:

  • 新的分裂策略,在插入7时,不移动原有页面的任何记录,只是将新插入的记录7写到新的页面中。
  • 原有页面的利用率,仍旧是100%;
  • 优化分裂策略的优势:

    • 索引分裂的代价小:不需要移动记录;
    • 索引分裂的概率降低:如果接下来的插入,仍旧是递增插入,那么需要插入4条记录,才能再次引起页面的分裂。50%分裂策略,分裂的概念降低了一半。
    • 索引页面的空间利用率提高:新的分裂策略,能够保证分裂前的页面,仍旧保持100%的利用率,提高索引的空间利用率。
  • 优化分裂策略的优势:

    • 如果新的插入,不再满足递增插入的条件,而是插入到原有页面,那么就会导致原有页面在此分裂,增加分裂的概率。

因此,此分裂的优化策略,仅仅是针对递增递减插入有效,针对随机插入,就是去了优化的意义,反而带来更高的分裂概率。
在InnoDB的实现中,为每个索引页面维护了一个上次插入的位置,以及上次的插入是递增/递减的标识。根据这些信息,InnoDB能够判断出新插入的页面中的记录,是否仍旧满足递增/递减的约束,若满足约束,则采用优化后的分裂策略;若不满足越苏,则退回到50%的分类策略。这里提下,递增和递减,指的是趋势,所以Snowflake算法生成的序列是满足递增的约束。

查看原文

赞 2 收藏 1 评论 0

黄小数 赞了问题 · 2019-02-25

gitlab git clone 提示我输入密码,我第一不知道这个密码啊。如何解决

我网上看

git clone git@192.168.1.235:operation/ihaozhuo.git
Cloning into 'ihaozhuo'...
git@192.168.1.235's password:
Permission denied, please try again.
git@192.168.1.235's password:
Permission denied, please try again.
git@192.168.1.235's password:
Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password).
输入密码 提示密码错误。

我ssh -vvv git@192.168.1.235
就报这个

然后我登陆到gitlab服务器修改了git 密码,就报没有权限访问。
是什么原因。我已经把本地的key 拷贝到gitlab 自己的ssh-key 上面了。

debug2: we did not send a packet, disable method
debug3: authmethod_lookup password
debug3: remaining preferred: ,password
debug3: authmethod_is_enabled password
debug1: Next authentication method: password
git@192.168.1.235's password:
debug2: we sent a password packet, wait for reply
debug1: Authentications that can continue: publickey,gssapi-keyex,gssapi-with-mic,password
Permission denied, please try again.
git@192.168.1.235's password:
debug2: we sent a password packet, wait for reply
debug1: Authentications that can continue: publickey,gssapi-keyex,gssapi-with-mic,password
Permission denied, please try again.
git@192.168.1.235's password:

哪位好心人帮帮忙,我看下/etc/password

git:x:497:497:GitLab:/home/git/:/bin/bash

然后修改了

git:x:497:497:GitLab:/home/git/:/bin/git-shell

也不行,需要重启gitlab吗?

可是我重启了也不行啊

关注 6 回答 6

黄小数 收藏了问题 · 2019-02-25

gitlab git clone 提示我输入密码,我第一不知道这个密码啊。如何解决

我网上看

git clone git@192.168.1.235:operation/ihaozhuo.git
Cloning into 'ihaozhuo'...
git@192.168.1.235's password:
Permission denied, please try again.
git@192.168.1.235's password:
Permission denied, please try again.
git@192.168.1.235's password:
Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password).
输入密码 提示密码错误。

我ssh -vvv git@192.168.1.235
就报这个

然后我登陆到gitlab服务器修改了git 密码,就报没有权限访问。
是什么原因。我已经把本地的key 拷贝到gitlab 自己的ssh-key 上面了。

debug2: we did not send a packet, disable method
debug3: authmethod_lookup password
debug3: remaining preferred: ,password
debug3: authmethod_is_enabled password
debug1: Next authentication method: password
git@192.168.1.235's password:
debug2: we sent a password packet, wait for reply
debug1: Authentications that can continue: publickey,gssapi-keyex,gssapi-with-mic,password
Permission denied, please try again.
git@192.168.1.235's password:
debug2: we sent a password packet, wait for reply
debug1: Authentications that can continue: publickey,gssapi-keyex,gssapi-with-mic,password
Permission denied, please try again.
git@192.168.1.235's password:

哪位好心人帮帮忙,我看下/etc/password

git:x:497:497:GitLab:/home/git/:/bin/bash

然后修改了

git:x:497:497:GitLab:/home/git/:/bin/git-shell

也不行,需要重启gitlab吗?

可是我重启了也不行啊

认证与成就

  • 获得 50 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 3 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-11-18
个人主页被 2k 人浏览