6月忙着毕业,很是尴尬没能坚持写点博客,不过7月争取补上这些债务。
最近,又在继续写应用服务器了,算是有点进展,之后就可以陆续谈一些这方面的内容。对于实现应用服务器这一块,个人认为存在如下几个难点:
- 并发模型的选择与实现
- HTTP协议解析模块的实现,尤其是现在又多了个HTTP/2.0
- 应用框架的实现,主要是路由模块、IoC的实现方式以及基本的DI等
要实现应用框架,就必须提供基本的概念抽象,如Request、Response和Session等。Request和Response充其量是体力活,而Session的实现则需要慎重考虑了。
本文就为大家带来HttpSession实现方面的内容,思想为主,代码为辅。
0x00 何谓Session
HTTP协议是无状态的,所谓的“无状态”即指它与那些XX语音助手一样,根本不知道你上一句话说了什么。但是实际应用中,无状态明显是不够的,那么就需要提供一种机制来实现扩展,它就是Cookie和Session,具体内容可以阅读RFC6265。请注意,它们并不是HTTP协议的一部分,不信你可以去RFC7230~7235这一系列文档中去检索一下。
Session将上下文信息保存在服务器端,服务器通过Cookie中包含的id来对特定的Session进行检索。和Cookie类似,Session保存的也是键值对(HTTP协议大部分内容也是键值对罢了。。。),只不过这里的值相对于Cookie更加丰富,因为你可以将对象序列化到Session中,并且根本不用担心长度对性能造成的影响。
0x01 Session实现方案
因为Session并不是standalone的,因此选择实现方案时,需要结合整体考虑。应用服务器选择的并发模型决定了Seesion的实现,这里所谓的并发模型大概有如下几类:
- 单进程+多线程
- 多进程
- 多进程+多线程
这3类模型都能够实现支持高并发的服务器,当然本文并不打算细究容错、热部署、可控性及资源占用等问题,这些以后再说。
单进程模型
单进程本身当然不能提供对高并发的支持,但是有了多线程就不同了,尽管这个模型一直被很多大牛和他们的书吐槽。在这种环境下,一个常见的HashMap即可实现Session,但是要注意数据的一致性和操作的原子性,Tomcat采用的就是ConcurrentHashMap(面试必问)。
多进程模型
无论其内部是否使用多线程,都需要实现一个能够跨进程工作的Session,换句话说,Session至少是一个独立的进程,这里就涉及到了IPC的问题了。好在Linux下IPC的可选方案比较多,如Socket、命名管道、共享内存等;不过如果出现跨主机这种情况时,就需要考虑RPC了,可以考虑采用Socket以及一些第三方库。
本人目前正在开发的应用服务器采用的是多进程+线程池的并发模型,因此在实现方案的选择方面,我选择的是通过Socket实现RPC,NoSQL作为后端提供数据的存储。
那么为何选择Redis而不是其它诸如Memcached,SSDB之类的NoSQL呢?这里的原因大概如下:
- Redis支持数据持久化,不必再为“悬空”的数据而提心吊胆;
- Redis支持集群,扩展性方面的问题也就不用考虑了;
- Redis通讯协议简单,易于实现。
在实现本Session的时候,我们相当于是在写一个简单的Redis Client,并对其做适当的封装,以简化Session高抽象层次的操作。
0x02 Session实现细节
那么Session内部需要包含哪些信息呢?这里可以参照Servlet中HttpSession的设计,大致信息如下:
- Session ID
- 创建时间
- 最近访问时间
- 最大有效时间
上述信息中,主要有两个细节需要慎重考虑:
- 全局唯一ID
- 过期机制
全局唯一ID
就如同数据库表中的主键一样,这个ID唯一对应了一个Session表。全局唯一性是至关重要的,可以参考MongoDB中ObjectID的生成方式,采用hostname + pid + tid + time的组合,它们正好对应ID的4个维度。
出于安全原因,我们必须确保Session ID不会被轻易猜到,因此,可以对上述ID使用MD5处理一下,当然如果恰好“碰撞”,那就只能自认倒霉了。
Session过期机制
为什么要提供过期机制?因为SessionId通常被保存在Cookie中,而browser是很容易“丢失”这个Cookie的,那么总不能一直保存着Session吧?因此,这里需要提供一个过期机制,自动销毁那些“变质”了的Session。
Tomcat的实现提供了很多可以参考的细节,本文可以利用的大致有如下2点:
- 过期SessionID的复用
- Session实例在操作期间不受过期机制影响,换句话说,在获取了一个Session的实例之后,哪怕接下来这个Session立马过期了,这个实例的任何操作均不会被影响
由于我们并不是亲自管理Session数据,特性1实现起来比较麻烦,因此不再考虑,而特性2则是实现过期机制的关键。
这里依靠Redis的EXPIRE命令来实现Session的过期。如上图所示,为了实现特性2,我们在Redis中添加了一个类似于引用的机制,它以SessionID作为key,以Session的真正命名作为value。我们为每个ID都设置了EXPIRE,值即为其对应的最大有效时间,而具体的Session的EXPIRE值则大于SessionID,默认可以设为2倍。
这样,获取Session实例时,只能先通过SessionID进行Session表名查询,此时便可确定当前Session是否已经过期。但是,即使是已经过期的Session(对应上图中的Session-X),其真实的内容仍然会存在一段时间,以确保不会影响实例的操作。
0x03 Redis通讯协议及解析
[Redis通讯协议]3属于一种文本协议,官方给出了这种协议的特点:
- 易于实现
- 快速解析
- 肉眼可读
原本完全可以使用二进制协议的,解析起来更加快速和简单,可人家偏偏强调要“肉眼可读”,难道除了client的实现者,还有人会没事去看它?
协议格式
简单而言,RESP的协议结构采用的是如下这种形式:
protocol ::= FLAG CONTENT EOL
其中FLAG即为上述5种标点,EOL为rn。
FLAG的官方描述如下:
- For Simple Strings the first byte of the reply is "+"
- For Errors the first byte of the reply is "-"
- For Integers the first byte of the reply is ":"
- For Bulk Strings the first byte of the reply is "$"
- For Arrays the first byte of the reply is "*"
大意为:
- 以+、-、:作为FLAG的,都是单行信息,只不过分别表示简单信息、错误信息以及数字罢了,它们只出现在响应中
- 以$作为FLAG的,表示其后输出的是一个字符串,$后即为该字符串的长度,例如$3rnGETrn
- 以作为FLAG的,表示其后有多个Bulk字符串,后即为这些字符串的数量,例如*2rn$3\r\nGET\r\n$4rntestrn
我们来简单模拟一次通讯:
Request: get test
*2\r\n
$3\r\n
GET\r\n
$4\r\n
test\r\n
Response: ha
$2\r\n
ha\r\n
有几个点需要注意一下:
- $0rn 表示空字符串“”
- $-1rn 表示NULL
- * 0rn 表示长度为0的数组
- *-1rn 表示NULL
解析器实现
协议本身比较简单,但是对于具体的返回值,需要对照官方的指南来实现,以免出现诡异的问题。
测试的时候,建议开个wireshark,至于为什么要这么做,你测试的时候就懂了。
说句废话,这里使用的是bio,也就是阻塞式IO,代码以D语言为例,看不懂的就当作Java代码来看,理解上应该不是什么大问题。
Request生成函数:
void buildCmd(Buffer!char buf, RedisMethod cmd, string[] args...) {
size_t batchCount = args.length + 1;
// 生成*部分信息
buf.put(ResultType.Array);
buf.put(to!string(batchCount));
buf.put(CRLF);
// 生成命令的信息,如GET、SET等
buf.put(ResultType.Bulk);
buf.put(to!string(cmd.length));
buf.put(CRLF);
buf.put(cmd);
buf.put(CRLF);
// 生成命令的具体参数信息
foreach(arg; args) {
buf.put(ResultType.Bulk);
buf.put(to!string(arg.length));
buf.put(CRLF);
buf.put(arg);
buf.put(CRLF);
}
}
这部分代码相对易懂,这里就不再赘述了,Buffer是我自己实现的一个类,和Java中的Buffer比较像,可以参照一下。
Response解析函数:
RedisResult parseResult() {
scope(exit) {
m_Buf.clear();
}
RedisResult res;
m_Buf.limit = 1;
m_Sock.read(m_Buf);
final switch(m_Buf[0]) {
case ResultType.Simple:
res.type = ResultType.Simple;
parseSimpleLineMsg(res);
break;
case ResultType.Error:
res.type = ResultType.Error;
parseSimpleLineMsg(res);
break;
case ResultType.Integer:
res.type = ResultType.Integer;
parseSimpleLineMsg(res);
break;
case ResultType.Bulk:
res.type = ResultType.Bulk;
/* parseBulkMsg will not set length for result */
res.length = 1;
parseBulkMsg(res);
break;
case ResultType.Array:
res.type = ResultType.Array;
parseArrayMsg(res);
break;
}
return res;
}
int parseLength() {
while(true) {
m_Sock.read(m_Buf);
switch(m_Buf[$ - 1]) {
case CR:
m_Buf.position = m_Buf.position - 1;
break;
case LF:
m_Buf.position = m_Buf.position - 1;
m_Buf.limit = m_Buf.limit - 1;
goto finish;
default:
m_Buf.limit = m_Buf.limit + 1;
}
}
finish:
return to!int(m_Buf.toString());
}
void parseSimpleLineMsg(ref RedisResult res) {
res.length = 1;
m_Buf.position = 0;
while(true) {
m_Sock.read(m_Buf);
switch(m_Buf[$ - 1]) {
case CR:
m_Buf.position = m_Buf.position - 1;
break;
case LF:
m_Buf.position = m_Buf.position - 1;
m_Buf.limit = m_Buf.limit - 1;
goto finish;
default:
m_Buf.limit = m_Buf.limit + 1;
}
}
finish:
res[0] = m_Buf.toString();
}
void parseBulkMsg(ref RedisResult res, size_t index = 0) {
m_Buf.position = 0;
/* keep it! for the loop of parseBuilMsg */
m_Buf.limit = 1;
int len = parseLength();
if(len == -1) {
res[index] = null;
return;
}
/* including CR & LF */
m_Buf.limit = len + 2;
m_Buf.position = 0;
m_Sock.read(m_Buf);
/* skip CR & LF */
res[index] = m_Buf[0 .. $ - 2];
}
这部分是基本的线性解析,比较简易。实现的时候只需要按照FLAG描述的那3点进行解析即可。但是请注意,我们每次只从socket中取1个字符,除非$行解析完毕,才可以肆无忌惮的一次取多个字符。
Result定义:
struct RedisResult {
private:
ResultType m_Type;
string[] m_Results;
public:
@property
void length(size_t len) {
m_Results = new string[len];
}
@property
size_t length() {
return m_Results.length;
}
@property
void type(ResultType type) {
m_Type = type;
}
@property
ResultType type() {
return m_Type;
}
@property
string opIndex(size_t index) {
return m_Results[index];
}
@property
void opIndexAssign(string value, size_t index) {
m_Results[index] = value;
}
bool isNil() {
return m_Results == null;
}
string getError() {
return m_Results[0];
}
int getInteger() {
import std.conv;
return to!int(m_Results[0]);
}
}
enum ResultType : char {
Simple = '+',
Error = '-',
Integer = ':',
Bulk = '$',
Array = '*'
}
RedisResult就是对通讯结果的抽象,内部属性很简单,字符串数组 + 消息类型,附带一批property函数以便于操作。
我做了一个简单的benchmark,比较的对象是hiredis client,100w次GET操作,hiredis的时间消耗是14.178s,我自己实现的是14.917s,有优化的余地,但也尚可接受,毕竟语言不同。
0x04 进一步思考
通过上述内容,我们可以得到一个可用的Session,但是想要将这个Session应用于生产环境,尚须思考下列问题:
- Redis的Integer为32位,实际应用中是否会超过上限
- Session数据量变大,此时Redis的可用性就有待思考了
- Redis本身的事务问题,是否有必要启用乐观锁,以及乐观锁能够解决问题
- Session机制本身的并发问题,以及如何容错