腾讯开源协程库libco分享


讲libco之前先说一下什么是协程协程,也叫 coroutine 用一句话可以理解为用户级的线程

大家都知道线程是操作系统提供的

如 windows 使用api CreateThread

linux 使用 pthread_create 来创建线程

可以理解为,线程是系统级的。

那协程则是用户级的,比如某些语言自带coroutine

golang 的 goroutine

lua 等等

还有一些是第三方实现的 coroutine 库

为什么要使用协程,因为协程创建资源占用非常小,可以说是廉价的比操作系统的线程占用资源少太多

接下来我要介绍的 libco 是腾讯开源协程库 GitHub https://github.com/Tencent/libco

先搬一下readme的简介

简介

libco是微信后台大规模使用的c/c++协程库,2013年至今稳定运行在微信后台的数万台机器上。

libco通过仅有的几个函数接口 cocreate/coresume/coyield 再配合 copoll,可以支持同步或者异步的写法,如线程库一样轻松。同时库里面提供了socket族函数的hook,使得后台逻辑服务几乎不用修改逻辑代码就可以完成异步化改造。 libco的特性

  • 无需侵入业务逻辑,把多进程、多线程服务改造成协程服务,并发能力得到百倍提升;
  • 支持CGI框架,轻松构建web服务(New);
  • 支持gethostbyname、mysqlclient、ssl等常用第三库(New);
  • 可选的共享栈模式,单机轻松接入千万连接(New);
  • 完善简洁的协程编程接口
  • 类pthread接口设计,通过cocreate、coresume等简单清晰接口即可完成协程的创建与恢复;
  • _thread的协程私有变量、协程间通信的协程信号量cosignal (New);
  • 语言级别的lambda实现,结合协程原地编写并执行后台异步任务 (New);
  • 基于epoll/kqueue实现的小而轻的网络框架,基于时间轮盘实现的高性能定时器;

上面介绍了几大特性 比较有亮点的就是 IO 全局hook功能 就是所有在 coroutine 里进行IO操作都会自动切换 coroutine 去执行操作

比如

for(::)
{
    int ret = read(fd,buf,sizeof(buf));
}

在调用read的时候,交由系统去处理接收IO操作

然后CPU切换到其它coroutine 继续执行

系统接收完了之后,cpu再切回来,这样就能充分利用CPU资源,而不是普通的一直阻塞在这里暂用CPU资源

在 coroutine 使用第三方库也能进行自动切换操作比如 mysql_connect();

因为libco已经把当前进程的使用IO操作都hook了

如何编译:

git clone https://github.com/Tencent/libco.git
cd libco
make

到这里已经把 libco 和一些简单的使用案例编译好了

我自己根据里边的案例写了一个 echo_server 回显服务器

把客户端发过来的数据再返回过去 echo_server.cpp


#include "co_routine.h"

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
#include <stack>

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>

using namespace std;


struct task_t
{
        stCoRoutine_t *co;
        int fd;
};
static stack<task_t*> g_readwrite;

int co_accept(int fd, struct sockaddr *addr, socklen_t *len );

int set_non_block(int fd)
{
    int iFlags;

    iFlags = fcntl(fd, F_GETFL, 0);
    iFlags |= O_NONBLOCK;
    iFlags |= O_NDELAY;
    int ret = fcntl(fd, F_SETFL, iFlags);
    return ret;
}

//读写IO线程
void *readwrite_routine( void *arg )
{
        co_enable_hook_sys();
        task_t *co = (task_t*)arg;
        char buf[1024];
        for(;;)
        {
                //如果sockfd已经关闭,把该task放入到空闲的协程
                if( -1 == co->fd )
                {
                        g_readwrite.push( co );
                        co_yield_ct();
                        continue;
                }
                int fd = co->fd;
                co->fd = -1;
                for(;;)
                {
                        struct pollfd pf = { 0 };
                        pf.fd = fd;
                        pf.events = (POLLIN|POLLERR|POLLHUP);
                        co_poll( co_get_epoll_ct(),&pf,1,1000);

                        int ret = read( fd,buf,sizeof(buf) );
                        if( ret > 0 )
                        {
                                ret = write( fd,buf,ret );
                        }
                        if( ret <= 0 )
                        {
                                if (errno == EAGAIN)
                                        continue;
                                close( fd );
                                break;
                        }
                }
        }
}

//接收客户端连接协程
static void *accept_routine( void *arg )
{
        //开启hook
        co_enable_hook_sys();
        int listenfd = *(int*)arg;
        for(;;)
        {
                //没有空闲的协程可用
                if( g_readwrite.empty() )
                {
                        printf("empty\n");
                        struct pollfd pf = { 0 };
                        pf.fd = -1;
                        poll( &pf,1,1000);
                        continue;
                }
                struct sockaddr_in addr;
                memset( &addr,0,sizeof(addr) );
                socklen_t len = sizeof(addr);
                int fd = co_accept(listenfd, (struct sockaddr *)&addr, &len);
                if( fd < 0 )
                {
                        struct pollfd pf = { 0 };
                        pf.fd = listenfd;
                        pf.events = (POLLIN|POLLERR|POLLHUP);
                        co_poll( co_get_epoll_ct(),&pf,1,1000 );
                        continue;
                }
                if( g_readwrite.empty() )
                {
                        close( fd );
                        continue;
                }
                set_non_block( fd );
                //把接收到的连接放入一个task
                task_t *co = g_readwrite.top();
                co->fd = fd;
                g_readwrite.pop();
                co_resume(co->co);
        }
}

