基于Redis实现HttpSession

6月忙着毕业,很是尴尬没能坚持写点博客,不过7月争取补上这些债务。

最近,又在继续写应用服务器了,算是有点进展,之后就可以陆续谈一些这方面的内容。对于实现应用服务器这一块,个人认为存在如下几个难点:

  • 并发模型的选择与实现
  • HTTP协议解析模块的实现,尤其是现在又多了个HTTP/2.0
  • 应用框架的实现,主要是路由模块、IoC的实现方式以及基本的DI等

要实现应用框架,就必须提供基本的概念抽象,如Request、Response和Session等。Request和Response充其量是体力活,而Session的实现则需要慎重考虑了。

本文就为大家带来HttpSession实现方面的内容,思想为主,代码为辅。

0x00 何谓Session

HTTP协议是无状态的,所谓的“无状态”即指它与那些XX语音助手一样,根本不知道你上一句话说了什么。但是实际应用中,无状态明显是不够的,那么就需要提供一种机制来实现扩展,它就是Cookie和Session,具体内容可以阅读RFC6265。请注意,它们并不是HTTP协议的一部分,不信你可以去RFC7230~7235这一系列文档中去检索一下。

Session将上下文信息保存在服务器端,服务器通过Cookie中包含的id来对特定的Session进行检索。和Cookie类似,Session保存的也是键值对(HTTP协议大部分内容也是键值对罢了。。。),只不过这里的值相对于Cookie更加丰富,因为你可以将对象序列化到Session中,并且根本不用担心长度对性能造成的影响。

0x01 Session实现方案

因为Session并不是standalone的,因此选择实现方案时,需要结合整体考虑。应用服务器选择的并发模型决定了Seesion的实现,这里所谓的并发模型大概有如下几类:

  1. 单进程+多线程
  2. 多进程
  3. 多进程+多线程

这3类模型都能够实现支持高并发的服务器,当然本文并不打算细究容错、热部署、可控性及资源占用等问题,这些以后再说。

单进程模型

单进程本身当然不能提供对高并发的支持,但是有了多线程就不同了,尽管这个模型一直被很多大牛和他们的书吐槽。在这种环境下,一个常见的HashMap即可实现Session,但是要注意数据的一致性和操作的原子性,Tomcat采用的就是ConcurrentHashMap(面试必问)。

多进程模型

无论其内部是否使用多线程,都需要实现一个能够跨进程工作的Session,换句话说,Session至少是一个独立的进程,这里就涉及到了IPC的问题了。好在Linux下IPC的可选方案比较多,如Socket、命名管道、共享内存等;不过如果出现跨主机这种情况时,就需要考虑RPC了,可以考虑采用Socket以及一些第三方库。

本人目前正在开发的应用服务器采用的是多进程+线程池的并发模型,因此在实现方案的选择方面,我选择的是通过Socket实现RPC,NoSQL作为后端提供数据的存储。

那么为何选择Redis而不是其它诸如Memcached,SSDB之类的NoSQL呢?这里的原因大概如下:

  1. Redis支持数据持久化,不必再为“悬空”的数据而提心吊胆;
  2. Redis支持集群,扩展性方面的问题也就不用考虑了;
  3. Redis通讯协议简单,易于实现。

在实现本Session的时候,我们相当于是在写一个简单的Redis Client,并对其做适当的封装,以简化Session高抽象层次的操作。

0x02 Session实现细节

那么Session内部需要包含哪些信息呢?这里可以参照Servlet中HttpSession的设计,大致信息如下:

  1. Session ID
  2. 创建时间
  3. 最近访问时间
  4. 最大有效时间

上述信息中,主要有两个细节需要慎重考虑:

  1. 全局唯一ID
  2. 过期机制

全局唯一ID

就如同数据库表中的主键一样,这个ID唯一对应了一个Session表。全局唯一性是至关重要的,可以参考MongoDB中ObjectID的生成方式,采用hostname + pid + tid + time的组合,它们正好对应ID的4个维度。

出于安全原因,我们必须确保Session ID不会被轻易猜到,因此,可以对上述ID使用MD5处理一下,当然如果恰好“碰撞”,那就只能自认倒霉了。

Session过期机制

