多进程日志实现

在绝大多数情况下,日志模块对于一个程序而言是必不可少的,对于服务端程序的重要性尤甚。

因为几乎所有服务端程序都是作为守护进程而运行的,因此指望开发者从terminal获取程序的状态并不现实,这个时候,依靠日志记录关键信息就很重要了,当然日志的作用远不止记录信息,它还可以用于恢复,相信从事数据库开发的读者对这应该十分熟悉,但这不属于本文的范畴。

对于Java开发者而言,他们是很幸福的,因为有log4j,甚至还有slf4j,而对于C语言而言,并没有发现同等地位或者功能的日志库,更别提类似slf4j之类的日志库抽象层了。于是,我拿起键盘,再次造起了轮子。

本文谈及的实现是为了多进程程序而设计的,简单而有效是我们追求的目标,通常而言简单的程序性能不会太差,当然乱用锁的除外。

0x00 多进程下的问题

多进程程序最大的问题在于race condition,在多进程日志记录这个具体的情景下,最终记录日志信息的文件(以下简称日志文件)就成了hotspot(热点)。假设不考虑操作系统的机制,多个进程同时写日志,最直观的问题就是无法保证日志信息的有序性,最终的结果可能如下:

进程A的日志片段 | 进程B的日志片段 | 进程A的日志片段 。。。

假设我们能够保证write操作的原子性,那么这个问题便不存在了。幸运的是,POSIX的write在一定程度上提供了这种机制,前提是设置了O_APPEND标志,当开启这个标记后,在一定程度上可以理解为write的原子追加操作。

但问题是,并不是所有的系统都遵循POSIX标准的,并且从当前获得的信息来看,并不是任意长度的append write操作都能够保证原子性,这里应该是有个阀值的,具体跟OS和FS有关。

先把OS提供的机制搁在一边,让我们先考虑写日志的操作。

由于我们无法对日志信息的长度作任何假设,因此我们必须考虑到因为需要频繁记录短日志信息而大量调用write的情况。须知系统调用的代价是很大的,因此相对理性的做法是先将日志信息缓存,然后待缓冲区满后一次性write。

因此我个人认为相对理性的做法还是实现一个多进程日志库,而非让每个进程直接write,即使OS能够在一定程度上保证append write的原子性。

0x01 实现思想

在单进程程序下,是不存在日志文件写竞争的问题的。在实现本日志库时,采用的核心思想也是如此。只需让一个单独的进程或者线程完成写日志的任务即可,也就是说这里使用的是C/S架构,写日志的进程或线程是Server,而提出记录日志请求的进程是Client。

出于实现的复杂度考虑,这里的Server采用线程来实现。具体而言,就是初始化日志库的进程会启动一个独立的线程作为日志库的Server,而这个进程以及其所派生的所有子进程都将是Client。

当然,这并非最完美的实现,因为它存在单点故障问题,这是Master/Salver架构无法回避的,当然,如果Server端实现得足够简单,甚至简单到没bug,它应该基本不会crash的。当然即使存在潜在的故障,也可以使用一些容错的手段,比如说选举算法,相关内容超过了本文的范畴。

0x02 进程间通讯

那么剩下的问题,似乎就只是进程间通讯了,这已经是老生常谈的话题了,经典的方案如下:

  • 共享内存
  • 管道
  • 套接字
  • 消息队列

在实现的时候,我并没有考虑共享内存,为了保证共享内存的原子性,例如采用semphore,会造成额外的开销和复杂度,似乎并没有必要这么做。

管道分为匿名和具名,区别是前者只能用在具有派生关系的进程之间,而后者就解决了这个问题。

管道和套接字之间的区别是,前者是单向的,而后者则是双向的,并且如果后者使用Datagram,就可以保留消息的边界,而前者是做不到这点的。另外,UNIX Domain Socket的性能和PIPE差不多,因为这个时候并不需要解析协议。

消息队列(MQ)一般而言是相对于高层Application而言的,对于服务器而言,似乎并没有什么必要。

很多情况下,当考虑进程间通讯时,都是首推套接字的,因为进程间通讯可能涉及跨主机通讯,对于这点,只有套接字能够支持。

由于当前的需求是单机多进程,考虑到性能,这里就采用UNIX Domain Socket了,今后改成支持多机多进程时,也会比较容易。

0x03 实现一览

如下为实现示例,它并不能直接运行,因为我对涉及IO操作和time格式化操作的函数做了一定的封装,但代码中并未提供这些函数,因此如果你想copy-paste,这是不可能的,你需要需要自行理解,然后亲手动手实现相关函数。

typedef struct log {
    handler_t  server;
    handler_t  client;
    handler_t  logfile;
    pthread_t  tid;
    char*      data;
    unsigned   len;
} log_t, *Log;
// 初始化Logger
Log Log_init(Log logger) {
    int fd[2];
    // 使用socketpair创建客户端和服务端fd
    if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) {
        perror("Log_init");
        exit(-1);
    }
    logger->server.fileno = fd[0];
    logger->client.fileno = fd[1];
    logger->server.type   = H_LOG;
    logger->client.type   = H_LOG;
    return logger;
}
// 获取logger client
Log Log_getClient(Log logger) {
    Handler_close(&logger->server);
    // 此时将服务端的fd置为-1
    logger->server.fileno = -1;
    return logger;
}
// 获取logger server
Log Log_getServer(Log logger, const char* filename) {
    // server端也持有一个client
    logger->data = (char*)calloc(1, LOG_BUF_LEN);
    int fd = open(filename, O_CREAT | O_WRONLY | O_APPEND, 0644);
    if(fd == -1) {
        perror("Log_getServer");
        exit(-1);
    }
    logger->logfile.fileno = fd;
    logger->logfile.type   = H_FILE;
    return logger;
}
// 关闭logger
void Log_close(Log logger) {
    if(logger->server.fileno != -1) {
        Handler_close(&logger->client);
        // 如果是master进程,则需要join服务线程
        pthread_join(logger->tid, NULL);
        Handler_close(&logger->server);
        Handler_close(&logger->logfile);
        free(logger->data);
    } else {
        Handler_close(&logger->client);
    }
}
// 日志记录
void Log_record(Log logger, log_level_t level, const char* fmt, ...) {
    char buf[2048];
    char* p = buf;
    uint8_t ls = LOG_STR_LEN[level];
    strncpy(p, LOG_STR[level], ls);
    p += ls;
    *p++ = ' ';
    time_t tnow = time(NULL);
    // 将time转换成字符串
    DateTime_formatTimeStamp(&tnow, p);
    p += TIMESTAMP_LEN - 1;
    /* separator "%s - %s" */
    *p++ = ' '; *p++ = '-'; *p++ = ' ';
    va_list varg;
    va_start(varg, fmt);
    vsnprintf(p, 2000, fmt, varg);
    va_end(varg);
    size_t len = strlen(buf);
    buf[len++] = '\n';
    int8_t ret = IO_writeSpec(&logger->client, buf, &len);
    assert(ret == 0);
}
// 服务线程主函数
void* Log_main(void* arg) {
    Log logger   = (Log)arg;
    char* buf    = logger->data;
    size_t len   = LOG_BUF_LEN;
    size_t nread = LOG_BUF_LEN;
    size_t off   = 0;
    int8_t ret;
    while(ret = IO_read(&logger->server, buf + off, &nread) == 0) {
        off += nread;
        len -= nread;
        nread = len;
        if(len == 0) {
            // 对write函数的封装,写入指定长度的数据
            ret = IO_writeSpec(&logger->logfile, buf, &off);
            if(ret == -1) {
                perror("Log_main::write");
                exit(-1);
            }
            off   = 0;
            len   = LOG_BUF_LEN;
            nread = LOG_BUF_LEN;
        }
    }
    if(ret == -1) {
        perror("Log_main::write");
        exit(-1);
    }
    /* peer shutdown */
    /* @ASSERTION: ret == 1 */
    // 如果client都close,那么ret == 1,结束循环,服务端将退出
    if(off) {
        ret = IO_writeSpec(&logger->logfile, buf, &off);
        assert(ret == 0);
    }
    pthread_exit(0);
}
// 启动服务线程
void Log_runServer(Log logger) {
    pthread_create(&logger->tid, NULL, Log_main, logger);
}

上述代码比较简单,通过socketpair这一函数,可以直接创建服务端和客户端的fd。注意在fork的时候,fd也会被复制,因此在获取客户端的时候需要将服务端的fd关闭。服务线程的结束依靠read函数返回0来触发,但是fd有一个引用计数,只有将所有进程的client fd都close,才能使得该fd的引用计数为0,此时read函数才回返回0,表示对端关闭。

事实上,由于我们并不需要双向通讯,如果不考虑扩展性,这里使用PIPE也是可以的,方法差不多,替换socketpair即可,两者的性能也基本一致。

0xFF 总结

在0x01一节中,我们提到了目前这种实现并不完美,虽然并不存在真正完美的方案,但我们可以尽量向这个目标靠近。

对于本文而言,我们需要尽量保证Server线程不挂,这里主要面对的问题即为signal,更加健壮的做法是在Server线程中将一些signal给屏蔽,而上文的代码中并没有进行相应的处理,而这并不难,剩下的任务就留给大家了。

如果关于多进程日志的实现,各位如果有更加简单且有效的idea,还望能够分享。

说两句: