A netlink-based model for asynchronous, concurrent message handling between kernel space and user space


The user-space side uses a select-based model. At initialization it creates several netlink sockets and binds them; after binding, it sends a handshake message over each one, so the kernel can record the established connections and later pick an available one when it has data to send. Once initialization and the handshake are done, the kernel actively pushes data to user space. The user-space main thread waits for read events on all the socket descriptors; when a read event is detected, it submits a read-and-process task to a thread pool. This emulates a connection pool plus an event-dispatch model, which keeps kernel data flowing into the user-space program promptly and allows concurrent processing.

On the kernel side, netlink receives data in the context of a system call (the send interface exposed to user-space callers), so the receive path is asynchronous by nature and its performance is not a concern. When the kernel receives data, it only needs to hand the data off to a kernel thread for processing.
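As a minimal sketch of that hand-off (not part of the prototype below, which handles receive work directly in the netlink callback and uses kthreads only for sending), the received message could be queued to a workqueue so the input callback returns immediately. All names here (rx_work, rx_wq, process_rx, defer_rx) are illustrative; rx_wq would be created in module init with create_workqueue().

/*
 * Hedged sketch: defer netlink receive processing to a workqueue.
 * Illustrative only; identifiers are not from the prototype below.
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include <linux/skbuff.h>
#include <net/netlink.h>

struct rx_work {
    struct work_struct work;
    struct sk_buff *skb;              /* referenced skb, processed later */
};

static struct workqueue_struct *rx_wq;    /* create_workqueue("nl_rx") in init */

static void process_rx(struct work_struct *w)
{
    struct rx_work *rw = container_of(w, struct rx_work, work);
    struct nlmsghdr *nlh = nlmsg_hdr(rw->skb);

    printk("deferred handling of %u bytes from pid %u\n",
           nlh->nlmsg_len, nlh->nlmsg_pid);
    kfree_skb(rw->skb);               /* drop the reference we took */
    kfree(rw);
}

/* Called from the netlink input callback: queue the work and return. */
static void defer_rx(struct sk_buff *skb)
{
    struct rx_work *rw = kmalloc(sizeof(*rw), GFP_ATOMIC);

    if (!rw)
        return;
    rw->skb = skb_get(skb);           /* keep the data alive after the callback returns */
    INIT_WORK(&rw->work, process_rx);
    queue_work(rx_wq, &rw->work);
}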

The prototype code follows.

Shared header (conf.h)

#define NETLINK_TEST 21
#define MAX_DATA_LEN (768)

#define MAX_PROCESS_COUNT 100

#define MAX_PAYLOAD 1024
#define MAX_PID_COUNT MAX_PROCESS_COUNT
#define MAX_REC_DATA_LEN 1024

#define STATIC_PEROID 1024*500

#define MSG_COUNT 10000

#define TRUE 1

User space (usr.c)

#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>      /* gettimeofday() */
#include <string.h>
#include <asm/types.h>
#include <linux/netlink.h>
#include <linux/socket.h>
#include <sys/select.h>
#include <pthread.h>
#include "conf.h"

struct endpoint
{
    unsigned int pid;
    struct sockaddr_nl src_addr;
    struct sockaddr_nl dest_addr;
    int sock_fd;
    struct msghdr msg;
} *endpoits = NULL;

int listen_user(void);
int handwithknl(void);
int close_user(void);
int doselect(struct timeval wait);
void* threadProc(void* ed);
void* sendThreadProc(void* arg);
void flushcount(int len);
void staticsout(void);
void startSendThreads(void);
static int maxfd = 0;
static fd_set rset;
static struct timeval tmout;

unsigned char* readbuffer;

static struct timeval tmnow;
static struct timeval oldtime;
static long reccount = 0;

pthread_mutex_t mutex;
static long readbytes = 0;
static long readnum = 0;
static long sendbytes = 0;

int main(int argc, char* argv[])
{
    endpoits = (struct endpoint*)malloc(sizeof(struct endpoint) * MAX_PID_COUNT);

    readbuffer = (unsigned char*)malloc(MAX_REC_DATA_LEN);

    // Create the netlink sockets and bind them.
    listen_user();

    if (pthread_mutex_init(&mutex, NULL) != 0)
    {
        printf("Init mutex error.\n");
        return -1;
    }

    // Send a handshake msg to the kernel so it can see this client.
    handwithknl();

    // sleep(6);

    // startSendThreads();

    gettimeofday(&oldtime, NULL);
    tmout.tv_sec = 1;
    tmout.tv_usec = 0;
    // Wait for events from the kernel and dispatch them.
    while (1)
    {
        doselect(tmout);

        if (readbytes > STATIC_PEROID)
        {
            staticsout();
        }

        // sleep(2);
    }

    // Close the sockets.
    close_user();
    return 0;
}

int listen_user(void)
{
    int pidcount;
    struct endpoint* ed = NULL;
    unsigned int pid = getpid();

    for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
    {
        ed = endpoits + pidcount;
        memset((void*)ed, 0, sizeof(struct endpoint));

        ed->sock_fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_TEST);
        ed->pid = pid + pidcount;
        if (0 == ed->pid)
        {
            pid = ++ed->pid;
        }

        ed->src_addr.nl_family = AF_NETLINK;
        ed->src_addr.nl_groups = 0;
        ed->src_addr.nl_pid = ed->pid;

        // TODO: src_addr.nl_pid may already be in use, in which case bind() returns a nonzero value.
        if (bind(ed->sock_fd, (struct sockaddr *) &ed->src_addr, sizeof(ed->src_addr)) != 0)
        {
            perror("bind");
        }

        ed->dest_addr.nl_family = AF_NETLINK;
        ed->dest_addr.nl_pid = 0;          /* destination: the kernel */
        ed->dest_addr.nl_groups = 0;

        /* Fill in the netlink message destination */
        ed->msg.msg_name = (void *) &ed->dest_addr;
        ed->msg.msg_namelen = sizeof(ed->dest_addr);

        printf("init the socket %d of pid %d successful.\n", ed->sock_fd, ed->pid);
        usleep(10000);
    }

    return 0;
}

int handwithknl(void)
{
    int pidcount;
    struct endpoint* ed = NULL;
    struct nlmsghdr* nlh = NULL;

    for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
    {
        ed = endpoits + pidcount;
        ed->msg.msg_iovlen = 1;
        ed->msg.msg_iov = malloc(sizeof(struct iovec));
        ed->msg.msg_iov->iov_base = malloc(NLMSG_SPACE(MAX_PAYLOAD));
        nlh = (struct nlmsghdr*)ed->msg.msg_iov->iov_base;
        nlh->nlmsg_len = NLMSG_SPACE(MAX_PAYLOAD);
        nlh->nlmsg_pid = ed->pid;
        nlh->nlmsg_flags = 0;
        ed->msg.msg_iov->iov_len = nlh->nlmsg_len;
        snprintf((char*)NLMSG_DATA(nlh), MAX_PAYLOAD - 1, "Hello knl! This is %d!", ed->pid);

        // printf("Sending handshake message from %d ...\n", ed->pid);
        sendmsg(ed->sock_fd, &ed->msg, 0);

        if (ed->sock_fd > maxfd)
        {
            maxfd = ed->sock_fd;
        }

        FD_SET(ed->sock_fd, &rset);

        // pthread_t tid;
        // if (0 == pthread_create(&tid, NULL, &threadProc, (void*)ed))
        // {
        //     printf("create a thread %u successful for the pid: %d. \n", tid, ed->pid);
        // }
    }

    return 0;
}

int close_user(void)
{
    int pidcount;

    for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
    {
        close(endpoits[pidcount].sock_fd);
    }

    return 0;
}

int doselect(struct timeval wait)
{
    int pidcount;
    int selcount = 0;
    struct endpoint* ed = NULL;
    struct nlmsghdr* nlh = NULL;

    /* select() modifies the fd_set in place, so rebuild it before every call. */
    FD_ZERO(&rset);
    for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
    {
        FD_SET(endpoits[pidcount].sock_fd, &rset);
    }

    selcount = select(maxfd + 1, &rset, NULL, NULL, &wait);

    if (selcount == 0)
    {
        return 0;
    }
    else if (selcount < 0)
    {
        printf("select error!\n");
        return -1;
    }
    else
    {
        for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
        {
            ed = endpoits + pidcount;
            if (FD_ISSET(ed->sock_fd, &rset))
            {
                int count = 0;
                int msglen = -1;
                int readstatus = 0;
                memset(readbuffer, 0, MAX_REC_DATA_LEN);
                ed->msg.msg_iov->iov_base = (void*)readbuffer;
                ed->msg.msg_iov->iov_len = MAX_REC_DATA_LEN;   /* never read past readbuffer */
                nlh = (struct nlmsghdr*)ed->msg.msg_iov->iov_base;

                while (TRUE)
                {
                    memset(readbuffer, 0, MAX_REC_DATA_LEN);
                    readstatus = recvmsg(ed->sock_fd, &ed->msg, 0);

                    if (readstatus == -1)
                    {
                        printf("receive error! %d \n", ed->sock_fd);
                        break;
                    }
                    else if (readstatus == 0)
                    {
                        printf("receive error, peer orderly shutdown! %d \n", ed->sock_fd);
                        break;
                    }
                    else
                    {
                        count += readstatus;
                        // printf("count %d\n", count);
                    }

                    /* Once the 16-byte nlmsghdr has arrived, the total message length is known. */
                    if (msglen == -1 && count >= (int)sizeof(struct nlmsghdr))
                    {
                        msglen = nlh->nlmsg_len;
                        // printf("msg len is: %u\n", msglen);
                    }

                    if (msglen != -1 && count == msglen)
                    {
                        // printf("success read a msg: %u\n", count);

                        // pData = (int*)NLMSG_DATA(nlh);
                        // for (i = 0; i != MAX_DATA_LEN/4; i++)
                        // {
                        //     printf("%d,", *(pData + i));
                        // }
                        // printf("\n");

                        readnum++;
                        // printf("readnum is %ld\n", readnum);
                        break;
                    }
                }

                // printf("received %u bytes.\n", count);
                // printf("received %d bytes, the peer pid is %d, the local pid is %d.\n", nlh->nlmsg_len, nlh->nlmsg_pid, ed->pid);
                flushcount(count);
            }
        }
    }

    // printf("select one time.\n");
    return 0;
}

void* sendThreadProc(void* arg)
{
    int sentcount = 0;
    struct endpoint* ed;
    struct nlmsghdr* nlh;
    int i;

    ed = (struct endpoint*)arg;
    /* The buffer must hold the netlink header as well as the payload. */
    ed->msg.msg_iov->iov_base = malloc(NLMSG_SPACE(MAX_DATA_LEN));

    printf("send data thread [%d] start!\n", ed->pid);

    for (i = 0; i != MSG_COUNT; i++)
    {
        nlh = (struct nlmsghdr*)ed->msg.msg_iov->iov_base;
        nlh->nlmsg_len = NLMSG_SPACE(MAX_DATA_LEN);
        nlh->nlmsg_pid = ed->pid;
        nlh->nlmsg_flags = 0;
        ed->msg.msg_iov->iov_len = nlh->nlmsg_len;
        sentcount = sendmsg(ed->sock_fd, &ed->msg, MSG_DONTWAIT);
        flushcount(sentcount);
    }
    printf("thread %d end!\n", ed->pid);
    return NULL;
}



void* threadProc(void* arg)
{
    struct endpoint* ed;
    struct nlmsghdr* nlh;
    fd_set rset_c;
    struct timeval wait_time;

    ed = (struct endpoint*)arg;
    /* The buffer must hold the netlink header as well as the payload. */
    ed->msg.msg_iov->iov_base = malloc(NLMSG_SPACE(MAX_DATA_LEN));
    ed->msg.msg_iov->iov_len = NLMSG_SPACE(MAX_DATA_LEN);

    printf("thread %d start!\n", ed->pid);

    for (;;)
    {
        FD_ZERO(&rset_c);
        FD_SET(ed->sock_fd, &rset_c);
        /* select() may modify the timeout, so reset it each iteration. */
        wait_time.tv_sec = 2;
        wait_time.tv_usec = 0;
        select(ed->sock_fd + 1, &rset_c, NULL, NULL, &wait_time);
        if (FD_ISSET(ed->sock_fd, &rset_c))
        {
            recvmsg(ed->sock_fd, &ed->msg, 0);
            nlh = (struct nlmsghdr*)ed->msg.msg_iov->iov_base;
            flushcount(nlh->nlmsg_len);
        }
    }

    printf("thread %d end!\n", ed->pid);   /* not reached */
    return NULL;
}

void flushcount(int len)
{
    int val;
    val = pthread_mutex_lock(&mutex);
    if (val != 0)
    {
        /* The lock was not acquired, so do not unlock it. */
        printf("lock error.\n");
        return;
    }

    if (len > 0)
    {
        readbytes += len;
        readnum++;
    }

    pthread_mutex_unlock(&mutex);
}

void staticsout(void)
{
    int millsec;
    int val;
    val = pthread_mutex_lock(&mutex);
    if (val != 0)
    {
        /* The lock was not acquired, so do not unlock it. */
        printf("lock error.\n");
        return;
    }

    if (readbytes > STATIC_PEROID)
    {
        gettimeofday(&tmnow, NULL);
        millsec = (tmnow.tv_sec - oldtime.tv_sec) * 1000 + (tmnow.tv_usec - oldtime.tv_usec) / 1000;
        printf("received %ld Kbytes, consumed time is: %dms, speed is: %5.3fK/s, %5.2f/s.\n",
               readbytes / 1024,
               millsec,
               (float)(readbytes / 1024) * 1000 / millsec,
               (float)(readnum * 1000 / millsec));

        gettimeofday(&oldtime, NULL);
        readbytes = 0;
        readnum = 0;
    }

    pthread_mutex_unlock(&mutex);
}


void startSendThreads(void)
{
    int pidcount;
    struct endpoint* ed = NULL;

    for (pidcount = 0; pidcount != MAX_PID_COUNT; pidcount++)
    {
        pthread_t tid;
        ed = endpoits + pidcount;
        if (0 == pthread_create(&tid, NULL, &sendThreadProc, (void*)ed))
        {
            printf("created send thread %lu successfully for pid %d.\n", (unsigned long)tid, ed->pid);
        }
    }
}


Kernel space (knl.c)

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/sched.h>
#include <net/sock.h>
#include <net/netlink.h>
#include <linux/kthread.h>
#include "conf.h"

#ifndef SLEEP_MILLI_SEC
#define SLEEP_MILLI_SEC(nMilliSec) \
    do { \
        long timeout = (nMilliSec) * HZ / 1000; \
        while (timeout > 0) \
        { \
            timeout = schedule_timeout(timeout); \
        } \
    } while (0)
#endif

struct sock* nl_sk = NULL;
EXPORT_SYMBOL_GPL(nl_sk);

static struct task_struct* task_test[MAX_PROCESS_COUNT];

static DECLARE_WAIT_QUEUE_HEAD(myevent_waitqueue);

static u32 pids[MAX_PID_COUNT] = {0};
static int pidindex = 0;

static int readBytes = 0;

static int childDataThread(void* index)
{
    int threadindex = *((int*)index);
    struct sk_buff* skb;
    struct nlmsghdr* nlh;
    int rc;
    int len = NLMSG_SPACE(MAX_DATA_LEN);
    int a = MSG_COUNT;
    unsigned char randomindex;
    wait_queue_head_t timeout_wq;
    int devi = MAX_PID_COUNT / MAX_PROCESS_COUNT;

    init_waitqueue_head(&timeout_wq);

    printk("start thread [%d].\n", threadindex);

    allow_signal(SIGKILL);

    while (a-- && !kthread_should_stop())
    {
        skb = alloc_skb(len, GFP_ATOMIC);
        if (!skb)
        {
            printk(KERN_ERR "net_link: allocate failed.\n");
            kfree(index);
            return -1;
        }

        nlh = nlmsg_put(skb, 0, 0, 0, MAX_DATA_LEN, 0);
        NETLINK_CB(skb).pid = 0;        /* the message comes from the kernel */
        // pData = (int*)NLMSG_DATA(nlh);
        // for (i = 0; i != MAX_DATA_LEN/4; i++)
        // {
        //     *(pData + i) = i;
        // }

        /* Pick a random user-space port id out of the slice owned by this thread. */
        get_random_bytes(&randomindex, 1);
        randomindex = randomindex % devi + threadindex * devi;
        // printk("randomindex is: %d, threadindex is %d\n", randomindex, threadindex);
        if (pids[randomindex] != 0)
        {
            // printk("net_link: going to send, peer pid is: %d, a is: %d.\n", pids[randomindex], a);
            rc = netlink_unicast(nl_sk, skb, pids[randomindex], MSG_DONTWAIT);
            if (rc < 0)
            {
                printk(KERN_ERR "net_link: can not unicast skb: %d, a is: %d, peerpid is: %u\n", rc, a, pids[randomindex]);
                /* Back off for ~0.1s; netlink_unicast() has already consumed the skb. */
                interruptible_sleep_on_timeout(&timeout_wq, HZ / 10);
            }
            else
            {
                // printk(KERN_ERR "net_link: unicast skb: %d, a is: %d, peerpid is: %u\n", rc, a, pids[randomindex]);
            }
        }
        else
        {
            /* No handshake received for this slot yet; free the unsent skb. */
            kfree_skb(skb);
        }

        interruptible_sleep_on_timeout(&timeout_wq, HZ / 20);

        if (signal_pending(current))
        {
            break;
        }
    }

    printk("thread %d exit!\n", threadindex);
    kfree(index);
    return 0;
}

void nl_data_ready(struct sk_buff* __skb)
{
    struct sk_buff* skb;
    struct nlmsghdr* nlh;

    skb = skb_get(__skb);

    if (skb->len >= NLMSG_SPACE(0))
    {
        nlh = nlmsg_hdr(skb);
        // printk("net_link: recv %s.\n", (char *) NLMSG_DATA(nlh));

        /* Remember the sender's port id (handshake); once every client has
         * checked in, wake up the sender threads. */
        if (pidindex < MAX_PID_COUNT)
        {
            pids[pidindex] = nlh->nlmsg_pid;
            if (pidindex == MAX_PID_COUNT - 1)
            {
                int i;
                for (i = 0; i != MAX_PROCESS_COUNT; i++)
                {
                    wake_up_process(task_test[i]);
                    printk("wake up the thread [%d].\n", i);
                }
            }
            pidindex++;
        }

        readBytes += nlh->nlmsg_len;

        if (readBytes > STATIC_PEROID)
        {
            printk("received %d bytes.\n", readBytes);
            readBytes = 0;
        }
    }

    kfree_skb(skb);    /* always drop the reference taken by skb_get() */

    return;
}

static int init_netlink(void)
{
    /* Note: this netlink_kernel_create() signature matches older 2.6-era kernels. */
    nl_sk = netlink_kernel_create(&init_net, NETLINK_TEST, 0, nl_data_ready,
                                  NULL, THIS_MODULE);
    if (!nl_sk)
    {
        printk(KERN_ERR "net_link: Cannot create netlink socket.\n");
        return -EIO;
    }

    printk("net_link: create socket ok.\n");
    return 0;
}

int init_thread(void)
{
    int i = 0;
    char processName[64] = {0};
    for (i = 0; i != MAX_PROCESS_COUNT; i++)
    {
        void* data = kmalloc(sizeof(int), GFP_ATOMIC);
        *(int*)data = i;
        snprintf(processName, 63, "childDataThread-%d", i);

        /* kthread_create() leaves the thread stopped; nl_data_ready() wakes
         * them all up once every client has completed the handshake. */
        task_test[i] = kthread_create(childDataThread, data, processName);
        if (IS_ERR(task_test[i]))
        {
            kfree(data);
            return PTR_ERR(task_test[i]);
        }
        printk("init thread (%d) ok!\n", i);
    }

    return 0;
}

int knl_init(void)
{
    init_netlink();
    init_thread();
    return 0;
}

void stop_kthreads(void)
{
    int i;
    for (i = 0; i != MAX_PROCESS_COUNT; i++)
    {
        kthread_stop(task_test[i]);
    }
}

void knl_exit(void)
{
    stop_kthreads();
    if (nl_sk != NULL)
    {
        sock_release(nl_sk->sk_socket);
    }

    printk("net_link: remove ok.\n");
}

module_exit(knl_exit);
module_init(knl_init);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("r");


Makefile
MODULE_NAME := knl
obj-m += $(MODULE_NAME).o
KERNELDIR ?= /lib/modules/$(shell uname -r)/build
PWD := $(shell pwd)

all:
	$(MAKE) -C $(KERNELDIR) M=$(PWD)
	gcc -g -o usr usr.c -lpthread

clean:
	rm -f *.ko *.o *.cmd usr $(MODULE_NAME).mod.c Module.symvers

in: clean rm all
	insmod knl.ko

rm:
	rmmod knl.ko

sp:
	cat /proc/net/knl

ru:
	./usr

sm:
	dmesg -c


Preliminary performance results:
init the socket 188 of pid 24056 successful.
init the socket 189 of pid 24057 successful.
init the socket 190 of pid 24058 successful.
init the socket 191 of pid 24059 successful.
init the socket 192 of pid 24060 successful.
init the socket 193 of pid 24061 successful.
init the socket 194 of pid 24062 successful.
init the socket 195 of pid 24063 successful.
init the socket 196 of pid 24064 successful.
init the socket 197 of pid 24065 successful.
init the socket 198 of pid 24066 successful.
init the socket 199 of pid 24067 successful.
init the socket 200 of pid 24068 successful.
init the socket 201 of pid 24069 successful.
init the socket 202 of pid 24070 successful.
received 30 Mbytes, consumed time is: 10227ms, speed is: 2.933M/s, 4012.00/s.
received 30 Mbytes, consumed time is: 10062ms, speed is: 2.982M/s, 4013.00/s.
received 30 Mbytes, consumed time is: 10052ms, speed is: 2.984M/s, 4012.00/s.
received 30 Mbytes, consumed time is: 10069ms, speed is: 2.979M/s, 4013.00/s.
received 30 Mbytes, consumed time is: 10113ms, speed is: 2.966M/s, 4012.00/s.
received 30 Mbytes, consumed time is: 10071ms, speed is: 2.979M/s, 4012.00/s.
received 30 Mbytes, consumed time is: 10289ms, speed is: 2.916M/s, 4014.00/s.
received 30 Mbytes, consumed time is: 10247ms, speed is: 2.928M/s, 4013.00/s.
received 30 Mbytes, consumed time is: 10347ms, speed is: 2.899M/s, 4013.00/s.
received 30 Mbytes, consumed time is: 10340ms, speed is: 2.901M/s, 4013.00/s.
received 30 Mbytes, consumed time is: 10107ms, speed is: 2.968M/s, 4012.00/s.
received 30 Mbytes, consumed time is: 10267ms, speed is: 2.922M/s, 4013.00/s.

Kernel-side output
There were quite a few failed-send messages printed:
start thread [193].
start thread [194].
start thread [93].
start thread [195].
start thread [196].
start thread [197].
start thread [198].
start thread [199].
start thread [92].
start thread [91].
start thread [90].
start thread [89].
start thread [41].
start thread [40].
start thread [39].
start thread [17].
start thread [16].
start thread [1].
start thread [0].


Message throughput reaches about 4,000 messages per second, but the data volume is quite low. For now netlink looks suitable only for passing messages; bulk data needs to go through a shared-memory channel.
Since it is used as a message channel, not many connections are needed. Try 10 connections, with 10 concurrent kernel threads:
 
#define NETLINK_TEST 21
#define MAX_DATA_LEN (768)

#define MAX_PROCESS_COUNT 10

#define MAX_PAYLOAD 1024
#define MAX_PID_COUNT MAX_PROCESS_COUNT
#define MAX_REC_DATA_LEN 1024

#define STATIC_PEROID 1024*500

#define MSG_COUNT 100000000

#define TRUE 1

received 501 Kbytes, consumed time is: 3303ms, speed is: 151.680K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3277ms, speed is: 152.579K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3285ms, speed is: 152.207K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3260ms, speed is: 153.374K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3281ms, speed is: 152.393K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3279ms, speed is: 152.486K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3282ms, speed is: 152.346K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3271ms, speed is: 152.858K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3262ms, speed is: 153.280K/s, 218.00/s.
received 500 Kbytes, consumed time is: 3295ms, speed is: 151.745K/s, 218.00/s.


This level of performance is enough for message send and receive needs.
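As noted above, bulk data would go through a shared-memory channel, with netlink carrying only the notifications. One hedged, user-side sketch of that split follows; the character device /dev/databuf and the 4 MB size are assumptions for illustration, not part of the prototype, and the kernel module would need to implement the corresponding .mmap handler.

/* Hedged sketch of the "netlink for messages, shared memory for data" idea.
 * "/dev/databuf" and DATABUF_SIZE are illustrative assumptions only. */
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <unistd.h>

#define DATABUF_SIZE (4 * 1024 * 1024)

static void* map_shared_buffer(void)
{
    void* base;
    int fd = open("/dev/databuf", O_RDWR);

    if (fd < 0)
    {
        perror("open /dev/databuf");
        return NULL;
    }

    /* The netlink message then only needs to say "offset X, length Y";
     * the payload itself is read straight out of this mapping. */
    base = mmap(NULL, DATABUF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);                         /* the mapping survives the close */

    return (base == MAP_FAILED) ? NULL : base;
}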
 



3 comments

  1. nowlan
    Commented on January 19, 2017 at 15:25:34 CST | Comment link

    I have run into netlink_unicast() errors as well. In my case the kernel was sending faster than the user-space program could process, so the user socket's receive buffer filled up quickly and netlink_unicast() started failing. One workaround was to enlarge the user socket buffer with setsockopt(), which alleviated the netlink_unicast() failures.
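    A minimal user-side sketch of the mitigation described in this comment; the 4 MB value is only an example, and SO_RCVBUF is still capped by net.core.rmem_max unless a privileged process uses SO_RCVBUFFORCE. In the prototype above it could be called on each ed->sock_fd right after bind() in listen_user().

    /* Sketch: enlarge a netlink socket's receive buffer so bursts from the
     * kernel are less likely to overflow it and make netlink_unicast() fail. */
    #include <stdio.h>
    #include <sys/socket.h>

    static int enlarge_rcvbuf(int sock_fd)
    {
        int size = 4 * 1024 * 1024;    /* example value */

        if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) < 0)
        {
            perror("setsockopt(SO_RCVBUF)");
            return -1;
        }
        return 0;
    }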

  2. 快快闪人
    Commented on August 20, 2014 at 14:13:37 CST | Comment link

    I have also been benchmarking netlink recently and came across this article; I borrowed quite a bit from it and it was very helpful. However, I have doubts about the way you measured netlink's efficiency: when the kernel sender thread hits a unicast error it simply sleeps, which is not appropriate and cannot reflect netlink's real throughput. That is probably why you measured only around 4,000 netlink messages per second, whereas with a ping-pong style test I can easily reach 20,000 messages per second on a modestly configured virtual machine.
    I hope this is a useful reference; feel free to get in touch if you have questions.
    Thanks again for the article.

    • 童燕群
      Commented on August 21, 2014 at 00:06:52 CST | Comment link

      Thanks for the kind words, I am very glad it was of some help!
      You are right that sleeping after a unicast error is not appropriate. At the time I could not explain the error and did not know how to avoid it, and I never went back to analyse it. I remember that with a large number of concurrent connections there were a great many failure messages, but with a fairly small number there was no problem.

      Could you explain the cause of the unicast errors and how to deal with them?