为什么要提供过期机制?因为SessionId通常被保存在Cookie中,而browser是很容易“丢失”这个Cookie的,那么总不能一直保存着Session吧?因此,这里需要提供一个过期机制,自动销毁那些“变质”了的Session。

Tomcat的实现提供了很多可以参考的细节,本文可以利用的大致有如下2点:

  1. 过期SessionID的复用
  2. Session实例在操作期间不受过期机制影响,换句话说,在获取了一个Session的实例之后,哪怕接下来这个Session立马过期了,这个实例的任何操作均不会被影响

由于我们并不是亲自管理Session数据,特性1实现起来比较麻烦,因此不再考虑,而特性2则是实现过期机制的关键。

session.png

这里依靠Redis的EXPIRE命令来实现Session的过期。如上图所示,为了实现特性2,我们在Redis中添加了一个类似于引用的机制,它以SessionID作为key,以Session的真正命名作为value。我们为每个ID都设置了EXPIRE,值即为其对应的最大有效时间,而具体的Session的EXPIRE值则大于SessionID,默认可以设为2倍。

这样,获取Session实例时,只能先通过SessionID进行Session表名查询,此时便可确定当前Session是否已经过期。但是,即使是已经过期的Session(对应上图中的Session-X),其真实的内容仍然会存在一段时间,以确保不会影响实例的操作。

0x03 Redis通讯协议及解析

[Redis通讯协议]3属于一种文本协议,官方给出了这种协议的特点:

  • 易于实现
  • 快速解析
  • 肉眼可读

原本完全可以使用二进制协议的,解析起来更加快速和简单,可人家偏偏强调要“肉眼可读”,难道除了client的实现者,还有人会没事去看它?

协议格式

简单而言,RESP的协议结构采用的是如下这种形式:

protocol ::= FLAG CONTENT EOL

其中FLAG即为上述5种标点,EOL为rn。

FLAG的官方描述如下:

  • For Simple Strings the first byte of the reply is "+"
  • For Errors the first byte of the reply is "-"
  • For Integers the first byte of the reply is ":"
  • For Bulk Strings the first byte of the reply is "$"
  • For Arrays the first byte of the reply is "*"

大意为:

  • 以+、-、:作为FLAG的,都是单行信息,只不过分别表示简单信息、错误信息以及数字罢了,它们只出现在响应中
  • 以$作为FLAG的,表示其后输出的是一个字符串,$后即为该字符串的长度,例如$3rnGETrn
  • 作为FLAG的,表示其后有多个Bulk字符串,后即为这些字符串的数量,例如*2rn$3\r\nGET\r\n$4rntestrn

我们来简单模拟一次通讯:

Request: get test

*2\r\n
$3\r\n
GET\r\n
$4\r\n
test\r\n

Response: ha

$2\r\n
ha\r\n

有几个点需要注意一下:

  • $0rn 表示空字符串“”
  • $-1rn 表示NULL
  • * 0rn 表示长度为0的数组
  • *-1rn 表示NULL

解析器实现

协议本身比较简单,但是对于具体的返回值,需要对照官方的指南来实现,以免出现诡异的问题。

测试的时候,建议开个wireshark,至于为什么要这么做,你测试的时候就懂了。

说句废话,这里使用的是bio,也就是阻塞式IO,代码以D语言为例,看不懂的就当作Java代码来看,理解上应该不是什么大问题。

Request生成函数:

void buildCmd(Buffer!char buf, RedisMethod cmd, string[] args...) {
    size_t batchCount = args.length + 1;
// 生成*部分信息
    buf.put(ResultType.Array);
    buf.put(to!string(batchCount));
    buf.put(CRLF);
// 生成命令的信息,如GET、SET等
    buf.put(ResultType.Bulk);
    buf.put(to!string(cmd.length));
    buf.put(CRLF);
    buf.put(cmd);
    buf.put(CRLF);
// 生成命令的具体参数信息
    foreach(arg; args) {
        buf.put(ResultType.Bulk);
        buf.put(to!string(arg.length));
        buf.put(CRLF);
        buf.put(arg);
        buf.put(CRLF);
    }
}

这部分代码相对易懂,这里就不再赘述了,Buffer是我自己实现的一个类,和Java中的Buffer比较像,可以参照一下。

Response解析函数:

