Analysis of a Data-Transfer Performance Drop Caused by Jetty Thread "Interlocking"
The analysis below targets a specific Jetty version: http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111
First, a brief introduction to Jetty's reactor model. Jetty uses the classic asynchronous NIO model (see Doug Lea's "Scalable IO in Java", http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf). Connection management works as follows.
Jetty modifies this model in a few ways. The acceptor is a separate blocking thread that accepts new connection requests. Once a connection is established, network events and internal events ("changes") are registered with the selector thread, which must process both the network events and the internal changes, while also periodically checking connections for timeouts.
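To make that structure concrete, here is a minimal self-contained sketch of the arrangement (an illustration of the pattern only, with hypothetical names; it is not Jetty source): a blocking acceptor thread hands accepted channels to the selector thread through a changes queue, and the selector loop drains that queue before each select.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AcceptorSelectorSketch
{
    private final Queue<SocketChannel> changes = new ConcurrentLinkedQueue<>();
    private final Selector selector;

    public AcceptorSelectorSketch() throws IOException
    {
        selector = Selector.open();
    }

    // Blocking acceptor thread: accepts connections and queues them as "changes".
    void acceptorLoop(ServerSocketChannel server) throws IOException
    {
        while (true)
        {
            SocketChannel channel = server.accept();  // blocks until a client connects
            channel.configureBlocking(false);
            changes.add(channel);                     // internal event for the selector thread
            selector.wakeup();                        // interrupt select() so the change is seen
        }
    }

    // Selector thread: drain internal changes first, then handle network events.
    void selectorLoop() throws IOException
    {
        while (true)
        {
            SocketChannel channel;
            while ((channel = changes.poll()) != null)
                channel.register(selector, SelectionKey.OP_READ);

            selector.select(1000);                    // bounded wait, so timeout checks still run
            for (SelectionKey key : selector.selectedKeys())
            {
                // dispatch read/write handling for this key to a worker thread here
            }
            selector.selectedKeys().clear();
        }
    }
}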
After a connection is established, besides dispatching network events, the main (selector) thread also interacts with the worker threads. When a worker thread hits network congestion and cannot flush its buffered data in time, it registers an internal event marking itself as blocked, expecting the main thread to notify it once the congestion clears.
The relevant code:
/* ------------------------------------------------------------ */
/*
 * Allows thread to block waiting for further events.
 */
@Override
public boolean blockWritable(long timeoutMs) throws IOException
{
    synchronized (this)
    {
        long start=_selectSet.getNow();
        try
        {
            _writeBlocked=true;
            while (isOpen() && _writeBlocked)
            {
                try
                {
                    updateKey();
                    this.wait(timeoutMs);

                    timeoutMs -= _selectSet.getNow()-start;
                    if (_writeBlocked && timeoutMs<=0)
                        return false;
                }
                catch (InterruptedException e)
                {
                    Log.warn(e);
                }
            }
        }
        finally
        {
            _writeBlocked=false;
            if (_idleTimestamp!=-1)
                scheduleIdle();
        }
    }
    return true;
}
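The other half of this handshake runs on the selector thread: when the selector later finds the endpoint's key writable, it clears _writeBlocked and notifies the endpoint's monitor, releasing the wait() above. A minimal standalone model of the two halves (hypothetical class and method names, not Jetty source; the real notify happens inside SelectChannelEndPoint):

public class BlockNotifyDemo
{
    private boolean _writeBlocked;

    // Writer side: same shape as blockWritable() above.
    public synchronized boolean blockWritable(long timeoutMs) throws InterruptedException
    {
        long start = System.currentTimeMillis();
        _writeBlocked = true;
        try
        {
            while (_writeBlocked)
            {
                wait(timeoutMs);
                timeoutMs -= System.currentTimeMillis() - start;
                if (_writeBlocked && timeoutMs <= 0)
                    return false; // timed out while still blocked
            }
        }
        finally
        {
            _writeBlocked = false;
        }
        return true;
    }

    // Selector side: what the main thread does when the key turns writable.
    public synchronized void onWritable()
    {
        _writeBlocked = false;
        notifyAll();
    }

    public static void main(String[] args) throws Exception
    {
        final BlockNotifyDemo endp = new BlockNotifyDemo();
        new Thread(() ->
        {
            // congestion clears 100ms later; the sleep also orders the two calls
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            endp.onWritable();
        }).start();
        System.out.println("unblocked=" + endp.blockWritable(5000));
    }
}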
The main thread's select loop looks like this:
/* ------------------------------------------------------------ */
/**
 * Select and dispatch tasks found from changes and the selector.
 *
 * @throws IOException
 */
public void doSelect() throws IOException
{
    try
    {
        _selecting=Thread.currentThread();
        final Selector selector=_selector;

        // Make any key changes required
        Object change;
        int changes=_changes.size();
        while (changes-->0 && (change=_changes.poll())!=null)
        {
            try
            {
                if (change instanceof EndPoint)
                {
                    // Update the operations for a key.
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                    endpoint.doUpdateKey();
                }
                else if (change instanceof ChannelAndAttachment)
                {
                    // finish accepting/connecting this connection
                    final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                    final SelectableChannel channel=asc._channel;
                    final Object att = asc._attachment;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                    SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof SocketChannel)
                {
                    // Newly registered channel
                    final SocketChannel channel=(SocketChannel)change;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof Runnable)
                {
                    dispatch((Runnable)change);
                }
                else
                    throw new IllegalArgumentException(change.toString());
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
            catch (Error e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
        }

        // Do and instant select to see if any connections can be handled.
        int selected=selector.selectNow();
        _selects++;

        long now=System.currentTimeMillis();

        // if no immediate things to do
        if (selected==0)
        {
            // If we are in pausing mode
            if (_pausing)
            {
                try
                {
                    Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
                }
                catch(InterruptedException e)
                {
                    Log.ignore(e);
                }
                now=System.currentTimeMillis();
            }

            // workout how long to wait in select
            _timeout.setNow(now);
            long to_next_timeout=_timeout.getTimeToNext();

            long wait = _changes.size()==0?__IDLE_TICK:0L;
            if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                wait = to_next_timeout;

            // If we should wait with a select
            if (wait>0)
            {
                long before=now;
                selected=selector.select(wait);
                _selects++;
                now = System.currentTimeMillis();
                _timeout.setNow(now);
                checkJvmBugs(before, now, wait, selected);
            }
        }

        // have we been destroyed while sleeping
        if (_selector==null || !selector.isOpen())
            return;

        // Look for things to do
        for (SelectionKey key: selector.selectedKeys())
        {
            try
            {
                if (!key.isValid())
                {
                    key.cancel();
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                    if (endpoint != null)
                        endpoint.doUpdateKey();
                    continue;
                }

                Object att = key.attachment();
                if (att instanceof SelectChannelEndPoint)
                {
                    ((SelectChannelEndPoint)att).schedule();
                }
                else
                {
                    // Wrap readable registered channel in an endpoint
                    SocketChannel channel = (SocketChannel)key.channel();
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    if (key.isReadable())
                        endpoint.schedule();
                }
                key = null;
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.ignore(e);

                if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                    key.cancel();
            }
        }

        // Everything always handled
        selector.selectedKeys().clear();

        now=System.currentTimeMillis();
        _timeout.setNow(now);
        Task task = _timeout.expired();
        while (task!=null)
        {
            if (task instanceof Runnable)
                dispatch((Runnable)task);
            task = _timeout.expired();
        }

        // Idle tick
        if (now-_idleTick>__IDLE_TICK)
        {
            _idleTick=now;

            final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                :now;

            dispatch(new Runnable()
            {
                public void run()
                {
                    for (SelectChannelEndPoint endp:_endPoints.keySet())
                    {
                        endp.checkIdleTimestamp(idle_now);
                    }
                }
            });
        }
    }
    catch (CancelledKeyException e)
    {
        Log.ignore(e);
    }
    finally
    {
        _selecting=null;
    }
}

/* ------------------------------------------------------------ */
private void checkJvmBugs(long before, long now, long wait, int selected)
    throws IOException
{
    Selector selector = _selector;
    if (selector==null)
        return;

    // Look for JVM bugs over a monitor period.
    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
    // http://bugs.sun.com/view_bug.do?bug_id=6693490
    if (now>_monitorNext)
    {
        _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
        _pausing=_selects>__MAX_SELECTS;
        if (_pausing)
            _paused++;

        _selects=0;
        _jvmBug=0;
        _monitorStart=now;
        _monitorNext=now+__MONITOR_PERIOD;
    }

    if (now>_log)
    {
        if (_paused>0)
            Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

        if (_jvmFix2>0)
            Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");

        if (_jvmFix1>0)
            Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
        else if(Log.isDebugEnabled() && _jvmFix0>0)
            Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");

        _paused=0;
        _jvmFix2=0;
        _jvmFix1=0;
        _jvmFix0=0;
        _log=now+60000;
    }

    // If we see signature of possible JVM bug, increment count.
    if (selected==0 && wait>10 && (now-before)<(wait/2))
    {
        // Increment bug count and try a work around
        _jvmBug++;
        if (_jvmBug>(__JVMBUG_THRESHHOLD))
        {
            try
            {
                if (_jvmBug==__JVMBUG_THRESHHOLD+1)
                    _jvmFix2++;

                Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
            }
            catch(InterruptedException e)
            {
                Log.ignore(e);
            }
        }
        else if (_jvmBug==__JVMBUG_THRESHHOLD)
        {
            synchronized (this)
            {
                // BLOODY SUN BUG !!! Try refreshing the entire selector.
                final Selector new_selector = Selector.open();
                for (SelectionKey k: selector.keys())
                {
                    if (!k.isValid() || k.interestOps()==0)
                        continue;

                    final SelectableChannel channel = k.channel();
                    final Object attachment = k.attachment();

                    if (attachment==null)
                        addChange(channel);
                    else
                        addChange(channel,attachment);
                }
                _selector.close();
                _selector=new_selector;
                return;
            }
        }
        else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
        {
            // Cancel keys with 0 interested ops
            int cancelled=0;
            for (SelectionKey k: selector.keys())
            {
                if (k.isValid()&&k.interestOps()==0)
                {
                    k.cancel();
                    cancelled++;
                }
            }
            if (cancelled>0)
                _jvmFix0++;

            return;
        }
    }
    else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
    {
        // Look for busy key
        SelectionKey busy = selector.selectedKeys().iterator().next();
        if (busy==_busyKey)
        {
            if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
            {
                final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
                Log.warn("Busy Key "+busy.channel()+" "+endpoint);
                busy.cancel();
                if (endpoint!=null)
                {
                    dispatch(new Runnable()
                    {
                        public void run()
                        {
                            try
                            {
                                endpoint.close();
                            }
                            catch (IOException e)
                            {
                                Log.ignore(e);
                            }
                        }
                    });
                }
            }
        }
        else
            _busyKeyCount=0;
        _busyKey=busy;
    }
}
Now for the interaction with checkJvmBugs. While the main thread is blocked in selector.select(wait), a worker thread entering blockWritable registers a change (via updateKey()) that wakes the main thread up, so select returns zero selected keys well before the requested timeout. With a selector managing a large number of connections, the main thread can be woken this way hundreds of times per second. That matches exactly the signature checkJvmBugs uses to detect the JVM bug (selected==0, wait>10, and a return in less than half the requested wait), so once _jvmBug crosses __JVMBUG_THRESHHOLD the main thread is forced to sleep for 50 ms (__BUSY_PAUSE), while large numbers of worker threads sit parked in blockWritable waiting for the main thread to wake them. The end result: writing a mere 12 KB of data to the client took 50 ms, and overall performance dropped.
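The false positive is easy to reproduce outside Jetty. The following standalone sketch (a hypothetical demo, not from the Jetty tree) shows that a wakeup() issued by another thread makes select() return zero keys almost immediately, which is precisely the condition checkJvmBugs counts toward _jvmBug:

import java.nio.channels.Selector;

public class WakeupSignatureDemo
{
    public static void main(String[] args) throws Exception
    {
        final Selector selector = Selector.open();
        final long wait = 1000; // ms, like selector.select(wait) in doSelect()

        // Plays the role of a worker thread entering blockWritable(): its
        // updateKey() change ends up waking the selector.
        new Thread(() ->
        {
            try { Thread.sleep(5); } catch (InterruptedException ignored) {}
            selector.wakeup();
        }).start();

        long before = System.currentTimeMillis();
        int selected = selector.select(wait); // returns after ~5 ms with 0 keys
        long now = System.currentTimeMillis();

        // Exactly the signature checkJvmBugs() treats as the JVM bug:
        boolean looksLikeJvmBug = selected == 0 && wait > 10 && (now - before) < (wait / 2);
        System.out.println("selected=" + selected + ", elapsed=" + (now - before)
                + "ms, looksLikeJvmBug=" + looksLikeJvmBug);
        selector.close();
    }
}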
The problem is now clear: Jetty's own block-and-wakeup mechanism is misclassified as a JVM bug, degrading data-transfer performance. The symptom only shows up under a large number of connections with heavy data transfer.
Two fixes are possible. Option 1: comment out the checkJvmBugs call entirely, keeping only the pacing logic that sleeps for a while once the selector exceeds 25,000 selects per second. Option 2: shorten the 50 ms sleep so a false positive stalls blocked writers for less time. Option 1 is sketched below.
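Against the doSelect() excerpt above, option 1 amounts to a one-line change (a sketch, assuming checkJvmBugs has no other callers):

// Option 1: disable the heuristic; the _selects/_pausing pacing that sleeps
// once the selector exceeds __MAX_SELECTS selects per monitor period stays.
if (wait>0)
{
    long before=now;
    selected=selector.select(wait);
    _selects++;
    now = System.currentTimeMillis();
    _timeout.setNow(now);
    //checkJvmBugs(before, now, wait, selected); // false positives under blockWritable wakeups
}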
Examining the latest Jetty code shows that the checkJvmBugs call has already been removed, which is exactly option 1.
Stated this way the problem looks simple, but actually finding it was a long and winding process. Once it had been narrowed down to the code above, I was lucky enough to take part in the final diagnosis and pin down the exact cause; the moment the problem was found brought a real sense of excitement and accomplishment.