Analysis of a Jetty Thread "Interlock" Problem That Degrades Data Transfer Performance


The following analysis is against a specific version of Jetty: http://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/tags/jetty-7.2.1.v20101111

First, a quick look at Jetty's reactor model. Jetty uses the classic asynchronous NIO model (see "Scalable IO in Java", http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf). Connection management looks roughly like this:

[Figure: connection management in the reactor model: acceptor, selector, and worker thread pool]

Jetty modifies this model slightly: the acceptor is split out as a dedicated blocking thread that accepts new connection requests in blocking mode. Once a connection is established, network events and internal events ("changes") are registered with the selector thread, so the selector has to handle both network events and internal changes, and it must also periodically check for timed-out connections.
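To make this arrangement concrete, here is a minimal, self-contained sketch of the pattern. All class, field, and method names are hypothetical; this is not Jetty's actual code.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of the acceptor + selector-with-changes pattern described above.
public class ReactorSketch
{
    private final Selector selector = Selector.open();
    private final ServerSocketChannel acceptor = ServerSocketChannel.open();
    // Internal events ("changes") handed from other threads to the selector thread.
    private final Queue<SocketChannel> changes = new ConcurrentLinkedQueue<SocketChannel>();

    public ReactorSketch(int port) throws IOException
    {
        acceptor.socket().bind(new InetSocketAddress(port));
        // The acceptor runs as its own *blocking* thread, as in Jetty.
        new Thread("acceptor")
        {
            public void run()
            {
                try
                {
                    while (true)
                    {
                        SocketChannel channel = acceptor.accept(); // blocking accept
                        channel.configureBlocking(false);
                        changes.add(channel);   // queue an internal change ...
                        selector.wakeup();      // ... and wake the selector to see it
                    }
                }
                catch (IOException e) { e.printStackTrace(); }
            }
        }.start();
    }

    public void selectLoop() throws IOException
    {
        while (true)
        {
            // 1. Apply queued internal changes first.
            SocketChannel channel;
            while ((channel = changes.poll()) != null)
                channel.register(selector, SelectionKey.OP_READ);

            // 2. Then wait for network events; a bounded wait also lets the
            //    loop periodically check for timed-out connections.
            selector.select(1000);
            for (SelectionKey key : selector.selectedKeys())
            {
                // ... dispatch read/write handling to worker threads ...
            }
            selector.selectedKeys().clear();
        }
    }
}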

Besides dispatching network events, the main (selector) thread also interacts with the worker threads after a connection is up. When a worker thread hits network congestion and cannot flush its buffered data in time, it registers an internal event marking itself as blocked, and it expects the main thread to notify it once the congestion clears.

The code is as follows:

/* ------------------------------------------------------------ */
/*
 * Allows thread to block waiting for further events.
 */
@Override
public boolean blockWritable(long timeoutMs) throws IOException
{
    synchronized (this)
    {
        long start=_selectSet.getNow();
        try
        {
            _writeBlocked=true;
            while (isOpen() && _writeBlocked)
            {
                try
                {
                    updateKey();
                    this.wait(timeoutMs);

                    timeoutMs -= _selectSet.getNow()-start;
                    if (_writeBlocked && timeoutMs<=0)
                        return false;
                }
                catch (InterruptedException e)
                {
                    Log.warn(e);
                }
            }
        }
        finally
        {
            _writeBlocked=false;
            if (_idleTimestamp!=-1)
                scheduleIdle();
        }
    }
    return true;
}
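A worker thread calls blockWritable() when a write cannot complete. Roughly like this hypothetical call site (the flush/buffer handling here is illustrative, not Jetty's exact code):

// Hypothetical call site on the worker side (illustrative only):
// keep flushing; when the socket takes no more data, park until writable.
while (buffer.hasRemaining())
{
    int flushed = endpoint.flush(buffer);
    if (flushed == 0 && buffer.hasRemaining())
    {
        if (!endpoint.blockWritable(maxIdleTime))  // false means the wait timed out
            throw new IOException("timeout waiting for writable");
    }
}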

The main thread's select loop is shown below, together with the checkJvmBugs method it calls:

/* ------------------------------------------------------------ */
/**
 * Select and dispatch tasks found from changes and the selector.
 *
 * @throws IOException
 */
public void doSelect() throws IOException
{
    try
    {
        _selecting=Thread.currentThread();
        final Selector selector=_selector;

        // Make any key changes required
        Object change;
        int changes=_changes.size();
        while (changes-->0 && (change=_changes.poll())!=null)
        {
            try
            {
                if (change instanceof EndPoint)
                {
                    // Update the operations for a key.
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                    endpoint.doUpdateKey();
                }
                else if (change instanceof ChannelAndAttachment)
                {
                    // finish accepting/connecting this connection
                    final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                    final SelectableChannel channel=asc._channel;
                    final Object att = asc._attachment;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                    SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof SocketChannel)
                {
                    // Newly registered channel
                    final SocketChannel channel=(SocketChannel)change;
                    SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    endpoint.schedule();
                }
                else if (change instanceof Runnable)
                {
                    dispatch((Runnable)change);
                }
                else
                    throw new IllegalArgumentException(change.toString());
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
            catch (Error e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.debug(e);
            }
        }

        // Do an instant select to see if any connections can be handled.
        int selected=selector.selectNow();
        _selects++;

        long now=System.currentTimeMillis();

        // if no immediate things to do
        if (selected==0)
        {
            // If we are in pausing mode
            if (_pausing)
            {
                try
                {
                    Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
                }
                catch(InterruptedException e)
                {
                    Log.ignore(e);
                }
                now=System.currentTimeMillis();
            }

            // work out how long to wait in select
            _timeout.setNow(now);
            long to_next_timeout=_timeout.getTimeToNext();

            long wait = _changes.size()==0?__IDLE_TICK:0L;
            if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                wait = to_next_timeout;

            // If we should wait with a select
            if (wait>0)
            {
                long before=now;
                selected=selector.select(wait);
                _selects++;
                now = System.currentTimeMillis();
                _timeout.setNow(now);
                checkJvmBugs(before, now, wait, selected);
            }
        }

        // have we been destroyed while sleeping
        if (_selector==null || !selector.isOpen())
            return;

        // Look for things to do
        for (SelectionKey key: selector.selectedKeys())
        {
            try
            {
                if (!key.isValid())
                {
                    key.cancel();
                    SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                    if (endpoint != null)
                        endpoint.doUpdateKey();
                    continue;
                }

                Object att = key.attachment();
                if (att instanceof SelectChannelEndPoint)
                {
                    ((SelectChannelEndPoint)att).schedule();
                }
                else
                {
                    // Wrap readable registered channel in an endpoint
                    SocketChannel channel = (SocketChannel)key.channel();
                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                    key.attach(endpoint);
                    if (key.isReadable())
                        endpoint.schedule();
                }
                key = null;
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            catch (Exception e)
            {
                if (isRunning())
                    Log.warn(e);
                else
                    Log.ignore(e);

                if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                    key.cancel();
            }
        }

        // Everything always handled
        selector.selectedKeys().clear();

        now=System.currentTimeMillis();
        _timeout.setNow(now);
        Task task = _timeout.expired();
        while (task!=null)
        {
            if (task instanceof Runnable)
                dispatch((Runnable)task);
            task = _timeout.expired();
        }

        // Idle tick
        if (now-_idleTick>__IDLE_TICK)
        {
            _idleTick=now;

            final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                :now;

            dispatch(new Runnable()
            {
                public void run()
                {
                    for (SelectChannelEndPoint endp:_endPoints.keySet())
                    {
                        endp.checkIdleTimestamp(idle_now);
                    }
                }
            });
        }
    }
    catch (CancelledKeyException e)
    {
        Log.ignore(e);
    }
    finally
    {
        _selecting=null;
    }
}

/* ------------------------------------------------------------ */
private void checkJvmBugs(long before, long now, long wait, int selected)
    throws IOException
{
    Selector selector = _selector;
    if (selector==null)
        return;

    // Look for JVM bugs over a monitor period.
    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
    // http://bugs.sun.com/view_bug.do?bug_id=6693490
    if (now>_monitorNext)
    {
        _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
        _pausing=_selects>__MAX_SELECTS;
        if (_pausing)
            _paused++;

        _selects=0;
        _jvmBug=0;
        _monitorStart=now;
        _monitorNext=now+__MONITOR_PERIOD;
    }

    if (now>_log)
    {
        if (_paused>0)
            Log.debug(this+" Busy selector - injecting delay "+_paused+" times");

        if (_jvmFix2>0)
            Log.debug(this+" JVM BUG(s) - injecting delay "+_jvmFix2+" times");

        if (_jvmFix1>0)
            Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");

        else if(Log.isDebugEnabled() && _jvmFix0>0)
            Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
        _paused=0;
        _jvmFix2=0;
        _jvmFix1=0;
        _jvmFix0=0;
        _log=now+60000;
    }

    // If we see signature of possible JVM bug, increment count.
    if (selected==0 && wait>10 && (now-before)<(wait/2))
    {
        // Increment bug count and try a work around
        _jvmBug++;
        if (_jvmBug>(__JVMBUG_THRESHHOLD))
        {
            try
            {
                if (_jvmBug==__JVMBUG_THRESHHOLD+1)
                    _jvmFix2++;

                Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
            }
            catch(InterruptedException e)
            {
                Log.ignore(e);
            }
        }
        else if (_jvmBug==__JVMBUG_THRESHHOLD)
        {
            synchronized (this)
            {
                // BLOODY SUN BUG !!! Try refreshing the entire selector.
                final Selector new_selector = Selector.open();
                for (SelectionKey k: selector.keys())
                {
                    if (!k.isValid() || k.interestOps()==0)
                        continue;

                    final SelectableChannel channel = k.channel();
                    final Object attachment = k.attachment();

                    if (attachment==null)
                        addChange(channel);
                    else
                        addChange(channel,attachment);
                }
                _selector.close();
                _selector=new_selector;
                return;
            }
        }
        else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
        {
            // Cancel keys with 0 interested ops
            int cancelled=0;
            for (SelectionKey k: selector.keys())
            {
                if (k.isValid()&&k.interestOps()==0)
                {
                    k.cancel();
                    cancelled++;
                }
            }
            if (cancelled>0)
                _jvmFix0++;

            return;
        }
    }
    else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
    {
        // Look for busy key
        SelectionKey busy = selector.selectedKeys().iterator().next();
        if (busy==_busyKey)
        {
            if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
            {
                final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
                Log.warn("Busy Key "+busy.channel()+" "+endpoint);
                busy.cancel();
                if (endpoint!=null)
                {
                    dispatch(new Runnable()
                    {
                        public void run()
                        {
                            try
                            {
                                endpoint.close();
                            }
                            catch (IOException e)
                            {
                                Log.ignore(e);
                            }
                        }
                    });
                }
            }
        }
        else
            _busyKeyCount=0;
        _busyKey=busy;
    }
}
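For completeness: the other half of the handshake, waking a thread parked in blockWritable(), happens when doSelect() dispatches a selected key to SelectChannelEndPoint.schedule(). A simplified, paraphrased sketch of that path (details omitted; treat this as approximate, not the verbatim source):

// Simplified sketch of SelectChannelEndPoint.schedule() (paraphrased):
public void schedule()
{
    synchronized (this)
    {
        // If a worker parked itself in blockReadable()/blockWritable(),
        // clear the matching flag and notify it instead of dispatching.
        if (_readBlocked || _writeBlocked)
        {
            if (_readBlocked && _key.isReadable())
                _readBlocked = false;
            if (_writeBlocked && _key.isWritable())
                _writeBlocked = false;
            this.notifyAll();   // releases the wait() inside blockWritable()
            return;
        }
        // ... otherwise dispatch the connection to the thread pool ...
    }
}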

Here is where things go wrong. While the main thread is blocked in selector.select(wait), a worker thread entering blockWritable() (whose updateKey() call registers a change and wakes the selector) startles the main thread awake with zero selected events. When the selector manages a large number of connections, the main thread can be woken this way hundreds of times per second, and checkJvmBugs ends up treating the pattern as a JVM bug: it forces the main thread to sleep for 50 ms, at the very moment when many worker threads have registered blocked events and are waiting for the main thread to wake them. The net effect was that a 12 KB write to a client took 50 ms, and transfer performance collapsed.
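The mechanics are easy to reproduce with plain NIO: Selector.wakeup() makes a blocked select(wait) return immediately with zero selected keys, which is exactly the signature the heuristic keys on (selected==0 && wait>10 && (now-before)<(wait/2)). A minimal demonstration:

import java.nio.channels.Selector;

// Demonstrates that a wakeup() from another thread produces the
// "returned early with 0 keys" pattern that checkJvmBugs misreads as a JVM bug.
public class WakeupDemo
{
    public static void main(String[] args) throws Exception
    {
        final Selector selector = Selector.open();
        long wait = 1000;

        // Stand-in for a worker thread registering a blocked-writer change:
        new Thread()
        {
            public void run()
            {
                try { Thread.sleep(50); } catch (InterruptedException ignored) {}
                selector.wakeup();  // interrupts the select below
            }
        }.start();

        long before = System.currentTimeMillis();
        int selected = selector.select(wait);   // returns after ~50 ms with 0 keys
        long now = System.currentTimeMillis();

        // The exact predicate used by checkJvmBugs:
        boolean misdiagnosed = selected == 0 && wait > 10 && (now - before) < (wait / 2);
        System.out.println("selected=" + selected + " elapsed=" + (now - before)
                + "ms misdiagnosed=" + misdiagnosed);
    }
}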

The problem is now clear: Jetty's own block-and-notify mechanism is being treated as a JVM bug, and data transfer performance suffers for it. The situation only shows up under a large number of connections with heavy data transfer.

Two fixes are possible. Option 1: comment out the checkJvmBugs call entirely, keeping only the throttle that sleeps for a while once select() has run more than 25,000 times in a second. Option 2: shorten the 50 ms sleep so the forced pause cannot block for as long.
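A sketch of option 1 (illustrative, not the actual patch): remove the checkJvmBugs(before, now, wait, selected) call from doSelect() and keep only the monitor-period throttle that sets _pausing, so a genuinely busy selector still gets paused. The method name checkBusySelector is hypothetical; the fields are those from the listing above.

// Illustrative sketch of fix option 1: keep only the busy-selector
// throttle from checkJvmBugs and drop the JVM-bug heuristics.
private void checkBusySelector(long now)
{
    if (now > _monitorNext)
    {
        // Normalize the select count to one monitor period; pause next
        // round if select() ran more than __MAX_SELECTS (25,000) times.
        _selects = (int)(_selects * __MONITOR_PERIOD / (now - _monitorStart));
        _pausing = _selects > __MAX_SELECTS;
        if (_pausing)
            _paused++;
        _selects = 0;
        _monitorStart = now;
        _monitorNext = now + __MONITOR_PERIOD;
    }
    // No "selected==0 returned early" detection: a zero-key wakeup is
    // normal here; it is how worker threads hand changes to the selector.
}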

Looking at the latest Jetty code, the checkJvmBugs call has indeed been removed, exactly as in option 1.

With the problem spelled out it looks simple, but finding it was a long and winding process. Once the issue had been narrowed down to the code above, I was fortunate to take part in the final diagnosis and pinpoint the exact cause; the moment we found it brought a real sense of excitement and accomplishment.