RedisResult parseResult() {
    scope(exit) {
        m_Buf.clear();
    }
    RedisResult res;
    m_Buf.limit = 1;
    m_Sock.read(m_Buf);
    final switch(m_Buf[0]) {
        case ResultType.Simple:
            res.type = ResultType.Simple;
            parseSimpleLineMsg(res);
            break;
        case ResultType.Error:
            res.type = ResultType.Error;
            parseSimpleLineMsg(res);
            break;
        case ResultType.Integer:
            res.type = ResultType.Integer;
            parseSimpleLineMsg(res);
            break;
        case ResultType.Bulk:
            res.type = ResultType.Bulk;
            /* parseBulkMsg will not set length for result */
            res.length = 1;
            parseBulkMsg(res);
            break;
        case ResultType.Array:
            res.type = ResultType.Array;
            parseArrayMsg(res);
            break;
    }
    return res;
}
int parseLength() {
    while(true) {
        m_Sock.read(m_Buf);
        switch(m_Buf[$ - 1]) {
            case CR:
                m_Buf.position = m_Buf.position - 1;
                break;
            case LF:
                m_Buf.position = m_Buf.position - 1;
                m_Buf.limit = m_Buf.limit - 1;
                goto finish;
            default:
                m_Buf.limit = m_Buf.limit + 1;
        }
    }
finish:
    return to!int(m_Buf.toString());
}
void parseSimpleLineMsg(ref RedisResult res) {
    res.length = 1;
    m_Buf.position = 0;
    while(true) {
        m_Sock.read(m_Buf);
        switch(m_Buf[$ - 1]) {
            case CR:
                m_Buf.position = m_Buf.position - 1;
                break;
            case LF:
                m_Buf.position = m_Buf.position - 1;
                m_Buf.limit = m_Buf.limit - 1;
                goto finish;
            default:
                m_Buf.limit = m_Buf.limit + 1;
        }
    }
finish:
    res[0] = m_Buf.toString();
}
void parseBulkMsg(ref RedisResult res, size_t index = 0) {
    m_Buf.position = 0;
    /* keep it! for the loop of parseBuilMsg */
    m_Buf.limit = 1;
    int len = parseLength();
    if(len == -1) {
        res[index] = null;
        return;
    }
    /* including CR & LF */
    m_Buf.limit = len + 2;
    m_Buf.position = 0;
    m_Sock.read(m_Buf);
    /* skip CR & LF */
    res[index] = m_Buf[0 .. $ - 2];
}

这部分是基本的线性解析,比较简易。实现的时候只需要按照FLAG描述的那3点进行解析即可。但是请注意,我们每次只从socket中取1个字符,除非$行解析完毕,才可以肆无忌惮的一次取多个字符。

Result定义:

struct RedisResult {
private:
    ResultType m_Type;
    string[] m_Results;
public:
    @property
    void length(size_t len) {
        m_Results = new string[len];
    }
    @property
    size_t length() {
        return m_Results.length;
    }
    @property
    void type(ResultType type) {
        m_Type = type;
    }
    @property
    ResultType type() {
        return m_Type;
    }
    @property
    string opIndex(size_t index) {
        return m_Results[index];
    }
    @property
    void opIndexAssign(string value, size_t index) {
        m_Results[index] = value;
    }
    bool isNil() {
        return m_Results == null;
    }
    string getError() {
        return m_Results[0];
    }
    int getInteger() {
        import std.conv;
        return to!int(m_Results[0]);
    }
}
enum ResultType : char {
    Simple  = '+',
    Error   = '-',
    Integer = ':',
    Bulk    = '$',
    Array   = '*'
}

RedisResult就是对通讯结果的抽象,内部属性很简单,字符串数组 + 消息类型,附带一批property函数以便于操作。

我做了一个简单的benchmark,比较的对象是hiredis client,100w次GET操作,hiredis的时间消耗是14.178s,我自己实现的是14.917s,有优化的余地,但也尚可接受,毕竟语言不同。

0x04 进一步思考

通过上述内容,我们可以得到一个可用的Session,但是想要将这个Session应用于生产环境,尚须思考下列问题:

  1. Redis的Integer为32位,实际应用中是否会超过上限
  2. Session数据量变大,此时Redis的可用性就有待思考了
  3. Redis本身的事务问题,是否有必要启用乐观锁,以及乐观锁能够解决问题
  4. Session机制本身的并发问题,以及如何容错
说两句: