Analysis of a Jetty Thread "Interlock" That Degrades Data Transfer Performance


The following analysis applies to one specific Jetty version: http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111

First, a brief introduction to Jetty's reactor model. Jetty uses the classic NIO asynchronous model (Scalable IO in Java, http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf). Connection management is illustrated below:

[Figure: connection management diagram]

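Jetty adapts this model in a few ways. The acceptor is split out into its own blocking thread that does nothing but accept new connection requests. Once a connection is established, it registers both network events and internal events (changes) with the selector thread, so the selector has to handle network events and internal changes together, while also periodically checking for timed-out connections.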
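To make the "network events plus internal changes" idea concrete, here is a minimal sketch of such a selector loop. It assumes a simplified design: `MiniSelectLoop`, `addChange` and `runOnce` are hypothetical names, not Jetty API, and no channels are registered. The point it shows is that any thread can queue a change and call `Selector.wakeup()`, which cuts a pending or upcoming `select(timeout)` short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, stripped-down selector loop: internal changes plus a timed select. */
public class MiniSelectLoop implements AutoCloseable {
    private final Selector selector;
    private final Queue<Runnable> changes = new ConcurrentLinkedQueue<>();

    public MiniSelectLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Any thread: queue a change and wake the selector so it is applied promptly. */
    public void addChange(Runnable change) {
        changes.add(change);
        selector.wakeup();
    }

    /** One pass of the loop; returns how many queued changes were applied. */
    public int runOnce(long idleWaitMs) {
        int applied = 0;
        Runnable change;
        while ((change = changes.poll()) != null) {
            change.run();
            applied++;
        }
        try {
            // Block only when nothing is pending (select(0) would block forever, so guard it).
            if (idleWaitMs > 0 && changes.isEmpty())
                selector.select(idleWaitMs);
            else
                selector.selectNow();
            // A real loop would dispatch selector.selectedKeys() here; this sketch has no channels.
            selector.selectedKeys().clear();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return applied;
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because `addChange` calls `wakeup()` before `runOnce` selects, the timed select returns immediately with no selected keys; this early, empty return is the behavior that matters later in this analysis.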

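Once a connection is established, the main (selector) thread does more than dispatch network events; it also interacts with the worker threads. When a worker thread finds the network congested and cannot flush its buffered data in time, it registers an internal event indicating that it is blocked, and expects the main thread to notify it once the congestion clears.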

The relevant code is as follows:

/* ------------------------------------------------------------ */
/*
 * Allows thread to block waiting for further events.
 */
@Override
public boolean blockWritable(long timeoutMs) throws IOException
{
    synchronized (this)
    {
        long start=_selectSet.getNow();
        try
        {   
            _writeBlocked=true;
            while (isOpen() && _writeBlocked)
            {
                try
                {
                    updateKey();
                    this.wait(timeoutMs);

                    timeoutMs -= _selectSet.getNow()-start;
                    if (_writeBlocked && timeoutMs<=0)
                        return false;
                }
                catch (InterruptedException e)
                {
                    Log.warn(e);
                }
            }
        }
        finally
        {
            _writeBlocked=false;
            if (_idleTimestamp!=-1)
                scheduleIdle();
        }
    }
    return true;
}
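The listing above shows only the worker half of the handshake. The sketch below pairs it with a selector-side counterpart so both halves are visible. `WriteGate`, `registerInterest` and `onWritable` are hypothetical names, not Jetty API, and the sketch computes the remaining wait from a fixed deadline.

```java
/** Hypothetical, simplified blocking-write handshake between a worker and the selector. */
public class WriteGate {
    private boolean writeBlocked;
    // Hypothetical hook: in a real server this would queue a change and wake the selector.
    private final Runnable registerInterest;

    public WriteGate(Runnable registerInterest) {
        this.registerInterest = registerInterest;
    }

    /** Worker side: block until onWritable() is called or the timeout expires. */
    public synchronized boolean blockWritable(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        writeBlocked = true;
        try {
            while (writeBlocked) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    return false; // timed out while still blocked
                registerInterest.run();
                wait(remaining); // releases the monitor so the selector side can notify
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            writeBlocked = false;
        }
    }

    /** Selector side: called when the channel becomes writable again. */
    public synchronized void onWritable() {
        writeBlocked = false;
        notifyAll();
    }
}
```

Every blocked writer depends on the selector thread to call `onWritable()`, so anything that stalls the selector stalls all of them at once.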

The main select loop run by the main thread looks like this:

/* ------------------------------------------------------------ */
/**
 * Select and dispatch tasks found from changes and the selector.
 * 
 * @throws IOException
 */
public void doSelect() throws IOException
{
    try
    {
        _selecting=Thread.currentThread();
        final Selector selector=_selector;

        // Make any key changes required
        Object change;
        int changes=_changes.size();
        while (changes-->0 && (change=_changes.poll())!=null)
        {
            try
            {
                if (change instanceof EndPoint)
                {
                    // Update the operations for a key.
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                    endpoint.doUpdateKey();
                }
                else if (change instanceof ChannelAndAttachment)
                {
                    // finish accepting/connecting this connection
                    final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                    final SelectableChannel channel=asc._channel;
                    final Object att = asc._attachment;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                    SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof SocketChannel)
                {
                    // Newly registered channel
                    final SocketChannel channel=(SocketChannel)change;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof Runnable)
                {
                    dispatch((Runnable)change);
                }
                else
                    throw new IllegalArgumentException(change.toString());
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
            catch (Error e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
        }


        // Do and instant select to see if any connections can be handled.
        int selected=selector.selectNow();
        _selects++;

        long now=System.currentTimeMillis();
        
        // if no immediate things to do
        if (selected==0)
        {
            // If we are in pausing mode
            if (_pausing)
            {
                try
                {
                    Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
                }
                catch(InterruptedException e)
                {
                    Log.ignore(e);
                }
                now=System.currentTimeMillis();
            }

            // workout how long to wait in select
            _timeout.setNow(now);
            long to_next_timeout=_timeout.getTimeToNext();

            long wait = _changes.size()==0?__IDLE_TICK:0L;  
            if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                wait = to_next_timeout;

            // If we should wait with a select
            if (wait>0)
            {
                long before=now;
                selected=selector.select(wait);
                _selects++;
                now = System.currentTimeMillis();
                _timeout.setNow(now);
                checkJvmBugs(before, now, wait, selected);
            }
        }
        
        // have we been destroyed while sleeping
        if (_selector==null || !selector.isOpen())
            return;

        // Look for things to do
        for (SelectionKey key: selector.selectedKeys())
        {   
            try
            {
                if (!key.isValid())
                {
                    key.cancel();
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                    if (endpoint != null)
                        endpoint.doUpdateKey();
                    continue;
                }

                Object att = key.attachment();
                if (att instanceof SelectChannelEndPoint)
                {
                    ((SelectChannelEndPoint)att).schedule();
                }
                else
                {
                    // Wrap readable registered channel in an endpoint
                    SocketChannel channel = (SocketChannel)key.channel();
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    if (key.isReadable())
                        endpoint.schedule();                           
                }
                key = null;
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.ignore(e);

                if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                    key.cancel();
            }
        }
        
        // Everything always handled
        selector.selectedKeys().clear();
        
        now=System.currentTimeMillis();
        _timeout.setNow(now);
        Task task = _timeout.expired();
        while (task!=null)
        {
            if (task instanceof Runnable)
                dispatch((Runnable)task);
            task = _timeout.expired();
        }

        // Idle tick
        if (now-_idleTick>__IDLE_TICK)
        {
            _idleTick=now;
            
            final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                :now;
                
            dispatch(new Runnable()
            {
                public void run()
                {
                    for (SelectChannelEndPoint endp:_endPoints.keySet())
                    {
                        endp.checkIdleTimestamp(idle_now);
                    }
                }
            });
        }
    }
    catch (CancelledKeyException e)
    {
        Log.ignore(e);
    }
    finally
    {
        _selecting=null;
    }
}

/* ------------------------------------------------------------ */
private void checkJvmBugs(long before, long now, long wait, int selected)
    throws IOException
{
    Selector selector = _selector;
    if (selector==null)
        return;
        
    // Look for JVM bugs over a monitor period.
    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
    // http://bugs.sun.com/view_bug.do?bug_id=6693490
    if (now>_monitorNext)
    {
        _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
        _pausing=_selects>__MAX_SELECTS;
        if (_pausing)
            _paused++;

        _selects=0;
        _jvmBug=0;
        _monitorStart=now;
        _monitorNext=now+__MONITOR_PERIOD;
    }

    if (now>_log)
    {
        if (_paused>0)  
            Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

        if (_jvmFix2>0)
            Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");

        if (_jvmFix1>0)
            Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");

        else if(Log.isDebugEnabled() && _jvmFix0>0)
            Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
        _paused=0;
        _jvmFix2=0;
        _jvmFix1=0;
        _jvmFix0=0;
        _log=now+60000;
    }

    // If we see signature of possible JVM bug, increment count.
    if (selected==0 && wait>10 && (now-before)<(wait/2))
    {
        // Increment bug count and try a work around
        _jvmBug++;
        if (_jvmBug>(__JVMBUG_THRESHHOLD))
        {
            try
            {
                if (_jvmBug==__JVMBUG_THRESHHOLD+1)
                    _jvmFix2++;

                Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
            }
            catch(InterruptedException e)
            {
                Log.ignore(e);
            }
        }
        else if (_jvmBug==__JVMBUG_THRESHHOLD)
        {
            synchronized (this)
            {
                // BLOODY SUN BUG !!!  Try refreshing the entire selector.
                final Selector new_selector = Selector.open();
                for (SelectionKey k: selector.keys())
                {
                    if (!k.isValid() || k.interestOps()==0)
                        continue;

                    final SelectableChannel channel = k.channel();
                    final Object attachment = k.attachment();

                    if (attachment==null)
                        addChange(channel);
                    else
                        addChange(channel,attachment);
                }
                _selector.close();
                _selector=new_selector;
                return;
            }
        }
        else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
        {
            // Cancel keys with 0 interested ops
            int cancelled=0;
            for (SelectionKey k: selector.keys())
            {
                if (k.isValid()&&k.interestOps()==0)
                {
                    k.cancel();
                    cancelled++;
                }
            }
            if (cancelled>0)
                _jvmFix0++;

            return;
        }
    }
    else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
    {
        // Look for busy key
        SelectionKey busy = selector.selectedKeys().iterator().next();
        if (busy==_busyKey)
        {
            if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
            {
                final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
                Log.warn("Busy Key "+busy.channel()+" "+endpoint);
                busy.cancel();
                if (endpoint!=null)
                {
                    dispatch(new Runnable()
                    {
                        public void run()
                        {
                            try
                            {
                                endpoint.close();
                            }
                            catch (IOException e)
                            {
                                Log.ignore(e);
                            }
                        }
                    });
                }
            }
        }
        else
            _busyKeyCount=0;
        _busyKey=busy;
    }
}

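Here is the problem in checkJvmBugs. While the main thread is blocked in selector.select(wait), a worker thread entering blockWritable (via updateKey()) wakes it up, so the select returns early with 0 selected keys. That is exactly the signature checkJvmBugs looks for: selected==0 && wait>10 && (now-before)<(wait/2). When the selector manages a large number of connections, the main thread can be woken this way hundreds of times per second, and checkJvmBugs eventually treats it as the JVM bug and forces the main thread to sleep for 50 ms, even though many more worker threads are registering blocked events and waiting for the main thread to wake them. As a result, writing just 12 KB of data to a client took 50 ms, which is what degraded performance.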
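The misclassification can be reproduced in isolation. This sketch copies only the detection condition from checkJvmBugs into a hypothetical `looksLikeJvmBug` helper and feeds it timed selects that are cut short by `wakeup()`. For determinism the wakeup is issued just before each select rather than during it; per the `Selector` contract both make the select return immediately, so the effect is the same. The class and method names are illustrative, not Jetty code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

public class JvmBugHeuristicDemo {
    /** Same condition as the "possible JVM bug" check in checkJvmBugs. */
    static boolean looksLikeJvmBug(int selected, long wait, long before, long now) {
        return selected == 0 && wait > 10 && (now - before) < (wait / 2);
    }

    /** Runs timed selects, each preceded by a wakeup(), and counts how many trip the heuristic. */
    static int countMisclassified(int rounds, long waitMs) {
        try (Selector selector = Selector.open()) {
            int suspicious = 0;
            for (int i = 0; i < rounds; i++) {
                selector.wakeup(); // stands in for a worker registering a change
                long before = System.currentTimeMillis();
                int selected = selector.select(waitMs); // returns at once with 0 keys
                long now = System.currentTimeMillis();
                if (looksLikeJvmBug(selected, waitMs, before, now))
                    suspicious++;
            }
            return suspicious;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countMisclassified(100, 400) + " of 100 wakeups look like the JVM bug");
    }
}
```

Every legitimate wakeup is counted as a suspected JVM bug, so under heavy write blocking the counter crosses __JVMBUG_THRESHHOLD quickly and the __BUSY_PAUSE sleep kicks in.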

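The cause is now clear: Jetty's own block/wake-up mechanism is mistaken for the JVM bug, which hurts data transfer performance. It only shows up when there are many connections and heavy data transfer.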

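There are two possible fixes. Option 1: drop the JVM-bug detection by commenting out the checkJvmBugs call, keeping only the logic that pauses for a while once the selector exceeds 25,000 selects per second (note that in this version the _pausing flag is computed inside checkJvmBugs, so that part has to be preserved). Option 2: shorten the 50 ms sleep so that blocked writers are not held up for as long.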
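Option 1 amounts to keeping the select-rate throttle while discarding the JVM-bug heuristic. A minimal sketch of that throttle on its own follows; `BusySelectMonitor` and `recordSelect` are hypothetical names, and the 1000 ms period is an assumption matching the "per second" wording above. It normalizes the select count to selects per period in the same way checkJvmBugs does.

```java
/** Hypothetical sketch of the "busy selector" throttle, separated from the JVM-bug heuristic. */
public class BusySelectMonitor {
    private final long periodMs;
    private final int maxSelectsPerPeriod;
    private long monitorStart;
    private long monitorNext;
    private int selects;
    private boolean pausing;

    public BusySelectMonitor(long periodMs, int maxSelectsPerPeriod, long now) {
        this.periodMs = periodMs;
        this.maxSelectsPerPeriod = maxSelectsPerPeriod;
        this.monitorStart = now;
        this.monitorNext = now + periodMs;
    }

    /** Records one select made at time now; returns true if the loop should pause before selecting again. */
    public boolean recordSelect(long now) {
        selects++;
        if (now > monitorNext) {
            // Normalize the count to selects per period before comparing with the limit.
            long perPeriod = selects * periodMs / Math.max(1, now - monitorStart);
            pausing = perPeriod > maxSelectsPerPeriod;
            selects = 0;
            monitorStart = now;
            monitorNext = now + periodMs;
        }
        return pausing;
    }
}
```

The select loop would call `recordSelect` after each select and sleep briefly when it returns true. Empty wakeups then cost nothing unless the selector is genuinely spinning at tens of thousands of selects per second.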

Looking at the latest Jetty code, the checkJvmBugs call has in fact been removed, exactly as in option 1.

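Once identified, the problem looks fairly simple, but tracking it down was a long and winding process. After the issue had been narrowed down to the code above, I was fortunate to take part in the final diagnosis and pin down the exact cause; the moment we found it brought real excitement and a sense of accomplishment.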