int listen_server(const char *ip,int port)
{
        //tcp listen初始化操作
        int fd = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
        if(fd < 0)
        {
                return -1;
        }
        struct sockaddr_in addr;
        bzero(&addr,sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip);
        int ret = bind(fd,(struct sockaddr*)&addr,sizeof(addr));
        if( ret != 0)
        {
                close(fd);
                return -2;
        }
        listen( fd,1024 );
        //设置no block
        set_non_block(fd);

        //创建1024个IO读写协程
        for(int i = 0;i < 1024;i++)
        {
                task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
                task->fd = -1;

                co_create( &(task->co),NULL,readwrite_routine,task );
                co_resume( task->co );
        }
        stCoRoutine_t *accept_co = NULL;
        //创建1个accept协程 用来接收客户端连接
        co_create( &accept_co,NULL,accept_routine,&fd );
        co_resume( accept_co );

        co_eventloop( co_get_epoll_ct(),0,0 );
        return fd;
}

int main(int argc,char *argv[])
{
        if(argc < 3)
        {
                printf("%s [ip] [port]\n",argv[0]);
                return 0;
        }
        const char *ip = argv[1];
        int port = atoi( argv[2] );
        printf("[+]listening to %s:%d!\n",ip,port);
        int ret = listen_server(ip,port);
        if(ret < 1)
        {
                printf("[-]listen to %s:%d faild!code:%d\n",ip,port,ret);
                return 0;
        }
        return 0;
}


修改Makefile文件


#
# Tencent is pleased to support the open source community by making Libco available.
# 
# Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved.
# 
# Licensed under the Apache License, Version 2.0 (the "License"); 
# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
# 
#   [url]http://www.apache.org/licenses/LICENSE-2.0[/url]
# 
# Unless required by applicable law or agreed to in writing, 
# software distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License.
#


COMM_MAKE = 1
COMM_ECHO = 1
version=0.5
v=debug
include co.mk

########## options ##########
CFLAGS += -g -fno-strict-aliasing -O2 -Wall -export-dynamic \
        -Wall -pipe  -D_GNU_SOURCE -D_REENTRANT -fPIC -Wno-deprecated -m64

LINKS += -g -L./lib -lcolib -lpthread -ldl 

COLIB_OBJS=co_epoll.o co_routine.o co_hook_sys_call.o coctx_swap.o coctx.o
#co_swapcontext.o

PROGS = colib example_poll echo_server example_echosvr example_echocli example_thread  example_cond example_specific example_copystack example_closure

all:$(PROGS)

colib:libcolib.a libcolib.so

libcolib.a: $(COLIB_OBJS)
        $(ARSTATICLIB) 
libcolib.so: $(COLIB_OBJS)
        $(BUILDSHARELIB) 

echo_server:echo_server.o
        $(BUILDEXE)
example_echosvr:example_echosvr.o
        $(BUILDEXE) 
example_echocli:example_echocli.o
        $(BUILDEXE) 
example_thread:example_thread.o
        $(BUILDEXE) 
example_poll:example_poll.o
        $(BUILDEXE) 
example_exit:example_exit.o
        $(BUILDEXE) 
example_cond:example_cond.o
        $(BUILDEXE)
example_specific:example_specific.o
        $(BUILDEXE)
example_copystack:example_copystack.o
        $(BUILDEXE)
example_setenv:example_setenv.o
        $(BUILDEXE)
example_closure:example_closure.o
        $(BUILDEXE)

dist: clean libco-$(version).src.tar.gz

libco-$(version).src.tar.gz:
        @find . -type f | grep -v CVS | grep -v .svn | sed s:^./:libco-$(version)/: > MANIFEST
        @(cd ..; ln -s libco_pub libco-$(version))
        (cd ..; tar cvf - `cat libco_pub/MANIFEST` | gzip > libco_pub/libco-$(version).src.tar.gz)
        @(cd ..; rm libco-$(version))

clean:
        $(CLEAN) *.o $(PROGS)
        rm -fr MANIFEST lib solib libco-$(version).src.tar.gz libco-$(version)


只是把echo_server.cpp加入一起编译

然后再次make

./echo_server 127.0.0.1 9000

github github

对了这个库只能在linux上面用

某牛在源码上做了分析 包括如何进行全局Hook的 都做了说明

https://github.com/hymanyx/libco 这是带分析的源码,可以在线看看

侧栏导航