MYDB record

MYDB

1. 项目结构

整体架构

img

  1. Transaction Manager(TM)
  2. Data Manager(DM)
  3. Version Manager(VM)
  4. Index Manager(IM)
  5. Table Manager(TBM)

每个模块的职责如下:

  1. TM 通过维护 XID 文件来维护事务的状态,并提供接口供其他模块来查询某个事务的状态。
  2. DM 直接管理数据库 DB 文件和日志文件。DM 的主要职责有:1) 分页管理 DB 文件,并进行缓存;2) 管理日志文件,保证在发生错误时可以根据日志进行恢复;3) 抽象 DB 文件为 DataItem 供上层模块使用,并提供缓存。
  3. VM 基于两段锁协议实现了调度序列的可串行化,并实现了 MVCC 以消除读写阻塞。同时实现了两种隔离级别。
  4. IM 实现了基于 B+ 树的索引,BTW,目前 where 只支持已索引字段。
  5. TBM 实现了对字段和表的管理。同时,解析 SQL 语句,并根据语句操作表。

2. Transaction Manager

TM模块通过维护一个XID文件来维护事务状态,并提供接口供其他模块来查询某个事务的状态

XID文件

在 MYDB 中,每一个事务都有一个 XID,这个 ID 唯一标识了这个事务。事务的 XID 从 1 开始标号,并自增,不可重复。并特殊规定 XID 0 是一个超级事务(Super Transaction)。当一些操作想在没有申请事务的情况下进行,那么可以将操作的 XID 设置为 0。XID 为 0 的事务的状态永远是 committed。

TransactionManager 维护了一个 XID 格式的文件,用来记录各个事务的状态。MYDB 中,每个事务都有下面的三种状态:

  1. active,正在进行,尚未结束
  2. committed,已提交
  3. aborted,已撤销(回滚)

XID 文件给每个事务分配了一个字节的空间,用来保存其状态。同时,在 XID 文件的头部,还保存了一个 8 字节的数字,记录了这个 XID 文件管理的事务的个数。于是,事务 xid 在文件中的状态就存储在 (xid-1)+8 字节处,xid-1 是因为 xid 0(Super XID) 的状态不需要记录。(XID文件头8个字节用于记录此XID文件管理的事务数量,后面记录事务的状态)

必要的常量及变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// XID文件头长度
static final int LEN_XID_HEADER_LENGTH = 8;
// 每个事务的占用长度
private static final int XID_FIELD_SIZE = 1;
// 事务的三种状态
private static final byte FIELD_TRAN_ACTIVE = 0;
private static final byte FIELD_TRAN_COMMITTED = 1;
private static final byte FIELD_TRAN_ABORTED = 2;
// 超级事务,永远为commited状态
public static final long SUPER_XID = 0;
// XID 文件后缀
static final String XID_SUFFIX = ".xid";

// 用于获取XID文件
private RandomAccessFile file;
// 文件通道,用于文件的读写
private FileChannel fc;
// 用于记录XID文件中事务的数量
private long xidCounter;
private Lock counterLock;

XID文件校验

在构造函数创建了一个 TransactionManager 之后,首先要对 XID 文件进行校验,以保证这是一个合法的 XID 文件。校验的方式也很简单,通过文件头的 8 字节数字反推文件的理论长度,与文件的实际长度做对比。如果不同则认为 XID 文件不合法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void checkXIDCounter() {
long fileLen = 0;
try {
// 记录文件的实际长度
fileLen = file.length();
} catch (IOException e1) {
Panic.panic(Error.BadXIDFileException);
}
// 文件实际长度比文件头的8字节还小,说明文件不合法
if(fileLen < LEN_XID_HEADER_LENGTH) {
Panic.panic(Error.BadXIDFileException);
}

ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
try {
// 设置FileChannel的position
fc.position(0);
// 从文件通道读取内容到ByteBuffer中
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
this.xidCounter = Parser.parseLong(buf.array());
long end = getXidPosition(this.xidCounter + 1);
if(end != fileLen) {
Panic.panic(Error.BadXIDFileException);
}
}

// 根据事务xid取得其在xid文件中对应的位置
private long getXidPosition(long xid) {
return LEN_XID_HEADER_LENGTH + (xid-1)*XID_FIELD_SIZE;
}

更新事务状态

  • begin():开启一个事务,首先设置xid = xidCounter+1的事务状态为active,随后xidCounter自增
  • commit():提交一个事务,将xid事务的状态设置为committed
  • abort():取消一个事务,将xid事务的状态设置为aborted
  • 这三个更新操作都可借助下面的updateXID()方法实现,同时开启事务还需要实现一个xidCounter自增的方法incrXIDCounter()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 更新xid事务的状态为status
private void updateXID(long xid, byte status) {
long offset = getXidPosition(xid);
byte[] tmp = new byte[XID_FIELD_SIZE];
tmp[0] = status;
// tmp数组包装成bytebuffer数组
ByteBuffer buf = ByteBuffer.wrap(tmp);
try {
// 将ByteBuffer中的内容写入到文件通道中的offset位置,即更改事务状态
fc.position(offset);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
// 所有文件操作,在执行后都需要立刻刷入文件中,防止在崩溃后文件丢失数据,fileChannel 的 force() 方法,强制同步缓存内容到文件中。force 方法的参数是一个布尔,表示是否同步文件的元数据(例如最后修改时间等)。
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

// 将XID加一,并更新XID Header(开启一个新事务时需要此操作)
private void incrXIDCounter() {
xidCounter ++;
ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
try {
fc.position(0);
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
}
try {
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

检查事务状态

isActive()isCommitted()isAborted() 都是检查一个 xid 的状态,可以用一个通用的方法解决:(检查时需要排除SUPER_XID的情况,因为该情况一定是committed的)

1
2
3
4
5
6
7
8
9
10
11
12
// 检测XID事务是否处于status状态
private boolean checkXID(long xid, byte status) {
long offset = getXidPosition(xid);
ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
try {
fc.position(offset);
fc.read(buf);
} catch (IOException e) {
Panic.panic(e);
}
return buf.array()[0] == status;
}

创建和打开XID文件

在接口中创建的两个静态方法create()open()

  • create():创建一个XID文件,并创建TM(TM的构造器需要XID文件和FileChannel)
  • open():从一个已有的XID文件来创建TM

3. Data Manager

DataManager(DM)功能归纳:

  • 上层模块和文件系统中的一个抽象层。向上,提供数据包装;向下,直接读写文件。
  • 提供日志功能。

计数缓存框架

分页管理和数据项(DataItem)管理涉及缓存,故抽象出一个通用缓存框架

引用计数法

**引用计数法(Reference counting)**是一种内存管理技术,它通过计算每个对象被引用的次数来判断是否需要回收该对象。当对象被创建时,引用计数为 1,每当有一个新的引用指向该对象时,计数加 1,当引用失效时,计数减 1。当计数为 0 时,该对象就可以被回收

在 MYDB 的实践中,需要的效果是,只有上层模块主动释放引用,缓存在确保没有模块在使用这个资源了,才会去驱逐资源。于是,选择引用计数法。增加了一个方法 release (key),用于在上册模块不使用某个资源时,释放对资源的引用。当引用归零时,缓存就会驱逐这个资源。

同样,在缓存满了之后,引用计数法无法自动释放缓存,此时应该直接报错(和 JVM 似的,直接 OOM)。

LRU

**LRU(least recently used)**是一种缓存淘汰算法。它的特点是根据数据最近被访问的时间来决定哪些数据应该被保留,哪些数据应该被淘汰。当缓存达到一定容量时,会淘汰掉最近最少使用的数据。

如果使用 LRU 缓存,那么只需要设计一个get (key)接口即可,释放缓存可以在缓存满了之后自动完成。

however,当某时刻缓存满了,缓存驱逐一个资源,此时上层模块想将某个资源强制刷回数据源,这个资源恰恰是刚被驱逐的资源。此时的上层模块会发现,资源在缓存中消失了,那么,是否有必要做回源操作?

  • 不回源。由于没法确定缓存被驱逐的时间,更没法确定被驱逐之后数据项是否被修改,这样是极其不安全的
  • 回源。如果数据项被驱逐时的数据和现在又是相同的,那就是一次无效回源
  • 放回缓存里,等下次被驱逐时回源。看起来解决了问题,但是此时缓存已经满了,这意味着你还需要驱逐一个资源才能放进去。这有可能会导致缓存抖动问题。

必要的变量

AbstractCache<T>是一个抽象类,内部有两个抽象方法,留给实现类去实现具体的操作:

1
2
3
4
5
6
7
8
/**
* 当资源不在缓存时的获取行为(去数据源中获取)
*/
protected abstract T getForCache(long key) throws Exception;
/**
* 当资源被驱逐时的写回行为
*/
protected abstract void releaseForCache(T obj);

由于我们选择使用的是引用计数法实现缓存,因此除了普通的缓存功能,还需要另外维护一个计数,用于记录资源被引用的个数。除此以外,为了应对多线程场景,还需要记录哪些资源正在从数据源获取中(从数据源获取资源是一个相对费时的操作)。于是有下面三个 Map:

1
2
3
4
5
6
7
private HashMap<Long, T> cache;                     // 实际缓存的数据
private HashMap<Long, Integer> references; // 资源的引用个数
private HashMap<Long, Boolean> getting; // 正在被获取的资源

private int maxResource; // 缓存的最大缓存资源数
private int count = 0; // 缓存中元素的个数
private Lock lock;

get()方法获取资源

  1. 判断请求的资源是否正在被其他线程获取,若是,则过一段时间再来查看
  2. 没有其他线程在获取目标资源,到缓存中查看是否有需要的资源,若有,则直接返回
  3. 资源不在缓存中,且缓存已满,抛出CacheFullException异常
  4. 资源不在缓存中且缓存没满,尝试去数据源获取该资源
  5. 在数据源中没找到该资源,抛出异常
  6. 在数据源中找到需要的资源,添加到缓存中后返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
protected T get(long key) throws Exception {
// 1.在通过 get() 方法获取资源时,首先进入一个死循环,来无限尝试从缓存里获取。
while(true) {
lock.lock();
// 1.1.首先就需要检查这个时候是否有其他线程正在从数据源获取这个资源,如果有,就过会再来看看
if(getting.containsKey(key)) {
// 请求的资源正在被其他线程获取
lock.unlock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
continue;
}
continue;
}

// 1.2.如果没有其他线程在获取这个资源,那么就可以尝试从缓存中获取了
if(cache.containsKey(key)) {
// 资源在缓存中,直接返回
T obj = cache.get(key);
// 记得给资源的引用计数加一
references.put(key, references.get(key) + 1);
lock.unlock();
return obj;
}

// 1.3.尝试获取该资源
// a.判断缓存是否已满,如果已满,就抛出一个异常
if(maxResource > 0 && count == maxResource) {
lock.unlock();
throw Error.CacheFullException;
}
// b.如果缓存未满,就在 getting 中注册一下,该线程准备从数据源获取资源了
count ++;
getting.put(key, true);
lock.unlock();
break;
}

// 2.从数据源获取资源
T obj = null;
try {
// 调用抽象方法从数据源中获取资源
obj = getForCache(key);
} catch (Exception e) {
// 如果获取失败,就把 getting 中的注册信息清除掉
lock.lock();
count --;
getting.remove(key);
lock.unlock();
throw e;
}

lock.lock();
getting.remove(key); // 获取完成要从 getting 中清除注册信息
cache.put(key, obj);
references.put(key, 1);
lock.unlock();

return obj;
}

release()方法释放资源

当引用计数references减到0后,就可以回源并删除缓存中的相关结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 3.释放资源
protected void release(long key) {
lock.lock();
try {
// 释放一个缓存时,直接从 references 中减 1,如果已经减到 0 了,就可以回源,并且删除缓存中所有相关的结构
int ref = references.get(key) - 1;
if(ref == 0) {
T obj = cache.get(key);
// 调用抽象方法释放缓存
releaseForCache(obj);
// 删除缓存中所有相关的结构
references.remove(key);
cache.remove(key);
count --;
} else {
references.put(key, ref);
}
} finally {
lock.unlock();
}
}

close()方法关闭缓存

缓存应当还有以一个安全关闭的功能,在关闭时,需要将缓存中所有的资源强行回源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void close() {
lock.lock();
try {
Set<Long> keys = cache.keySet();
for (long key : keys) {
T obj = cache.get(key);
releaseForCache(obj);
references.remove(key);
cache.remove(key);
}
} finally {
lock.unlock();
}
}

这样,一个简单的缓存框架就实现完了,其他的缓存只需要继承这个类,并实现那两个抽象方法即可。

数据页的缓存与管理

页面缓存

参考大部分数据库的设计,将默认数据页大小定为 8K。如果想要提升向数据库写入大量数据情况下的性能的话,也可以适当增大这个值。

上一节我们已经实现了一个通用的缓存框架,那么这一节我们需要缓存页面,就可以直接借用那个缓存的框架了。但是首先,需要定义出页面的结构。注意这个页面是存储在内存中的,与已经持久化到磁盘的抽象页面有区别。定义一个页面如下:

1
2
3
4
5
6
7
8
public class PageImpl implements Page {
private int pageNumber; // 页号,从1开始
private byte[] data; // 此页面实际包含的字节数据
private boolean dirty; // 标识该页面是否是脏页面,在缓存驱逐的时候,脏页面需要被写回磁盘
private Lock lock;

private PageCache pc; // 这里保存了一个 PageCache(还未定义)的引用,用来方便在拿到 Page 的引用时可以快速对这个页面的缓存进行释放操作。
}

页面缓存接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public interface PageCache {

// 页面大小8192
public static final int PAGE_SIZE = 1 << 13;

int newPage(byte[] initData);
Page getPage(int pgno) throws Exception;
void close();
void release(Page page);

void truncateByBgno(int maxPgno);
int getPageNumber();
void flushPage(Page pg);

// 创建一个新的.db文件
public static PageCacheImpl create(String path, long memory) {
File f = new File(path+PageCacheImpl.DB_SUFFIX);
try {
if(!f.createNewFile()) {
Panic.panic(Error.FileExistsException);
}
} catch (Exception e) {
Panic.panic(e);
}
if(!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}
return new PageCacheImpl(raf, fc, (int)memory/PAGE_SIZE);
}

// 打开一个.db文件
public static PageCacheImpl open(String path, long memory) {
File f = new File(path+PageCacheImpl.DB_SUFFIX);
if(!f.exists()) {
Panic.panic(Error.FileNotExistsException);
}
if(!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}

FileChannel fc = null;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(f, "rw");
fc = raf.getChannel();
} catch (FileNotFoundException e) {
Panic.panic(e);
}
return new PageCacheImpl(raf, fc, (int)memory/PAGE_SIZE);
}
}

页面缓存的具体实现类PageCacheImpl,需要继承抽象缓存框架AbstractCache,并且实现getForCache() releaseForCache() 两个抽象方法。由于数据源就是文件系统(.db文件),getForCache() 直接从文件中读取,并包裹成 Page 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 根据pageNumber从数据库文件中读取页数据,并包裹成Page
*/
@Override
protected Page getForCache(long key) throws Exception {
// 页号
int pgno = (int)key;
// 页号对应的页在文件中的位置
long offset = PageCacheImpl.pageOffset(pgno);

ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE);
fileLock.lock();
try {
fc.position(offset);
fc.read(buf);
} catch(IOException e) {
Panic.panic(e);
}
fileLock.unlock();
return new PageImpl(pgno, buf.array(), this);
}

private static long pageOffset(int pgno) {
// 页号从 1 开始
return (pgno-1) * PAGE_SIZE;
}

releaseForCache()驱逐页面时,也只需要根据页面是否是脏页面,来决定是否需要写回文件系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
protected void releaseForCache(Page pg) {
if(pg.isDirty()) {
flush(pg);
pg.setDirty(false);
}
}

private void flush(Page pg) {
int pgno = pg.getPageNumber();
long offset = pageOffset(pgno);

fileLock.lock();
try {
ByteBuffer buf = ByteBuffer.wrap(pg.getData());
fc.position(offset);
fc.write(buf);
fc.force(false);
} catch(IOException e) {
Panic.panic(e);
} finally {
fileLock.unlock();
}
}

PageCache 还使用了一个 AtomicInteger,来记录了当前打开的数据库文件有多少页。这个数字在数据库文件被打开时就会被计算,并在新建页面时自增。

1
2
3
4
5
6
public int newPage(byte[] initData) {
int pgno = pageNumbers.incrementAndGet();
Page pg = new PageImpl(pgno, initData, null);
flush(pg); // 新建的页面需要立刻写回
return pgno;
}

数据页管理

第一页

数据库文件的第一页,通常用作一些特殊用途,比如存储一些元数据,用来启动检查什么的。MYDB 的第一页,只是用来做启动检查。具体的原理是,在每次数据库启动时,会生成一串随机字节,存储在 100 ~ 107 字节。在数据库正常关闭时,会将这串字节,拷贝到第一页的 108 ~ 115 字节。

这样数据库在每次启动时,就会检查第一页两处的字节是否相同,以此来判断上一次是否正常关闭。如果是异常关闭,就需要执行数据的恢复流程。

启动时设置初始字节:

1
2
3
4
5
6
7
8
public static void setVcOpen(Page pg) {
pg.setDirty(true);
setVcOpen(pg.getData());
}

private static void setVcOpen(byte[] raw) {
System.arraycopy(RandomUtil.randomBytes(LEN_VC), 0, raw, OF_VC, LEN_VC);
}

关闭时拷贝字节:

1
2
3
4
5
6
7
8
public static void setVcClose(Page pg) {
pg.setDirty(true);
setVcClose(pg.getData());
}

private static void setVcClose(byte[] raw) {
System.arraycopy(raw, OF_VC, raw, OF_VC+LEN_VC, LEN_VC);
}

校验字节:

1
2
3
4
5
6
7
public static boolean checkVc(Page pg) {
return checkVc(pg.getData());
}

private static boolean checkVc(byte[] raw) {
return Arrays.equals(Arrays.copyOfRange(raw, OF_VC, OF_VC+LEN_VC), Arrays.copyOfRange(raw, OF_VC+LEN_VC, OF_VC+2*LEN_VC));
}

普通页

一个普通页面以一个 2 字节无符号数起始,表示这一页的空闲位置的偏移。剩下的部分都是实际存储的数据。对普通页的管理,基本都是围绕着对 FSO(Free Space Offset)进行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class PageX {

private static final short OF_FREE = 0;
private static final short OF_DATA = 2; // 数据起始位置
public static final int MAX_FREE_SPACE = PageCache.PAGE_SIZE - OF_DATA; // 页面最大空闲空间

public static byte[] initRaw() {
byte[] raw = new byte[PageCache.PAGE_SIZE];
setFSO(raw, OF_DATA);
return raw;
}

private static void setFSO(byte[] raw, short ofData) {
System.arraycopy(Parser.short2Byte(ofData), 0, raw, OF_FREE, OF_DATA);
}

// 获取pg的FSO
public static short getFSO(Page pg) {
return getFSO(pg.getData());
}
// 获取页面数据row的前两个字节(这两个字节标识空闲位置的起始位置)
private static short getFSO(byte[] raw) {
return Parser.parseShort(Arrays.copyOfRange(raw, 0, 2));
}

// 将raw插入pg中,返回插入位置
public static short insert(Page pg, byte[] raw) {
pg.setDirty(true);
short offset = getFSO(pg.getData());
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);
setFSO(pg.getData(), (short)(offset + raw.length));
return offset;
}

// 获取页面的空闲空间大小
public static int getFreeSpace(Page pg) {
return PageCache.PAGE_SIZE - (int)getFSO(pg.getData());
}

// 下面两个函数在updateLog 和 insertLog 的重做和撤销处理中涉及,用于在数据库崩溃后重新打开时,恢复例程直接插入数据以及修改数据使用
// 将raw插入pg中的offset位置,并将pg的offset设置为较大的offset
public static void recoverInsert(Page pg, byte[] raw, short offset) {
pg.setDirty(true);
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);

short rawFSO = getFSO(pg.getData());
if(rawFSO < offset + raw.length) {
setFSO(pg.getData(), (short)(offset+raw.length));
}
}

// 将raw插入pg中的offset位置,不更新update
public static void recoverUpdate(Page pg, byte[] raw, short offset) {
pg.setDirty(true);
System.arraycopy(raw, 0, pg.getData(), offset, raw.length);
}
}

日志文件与恢复策略

MYDB 提供了崩溃后的数据恢复功能。DM 层在每次对底层数据操作时,都会记录一条日志到磁盘上。在数据库崩溃之后,再次启动时,可以根据日志的内容,恢复数据文件,保证其一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 必要的常量及变量
private static final int SEED = 13331;

private static final int OF_SIZE = 0; // 单条记录Size的起始位置:0
private static final int OF_CHECKSUM = OF_SIZE + 4; // 单条记录CheckSum起始位置:4
private static final int OF_DATA = OF_CHECKSUM + 4; // 单条记录数据起始位置:8

public static final String LOG_SUFFIX = ".log"; // 日志文件后缀

private RandomAccessFile file;
private FileChannel fc;
private Lock lock;

private long position; // 当前日志指针的位置
private long fileSize; // 初始化时记录,log操作不更新
private int xChecksum; // 总校验和

日志读写

日志文件格式

日志的二进制文件,按照如下的格式排布:

[XCheckSum] [Log1] [Log2] [Log3] … [LogN] [BadTail]

其中 XChecksum 是一个四字节的整数int,是对后续所有日志计算的校验和Log1 ~ LogN 是常规的日志数据,BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在。

每条日志的格式如下:

[Size] [CheckSum] [Data]

其中,Size 是一个四字节整数int,标识了 Data 段的字节数。Checksum 则是该条日志的校验和int。单条日志的校验和,其实就是通过一个指定的种子实现的:

1
2
3
4
5
6
private int calChecksum(int xCheck, byte[] log) {
for (byte b : log) {
xCheck = xCheck * SEED + b;
}
return xCheck;
}

这样,对所有日志求出校验和CheckSum,每条校验和再求和(❌)就能得到日志文件的校验和XCheckSum了。

注意,文件里的XCheckSum和每条日志的CheckSum计算对象是不一样的:

  • XCheckSum是用后续所有日志(包括日志里的sizecheckSumdata字段)计算得到的
    • XCheckSum是用来保证文件的完整性的,关心的是整个文件,故可以用一整条日志参与计算
  • 每条日志里的CheckSum只用到日志里的data字段计算得到
    • 这是因为每条日志里的CheckSum是用来确保当前日志data部分的完整性,故只用data部分计算
    • 此外,每条日志里的CheckSum的计算也无法用到整条日志进行计算,这是因为整条日志里也包含自身,如果用整条日志计算CheckSum会用到CheckSum自身,这是有问题的
  • 因此XCheckSum != calChecksum( calChecksum( calChecksum(0, data1), data2), ... dataN)

遍历日志文件中的每一条日志

Logger 被实现成迭代器模式,通过 next() 方法,不断地从文件中读取下一条日志,并将其中的 Data 解析出来并返回next() 方法的实现主要依靠 internNext(),大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private byte[] internNext() {
// 位置超出文件大小
if(position + OF_DATA >= fileSize) {
return null;
}
// 读取size,读取后position指针向后移动4位,指向checkSum开头
ByteBuffer tmp = ByteBuffer.allocate(4);
fc.position(position);
fc.read(tmp);
int size = Parser.parseInt(tmp.array());
if(position + size + OF_DATA > fileSize) {
return null;
}

// 读取checksum+data,log中包含size+checkSum+data
ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
fc.position(position);
fc.read(buf);
byte[] log = buf.array();

// 校验 checksum,确保本条日志的完整性
// checkSum1:手动计算该条日志的校验和,从log的第8位到最后一位读取
// checkSum2:从日志数据log中读取该条日志的校验和,从log的4到8位读取
int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
if(checkSum1 != checkSum2) {
return null;
}
// position指针位置指向下一条日志
position += log.length;
// 返回这条日志
return log;
}

校验日志文件

打开一个日志文件时,需要首先校验日志文件XChecksum,并移除文件尾部可能存在的 BadTail,由于 BadTail 该条日志尚未写入完成,文件的校验和也就不会包含该日志的校验和,去掉 BadTail 即可保证日志文件的一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void checkAndRemoveTail() {
// 设置position为4,也就是XCheckSum后面,第一条日志开头的位置
rewind();

// 对每条日志的!!所有字段!!(区别于一条日志的校验和计算)计算校验和,校验和再求和得到总校验和xCheck
int xCheck = 0;
while(true) {
byte[] log = internNext();
if(log == null) break;
xCheck = calChecksum(xCheck, log);
}

// xCheckSum是从日志文件开头4个字节获得的
if(xCheck != xChecksum) {
Panic.panic(Error.BadLogFileException);
}

// 截断文件到正常日志的末尾
truncate(position);
// position指针回到起点
rewind();
}

@Override
public void rewind() {
position = 4;
}

// 计算总校验和是用不到它,因为它只返回了日志的数据部分
@Override
public byte[] next() {
lock.lock();
try {
byte[] log = internNext();
if(log == null) return null;
return Arrays.copyOfRange(log, OF_DATA, log.length);
} finally {
lock.unlock();
}
}

写入日志

向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void log(byte[] data) {
// 将数据包装成日志格式[Size][CheckSum][Data]
byte[] log = wrapLog(data);
// 将包装好的一条日志包装成ByteBuffer格式
ByteBuffer buf = ByteBuffer.wrap(log);
lock.lock();
try {
// 将日志写到文件通道中的末尾
fc.position(fc.size());
fc.write(buf);
} catch(IOException e) {
Panic.panic(e);
} finally {
lock.unlock();
}
// 更新总校验和xCheckSum(这里的log是[Size][CheckSum][Data]这样一整条日志的格式)
updateXChecksum(log);
}

private void updateXChecksum(byte[] log) {
this.xChecksum = calChecksum(this.xChecksum, log);
// 将更新后的总校验和xCheckSum写到文件开头4个字节的位置
fc.position(0);
fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
fc.force(false);
}

// 在数据前面拼接上Size和CheckSum
private byte[] wrapLog(byte[] data) {
byte[] checksum = Parser.int2Byte(calChecksum(0, data));
byte[] size = Parser.int2Byte(data.length);
return Bytes.concat(size, checksum, data);
}

恢复策略

DM 为上层模块,提供了两种操作,分别是插入新数据(I)更新现有数据(U)。(不包含删除操作)DM 的日志策略很简单,一句话就是:

在进行 I 和 U 操作之前,必须先进行对应的日志操作,在保证日志写入磁盘后,才进行数据操作。

这个日志策略,使得 DM 对于数据操作的磁盘同步,可以更加随意。日志在数据操作之前,保证到达了磁盘,那么即使该数据操作最后没有来得及同步到磁盘数据库就发生了崩溃,后续也可以通过磁盘上的日志恢复该数据

对于两种数据操作,DM 记录的日志如下:

  • (Ti, I, A, x),表示事务 Ti 在 A 位置插入了一条数据 x
  • (Ti, U, A, oldx, newx),表示事务 Ti 将 A 位置的数据,从 oldx 更新成 newx

我们首先不考虑并发的情况,那么在某一时刻,只可能有一个事务在操作数据库。日志会看起来像下面那样:

1
(Ti, x, x), ..., (Ti, x, x), (Tj, x, x), ..., (Tj, x, x), (Tk, x, x), ..., (Tk, x, x)

单线程

由于单线程,Ti、Tj 和 Tk 的日志永远不会相交。这种情况下利用日志恢复很简单,假设日志中最后一个事务是 Ti

  1. Ti 之前所有的事务的日志,进行重做(redo)
  2. 接着检查 Ti 的状态(XID 文件),如果 Ti 的状态是已完成(包括 committed 和 aborted),就将 Ti 重做,否则进行撤销(undo)

接着,是如何对事务 T 进行 redo

  1. 正序扫描事务 T 的所有日志
  2. 如果日志是插入操作 (Ti, I, A, x),就将 x 重新插入 A 位置
  3. 如果日志是更新操作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 newx

undo 也很好理解:

  1. 倒序扫描事务 T 的所有日志
  2. 如果日志是插入操作 (Ti, I, A, x),就将 A 位置的数据删除
  3. 如果日志是更新操作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 oldx

注意,MYDB 中其实没有真正的删除操作,对于插入操作的 undo,只是将其中的标志位设置为 invalid。对于删除的探讨将在 VM 一节中进行。

多线程

考虑以下两种情况:

1
2
3
4
5
6
7
8
9
// 情况1

T1 begin
T2 begin
T2 U(x)
T1 R(x)
...
T1 commit
MYDB break down

在系统崩溃时,T2 仍然是活跃状态(active)。那么当数据库重新启动,执行恢复例程时,会撤销 T2,它对数据库的影响会被消除。但是由于 T1 读取了 T2 更新的值,既然 T2 被撤销,那么 T1 也应当被撤销。这种情况,就是级联回滚Cascading Rollback。但是,T1 已经 commit 了,所有 commit 的事务的影响,应当被持久化。这里就造成了矛盾。所以这里需要保证:

规定1:正在进行的事务,不会读取其他未提交的事务产生的数据(读提交Read Committed)

1
2
3
4
5
6
7
8
// 情况2,假设x的初值为0

T1 begin
T2 begin
T1 set x = x+1 // 产生的日志为(T1, U, A, 0, 1)
T2 set x = x+1 // 产生的日志为(T1, U, A, 1, 2)
T2 commit
MYDB break down

在系统崩溃时,T1 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会对 T1 进行撤销,对 T2 进行重做,但是,无论撤销和重做的先后顺序如何,x 最后的结果,要么是 0,要么是 2,这都是错误的。

出现这种问题的原因, 归根结底是因为我们的日志太过简单, 仅仅记录了**”前相”和”后相”. 并单纯的依靠”前相”undo, 依靠”后相”redo.** 这种简单的日志方式和恢复方式, 并不能涵盖住所有数据库操作形成的语义

解决方法有两种:

  1. 增加日志种类
  2. 限制数据库操作

MYDB 采用的是限制数据库操作,需要保证:

规定2:正在进行的事务,不会修改其他任何未提交的事务修改或产生的数据

实现

两种日志的格式
1
2
3
4
5
6
7
8
private static final byte LOG_TYPE_INSERT = 0;
private static final byte LOG_TYPE_UPDATE = 1;

// updateLog:
// [LogType] [XID] [UID] [OldRaw] [NewRaw]

// insertLog:
// [LogType] [XID] [Pgno] [Offset] [Raw]

和原理中描述的类似,recover 例程主要也是两步:重做所有已完成事务,撤销所有未完成事务

重做所有已完成事务(committed、aborted)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 重做所有已完成事务(committed、aborted)
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
// 从第一个事务开始(重做redo要按顺序重做)
lg.rewind();
while(true) {
// 一个一个事务遍历
byte[] log = lg.next();
if(log == null) break;
if(isInsertLog(log)) {
// 如果是插入事务
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
// 事务状态是committed或aborted,重做插入操作
doInsertLog(pc, log, REDO);
}
} else {
// 如果是更新事务
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
if(!tm.isActive(xid)) {
// 事务状态是committed或aborted,重做更新操作
doUpdateLog(pc, log, REDO);
}
}
}
}
撤销所有未完成事务(active)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 撤销所有未完成事务(active)
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
// 用于记录(要撤回事务的xid,事务xid要撤回的操作列表),一个事务可能要撤回多个操作,所以要用列表记录
Map<Long, List<byte[]>> logCache = new HashMap<>();
lg.rewind();
while(true) {
byte[] log = lg.next();
if(log == null) break;
if(isInsertLog(log)) {
// 如果是插入事务
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
// 事务状态是active
if(tm.isActive(xid)) {
// 把事务xid及其要撤回的操作加入到logCache中
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
} else {
// 如果是更新事务
UpdateLogInfo xi = parseUpdateLog(log);
long xid = xi.xid;
// 事务状态是active
if(tm.isActive(xid)) {
// 把事务xid及其要撤回的操作加入到logCache中
if(!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
}
}

// 对所有active log进行倒序undo
for(Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
List<byte[]> logs = entry.getValue();
for (int i = logs.size()-1; i >= 0; i --) {
byte[] log = logs.get(i);
if(isInsertLog(log)) {
doInsertLog(pc, log, UNDO);
} else {
doUpdateLog(pc, log, UNDO);
}
}
tm.abort(entry.getKey());
}
}

// 判读是否是插入事务,否则是更新事务
private static boolean isInsertLog(byte[] log) {
return log[0] == LOG_TYPE_INSERT;
}
解析插入事务与更新事务
  • 解析插入事务日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 解析插入事务日志 [LogType] [XID] [Pgno] [Offset] [Raw]

private static final int OF_TYPE = 0; // LogType位置:0
private static final int OF_XID = OF_TYPE+1; // XID位置:1
private static final int OF_INSERT_PGNO = OF_XID+8; // 插入事务发生的页号位置:9
private static final int OF_INSERT_OFFSET = OF_INSERT_PGNO+4; // 插入事务发生在页中的位置:13
private static final int OF_INSERT_RAW = OF_INSERT_OFFSET+2; // 插入数据位置:15

// 插入事务日志信息
static class InsertLogInfo {
long xid;
int pgno;
short offset;
byte[] raw;
}

// 解析插入事务日志
private static InsertLogInfo parseInsertLog(byte[] log) {
InsertLogInfo li = new InsertLogInfo();
li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_INSERT_PGNO));
li.pgno = Parser.parseInt(Arrays.copyOfRange(log, OF_INSERT_PGNO, OF_INSERT_OFFSET));
li.offset = Parser.parseShort(Arrays.copyOfRange(log, OF_INSERT_OFFSET, OF_INSERT_RAW));
li.raw = Arrays.copyOfRange(log, OF_INSERT_RAW, log.length);
return li;
}
  • 解析更新事务日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 解析更新事务日志 [LogType] [XID] [UID] [OldRaw] [NewRaw]

private static final int OF_UPDATE_UID = OF_XID+8; // UID位置:9
private static final int OF_UPDATE_RAW = OF_UPDATE_UID+8; // 更新数据位置:17

// 更新事务日志信息
static class UpdateLogInfo {
long xid;
int pgno;
short offset;
byte[] oldRaw;
byte[] newRaw;
}

// 解析更新事务日志
private static UpdateLogInfo parseUpdateLog(byte[] log) {
UpdateLogInfo li = new UpdateLogInfo();
li.xid = Parser.parseLong(Arrays.copyOfRange(log, OF_XID, OF_UPDATE_UID));
long uid = Parser.parseLong(Arrays.copyOfRange(log, OF_UPDATE_UID, OF_UPDATE_RAW));
li.offset = (short)(uid & ((1L << 16) - 1));
uid >>>= 32;
li.pgno = (int)(uid & ((1L << 32) - 1));
int length = (log.length - OF_UPDATE_RAW) / 2;
li.oldRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW, OF_UPDATE_RAW+length);
li.newRaw = Arrays.copyOfRange(log, OF_UPDATE_RAW+length, OF_UPDATE_RAW+length*2);
return li;
}
updateLog 和 insertLog 的重做和撤销处理

updateLog 和 insertLog 的重做和撤销处理,分别合并为一个方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// updateLog的重做和撤销处理
private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
int pgno;
short offset;
byte[] raw;
if(flag == REDO) {
// 重做
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.newRaw; // 重做要设置数据到最新
} else {
// 撤销
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.oldRaw; // 撤销要设置数据到久版本
}
Page pg = null;
try {
pg = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
try {
// 见普通页部分
PageX.recoverUpdate(pg, raw, offset);
} finally {
pg.release();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// insertLog的重做和撤销操作
private static void doInsertLog(PageCache pc, byte[] log, int flag) {
InsertLogInfo li = parseInsertLog(log);
Page pg = null;
try {
pg = pc.getPage(li.pgno);
} catch(Exception e) {
Panic.panic(e);
}
try {
if(flag == UNDO) {
// 撤销插入,逻辑删除,大致的作用,就是将该条 DataItem 的有效位设置为无效,来进行逻辑删除。
DataItem.setDataItemRawInvalid(li.raw);
}
// 见普通页部分
PageX.recoverInsert(pg, li.raw, li.offset);
} finally {
pg.release();
}
}

页面索引与DM的实现

本节将为 DM 层做收尾,介绍一个实现简单的页面索引。并且实现了 DM 层对于上层的抽象:DataItem。

页面索引

页面索引,缓存了每一页的空闲空间。用于在上层模块进行插入操作时,能够快速找到一个合适空间的页面,而无需从磁盘或者缓存中检查每一个页面的信息

MYDB 用一个比较粗略的算法实现了页面索引,将一页的空间划分成了 40 个区间。在启动时,就会遍历所有的页面信息,获取页面的空闲空间,安排到这 40 个区间中。insert 在请求一个页时,会首先将所需的空间向上取整,映射到某一个区间,随后取出这个区间的任何一页,都可以满足需求。

PageIndex 的实现也很简单,一个 List 类型的数组。

lists的作用是将页面信息根据页面的剩余空间分组存储,THRESHOLD = 8192 / 40 = 204.8,这意味着,页面的剩余空间将被按大约 204.8 字节划分成多个区间,每个页面的剩余空间将根据这个阈值来计算自己属于哪个区间,并将自己的页面信息PageInfo放到对应的list下。打个比方:

  • 如果页面的剩余空间是 500 字节,那么它将**被划分到 500 / 204.8 ≈ 2 的区间中(即 lists[2])。**此时如果有一个数据,大小在 [1 * 204.8, 2 * 204.8] 之间,那么他就会先到lists[2]中找到这个页面,并将数据保存在这个页面。
  • 如果剩余空间是 1500 字节,那么它将被划分到 1500 / 204.8 ≈ 7 的区间中(即 lists[7])。
  • 如果有多个页面,它们的剩余空间都在同一个区间内,那就会放在同一个list,要取的时候,从列表头取起。
  • 通过这个操作,要保存数据的时候就不用到磁盘中去一个个找哪个页面能够容纳这个数据,直接在PageIndex中就能够知道哪一页是能容纳该数据的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class PageIndex {
// 将一页划成40个区间
private static final int INTERVALS_NO = 40;
// 一个区间的大小,8192 / 40 = 204.8
private static final int THRESHOLD = PageCache.PAGE_SIZE / INTERVALS_NO;

private Lock lock;
//
private List<PageInfo>[] lists;

@SuppressWarnings("unchecked")
public PageIndex() {
lock = new ReentrantLock();
lists = new List[INTERVALS_NO+1];
for (int i = 0; i < INTERVALS_NO+1; i ++) {
lists[i] = new ArrayList<>();
}
}
}

public class PageInfo {
public int pgno;
public int freeSpace;

public PageInfo(int pgno, int freeSpace) {
this.pgno = pgno;
this.freeSpace = freeSpace;
}
}

当来了一个大小为spaceSize的数据要保存到页面时,使用select()方法,直接算出区间号,直接取即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public PageInfo select(int spaceSize) {
// 计算哪个区间内的页面能容纳该数据(大于number的list中包含能容纳该数据的页面)
int number = spaceSize / THRESHOLD;
if(number < INTERVALS_NO) number ++;
while(number <= INTERVALS_NO) {
// lists[number]中没有页面,那就往下找
if(lists[number].size() == 0) {
number ++;
continue;
}
// 找到一个符合的页面,返回页面信息PageInfo,并将该页从list中移除
return lists[number].remove(0);
}
return null;
}

可以注意到,被选择的页,会直接从 PageIndex 中移除,这意味着,同一个页面是不允许并发写的。在上层模块使用完这个页面后,需要将其重新插入 PageIndex

1
2
3
4
public void add(int pgno, int freeSpace) {
int number = freeSpace / THRESHOLD;
lists[number].add(new PageInfo(pgno, freeSpace));
}

DataManager 被创建时,需要获取所有页面并填充 PageIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 初始化pageIndex
void fillPageIndex() {
int pageNumber = pc.getPageNumber();
// 第一页用于启动检查,页面从1开始,故int i = 2
for(int i = 2; i <= pageNumber; i ++) {
Page pg = null;
try {
// getPage调用了AbstractCache中get方法,调用这个方法会先到缓存(HashMap cache)中找对应页号i的页面,若没有再到数据源(文件系统)找
pg = pc.getPage(i);
} catch (Exception e) {
Panic.panic(e);
}
// 将对应的页面信息添加到PageIndex中
pIndex.add(pg.getPageNumber(), PageX.getFreeSpace(pg));
// 注意在使用完 Page 后需要及时 release,否则可能会撑爆缓存
pg.release();
}
}

DataItem

DataItem是 DM 层向上层提供的数据抽象。上层模块通过地址,向 DM 请求到对应的 DataItem,再获取到其中的数据。DataItem的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* dataItem 结构如下:
* [ValidFlag] [DataSize] [Data]
* ValidFlag 1字节,0为合法,1为非法
* DataSize 2字节,标识Data的长度
*/
public class DataItemImpl implements DataItem {

static final int OF_VALID = 0;
static final int OF_SIZE = 1;
static final int OF_DATA = 3;

private SubArray raw;
private byte[] oldRaw;
private Lock rLock;
private Lock wLock;
private DataManagerImpl dm;
private long uid;
private Page pg;
}

public class SubArray {
public byte[] raw;
public int start;
public int end;

public SubArray(byte[] raw, int start, int end) {
this.raw = raw;
this.start = start;
this.end = end;
}
}

DataItemSubArray raw的大概意思:

  • 因为数据都是放在页面里的,取数据的时候也是按页为单位取的,故**raw.raw一般表示的是数据所在页那一整页的数据**
  • 所以**raw.start表示的是DataItem所表示的数据在这一页中的起始位置**(偏移offset

DataItem 中保存的数据,结构如下:

1
[ValidFlag] [DataSize] [Data]

其中 ValidFlag 占用 1 字节,标识了该 DataItem 是否有效删除一个 DataItem,只需要简单地将其有效位设置为 1DataSize 占用 2 字节,标识了后面 Data 的长度。

根据上面DataItem的结构,我们有:

  • offset = raw.start
  • ValidFlag = raw.raw[offset + OF_VALID]
  • DataSize = Parser.parseShort(Arrays.copyOfRange(raw, offset+DataItemImpl.OF_SIZE, offset+DataItemImpl.OF_DATA))

上层模块在获取到 DataItem 后,可以通过 data() 方法,该方法返回的数组是数据共享的,而不是拷贝实现的,所以使用了 SubArray

1
2
3
4
@Override
public SubArray data() {
return new SubArray(raw.raw, raw.start+OF_DATA, raw.end);
}

在上层模块试图对 DataItem 进行修改时,需要遵循一定的流程:在修改之前需要调用 before() 方法,想要撤销修改时,调用 unBefore() 方法,在修改完成后,调用 after() 方法。整个流程,主要是为了保存前相数据,并及时落日志。DM 会保证对 DataItem 的修改是原子性的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void before() {
wLock.lock();
pg.setDirty(true);
System.arraycopy(raw.raw, raw.start, oldRaw, 0, oldRaw.length);
}

@Override
public void unBefore() {
System.arraycopy(oldRaw, 0, raw.raw, raw.start, oldRaw.length);
wLock.unlock();
}

@Override
public void after(long xid) {
// 调用 dm 中的一个方法,对修改操作落日志
dm.logDataItem(xid, this);
wLock.unlock();
}

public boolean isValid() {
return raw.raw[raw.start+OF_VALID] == (byte)0;
}

在使用完 DataItem 后,也应当及时调用 release()方法,释放掉 DataItem 的缓存(由 DM 缓存 DataItem)。

1
2
3
4
@Override
public void release() {
dm.releaseDataItem(this);
}

DM的实现

DM读取、释放数据DataItem

DataManager 是 DM 层直接对外提供方法的类,同时,也实现成 DataItem 对象的缓存,继承AbstractCacheDataItem 存储的 key,是由页号和页内偏移组成的一个 8 字节无符号整数,页号和偏移各占 4 字节

DataItem 缓存,getForCache(),只需要从 key 中解析出页号,从 pageCache 中获取到页面,再根据偏移,解析出 DataItem 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
protected DataItem getForCache(long uid) throws Exception {
short offset = (short)(uid & ((1L << 16) - 1));
uid >>>= 32;
int pgno = (int)(uid & ((1L << 32) - 1));
Page pg = pc.getPage(pgno);
return DataItem.parseDataItem(pg, offset, this);
}

// 从页面的offset处解析处DataItem
public static DataItem parseDataItem(Page pg, short offset, DataManagerImpl dm) {
// 获取这个页面的所有数据
byte[] raw = pg.getData();
// 获取这个DataItem的Data字段的长度
short size = Parser.parseShort(Arrays.copyOfRange(raw, offset+DataItemImpl.OF_SIZE, offset+DataItemImpl.OF_DATA));
// 获取这个DataItem总长度,则这个DataItem在这个页面中的范围是[offset, offset + length]
short length = (short)(size + DataItemImpl.OF_DATA);
// 获取这个DataItem的uid
long uid = Types.addressToUid(pg.getPageNumber(), offset);
// 拼装成一个DataItem返回
return new DataItemImpl(new SubArray(raw, offset, offset+length), new byte[length], pg, uid, dm);
}

DataItem 缓存释放,需要将 DataItem 写回数据源,由于对文件的读写是以页为单位进行的,只需要DataItem 所在的页 release 即可:

1
2
3
4
@Override
protected void releaseForCache(DataItem di) {
di.page().release();
}

DM的创建与打开

已有文件创建 DataManager 和从空文件创建 DataManager 的流程稍有不同,除了 PageCache 和 Logger 的创建方式有所不同以外,从空文件创建首先需要对第一页进行初始化,而从已有文件创建,则是需要对第一页进行校验,来判断是否需要执行恢复流程。并重新对第一页生成随机字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static DataManager create(String path, long mem, TransactionManager tm) {
PageCache pc = PageCache.create(path, mem);
Logger lg = Logger.create(path);
DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
dm.initPageOne();
return dm;
}

public static DataManager open(String path, long mem, TransactionManager tm) {
PageCache pc = PageCache.open(path, mem);
Logger lg = Logger.open(path);
DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
if(!dm.loadCheckPageOne()) {
Recover.recover(tm, lg, pc);
}
dm.fillPageIndex();
PageOne.setVcOpen(dm.pageOne);
dm.pc.flushPage(dm.pageOne);
return dm;
}

其中,初始化第一页,和校验第一页,基本都是调用 PageOne 类中的方法实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 在创建文件时初始化PageOne
void initPageOne() {
int pgno = pc.newPage(PageOne.InitRaw());
assert pgno == 1;
try {
pageOne = pc.getPage(pgno);
} catch (Exception e) {
Panic.panic(e);
}
pc.flushPage(pageOne);
}

// 在打开已有文件时时读入PageOne,并验证正确性
boolean loadCheckPageOne() {
try {
pageOne = pc.getPage(1);
} catch (Exception e) {
Panic.panic(e);
}
return PageOne.checkVc(pageOne);
}

DM读、插入数据

DM 层提供了三个功能供上层使用,分别是读、插入和修改。修改是通过读出的 DataItem 实现的,于是 DataManager 只需要提供 read()insert() 方法。

  • read() 根据 UID 从缓存中获取 DataItem,并校验有效位:
1
2
3
4
5
6
7
8
9
@Override
public DataItem read(long uid) throws Exception {
DataItemImpl di = (DataItemImpl)super.get(uid);
if(!di.isValid()) {
di.release();
return null;
}
return di;
}
  • insert() 方法,在 pageIndex 中获取一个足以存储插入内容的页面的页号,获取页面后,首先需要写入插入日志,接着才可以通过 pageX 插入数据,并返回插入位置的偏移。最后需要将页面信息重新插入 pageIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static byte[] wrapDataItemRaw(byte[] raw) {
byte[] valid = new byte[1]; // 起始默认值为0,表示合法
byte[] size = Parser.short2Byte((short)raw.length);
return Bytes.concat(valid, size, raw);
}

@Override
public long insert(long xid, byte[] data) throws Exception {
// 包装成DataItem
byte[] raw = DataItem.wrapDataItemRaw(data);
if(raw.length > PageX.MAX_FREE_SPACE) {
throw Error.DataTooLargeException;
}

// 尝试获取可用页
PageInfo pi = null;
for(int i = 0; i < 5; i ++) {
pi = pIndex.select(raw.length);
if (pi != null) {
break;
} else {
int newPgno = pc.newPage(PageX.initRaw());
pIndex.add(newPgno, PageX.MAX_FREE_SPACE);
}
}
if(pi == null) {
throw Error.DatabaseBusyException;
}

Page pg = null;
int freeSpace = 0;
try {
pg = pc.getPage(pi.pgno);
// 首先做日志
byte[] log = Recover.insertLog(xid, pg, raw);
logger.log(log);
// 再执行插入操作
short offset = PageX.insert(pg, raw);

pg.release();
return Types.addressToUid(pi.pgno, offset);

} finally {
// 将取出的pg重新插入pIndex
if(pg != null) {
pIndex.add(pi.pgno, PageX.getFreeSpace(pg));
} else {
pIndex.add(pi.pgno, freeSpace);
}
}
}

DM的关闭

DataManager 正常关闭时,需要执行缓存和日志的关闭流程,不要忘了设置第一页的字节校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void close() {
// DataManager缓存关闭
super.close();
// 日志关闭
logger.close();


// 第一页的字节校验
PageOne.setVcClose(pageOne);
pageOne.release();
// 页面缓存关闭
pc.close();
}

4. Version Manager

VM 基于两段锁协议实现了调度序列的可串行化,并实现了 MVCC 以消除读写阻塞。同时实现了两种隔离级别。类似于 Data Manager 是 MYDB 的数据管理核心,Version Manager 是 MYDB 的事务和数据版本的管理核心

2PL与MVCC

冲突与2PL

首先来定义数据库的冲突,暂时不考虑插入操作,只看更新操作(U)和读操作(R),两个操作只要满足下面三个条件,就可以说这两个操作相互冲突

  1. 这两个操作是由不同的事务执行的
  2. 这两个操作操作的是同一个数据项
  3. 这两个操作至少有一个是更新操作

那么这样,对同一个数据操作的冲突,其实就只有下面这两种情况:

  1. 两个不同事务的 U 操作冲突
  2. 两个不同事务的 U、R 操作冲突

那么冲突或者不冲突,意义何在?作用在于,交换两个互不冲突的操作的顺序,不会对最终的结果造成影响,而交换两个冲突操作的顺序,则是会有影响的。

VM 的一个很重要的职责,就是实现了调度序列的可串行化。MYDB 采用两段锁协议(2PL)来实现。当采用 2PL 时,如果某个事务 i 已经对 x 加锁,且另一个事务 j 也想操作 x,但是这个操作与事务 i 之前的操作相互冲突的话,事务 j 就会被阻塞。譬如,T1 已经因为 U1(x) 锁定了 x,那么 T2 对 x 的读或者写操作都会被阻塞,T2 必须等待 T1 释放掉对 x 的锁。

由此来看,2PL 确实保证了调度序列的可串行话,但是不可避免地导致了事务间的相互阻塞,甚至可能导致死锁。MYDB 为了提高事务处理的效率,降低阻塞概率,实现了 MVCC。

MVCC

在介绍 MVCC 之前,首先明确记录和版本的概念。

DM 层向上层提供了数据项(DataItem)的概念,VM 通过管理所有的数据项,向上层提供了记录(Entry)的概念。上层模块通过 VM 操作数据的最小单位,就是记录。VM 则在其内部,为每个记录,维护了多个版本(Version)。每当上层模块对某个记录进行修改时,VM 就会为这个记录创建一个新的版本

MYDB 通过 MVCC,降低了事务的阻塞概率。譬如,T1 想要更新记录 X 的值,于是 T1 需要首先获取 X 的锁,接着更新,也就是创建了一个新的 X 的版本,假设为 x3。假设 T1 还没有释放 X 的锁时,T2 想要读取 X 的值,这时候就不会阻塞,MYDB 会返回一个较老版本的 X,例如 x2。这样最后执行的结果,就等价于,T2 先执行,T1 后执行,调度序列依然是可串行化的。如果 X 没有一个更老的版本,那只能等待 T1 释放锁了。所以只是降低了概率

记录的实现

对于一条记录来说,MYDB 使用 Entry 类维护了其结构。虽然理论上,MVCC 实现了多版本,但是在实现中,VM 并没有提供 Update 操作,对于字段的更新操作由后面的表和字段管理(TBM)实现。所以在 VM 的实现中,一条记录只有一个版本。

一条记录存储在一条 DataItem 中,所以 Entry 中保存一个 DataItem 的引用即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* VM向上层抽象出entry
* entry结构:
* [XMIN] [XMAX] [data]
*/
public class Entry {
private static final int OF_XMIN = 0; // entry XMIN的起始位置:0
private static final int OF_XMAX = OF_XMIN+8; // entry XMAX的起始位置:8
private static final int OF_DATA = OF_XMAX+8; // 注意⚠️,这里的OF_DATA=16要和DataItemImpl中的OF_DATA=3区分开来

private long uid;
private DataItem dataItem;
private VersionManager vm;

public static Entry loadEntry(VersionManager vm, long uid) throws Exception {
DataItem di = ((VersionManagerImpl)vm).dm.read(uid);
return newEntry(vm, di, uid);
}

public void remove() {
dataItem.release();
}
}

我们规定,一条 Entry 中存储的数据格式如下:

1
2
// XMIN和XMAX都是一个8字节的byte[]
[XMIN] [XMAX] [DATA]

XMIN创建该条记录(版本)的事务编号,而 XMAX 则是删除该条记录(版本)的事务编号。它们的作用将在下一节中说明。DATA 就是这条记录持有的数据。根据这个结构,在创建记录时调用的 wrapEntryRaw() 方法如下:

1
2
3
4
5
public static byte[] wrapEntryRaw(long xid, byte[] data) {
byte[] xmin = Parser.long2Byte(xid);
byte[] xmax = new byte[8];
return Bytes.concat(xmin, xmax, data);
}

同样,如果要获取记录中持有的数据,也就需要按照这个结构来解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 以拷贝的形式返回内容
public byte[] data() {
dataItem.rLock();
try {
// 从DataItem中取出数据部分:也就是除去VaildFlag和DataSize后面的数据部分,也是一个SubArray
SubArray sa = dataItem.data();
// 取出来的数据sa也被分为[XMIN] [XMAX] [DATA]的结构,这里的OF_DATA=16
byte[] data = new byte[sa.end - sa.start - OF_DATA];
System.arraycopy(sa.raw, sa.start+OF_DATA, data, 0, data.length);
return data;
} finally {
dataItem.rUnLock();
}
}

这里以拷贝的形式返回数据,如果需要修改的话,需要对 DataItem 执行 before() 方法,这个在设置 XMAX 的值中体现了:

1
2
3
4
5
6
7
8
9
10
public void setXmax(long xid) {
dataItem.before();
try {
SubArray sa = dataItem.data();
// 将sa的XMAX部分修改为当前的xid
System.arraycopy(Parser.long2Byte(xid), 0, sa.raw, sa.start+OF_XMAX, 8);
} finally {
dataItem.after(xid);
}
}

事务的隔离级别

读提交(Read Committed, RC)

上面提到,如果一个记录的最新版本被加锁,当另一个事务想要修改或读取这条记录时,MYDB 就会返回一个较旧的版本的数据。这时就可以认为,最新的被加锁的版本,对于另一个事务来说,是不可见的。于是版本可见性的概念就诞生了。

版本的可见性与事务的隔离度是相关的。MYDB 支持的最低的事务隔离程度,是“读提交”(Read Committed),即事务在读取数据时, 只能读取已经提交事务产生的数据。保证最低的读提交的好处,第四章中已经说明(防止级联回滚与 commit 语义冲突)。

MYDB 实现读提交,为每个版本维护了两个变量,就是上面提到的 XMINXMAX

  • XMIN:创建该版本的事务编号
  • XMAX:删除该版本的事务编号

XMIN 应当在版本创建时填写,而 XMAX 则在版本被删除,或者有新版本出现时填写。

XMAX 这个变量,也就解释了为什么 DM 层不提供删除操作,当想删除一个版本时,只需要设置其 XMAX,这样,这个版本对每一个 XMAX 之后的事务都是不可见的,也就等价于删除了。

如此,在读提交下,版本对事务的可见性逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
// 对于当前事务Ti,当如下逻辑判断为true时,该版本对当前事务Ti可见

(XMIN == Ti and // 由Ti创建且
XMAX == NULL // 还未被删除
)
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
(XMAX == NULL or // 尚未删除或
(XMAX != Ti and XMAX is not commited) // 由一个未提交的事务删除
)
)

若条件为 true,则版本对 Ti 可见。那么获取 Ti 适合的版本,只需要从最新版本开始,依次向前检查可见性,如果为 true,就可以直接返回。以下方法判断某个记录对事务 t 是否可见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static boolean readCommitted(TransactionManager tm, Transaction t, Entry e) {
// 当前事务id
long xid = t.xid;
// 该条记录的XMIN,即创建该记录的事务id
long xmin = e.getXmin();
// 该条记录的XMAX,即删除该记录的事务id
long xmax = e.getXmax();

// 该记录由当前事务创建,且还未被删除,则对当前事务可见
if(xmin == xid && xmax == 0) return true;

// 该记录由一条已提交事务创建
if(tm.isCommitted(xmin)) {
// 该记录还未被删除,则对当前事务可见
if(xmax == 0) return true;
// 该记录已被删除,但不是当前事务删除的,且删除这条记录的事务还没提交,则对当前事务可见
if(xmax != xid) {
if(!tm.isCommitted(xmax)) {
return true;
}
}
}
// 对当前事务不可见
return false;
}

可重复读(Repeatable Read, RR)

读提交会导致的问题大家也都很清楚,八股也背了不少。那就是不可重复读和幻读。这里我们来解决不可重复读的问题。

1
2
3
4
5
6
T1 begin
R1(X) // T1 读得 0
T2 begin
U2(X) // 将 X 修改为 1
T2 commit
R1(X) // T1 读的 1

T1 在第二次读取的时候,读到了已经提交的 T2 修改的值,导致了这个问题。于是我们可以规定:

事务只能读取它开始时, 就已经结束的那些事务产生的数据版本

这条规定,相当于,事务需要忽略

  1. 本事务后开始的事务的数据;
  2. 本事务开始时还是 active 状态的事务的数据

对于第一条,只需要比较事务 ID,即可确定,因为事务ID是自增的。而对于第二条,则需要在事务 Ti 开始时,记录下当前活跃的所有事务 SP(Ti),如果记录的某个版本,XMIN 在 SP(Ti) 中,说明创建这个版本记录的事务,在当前事务开始时处于active状态,也应当对 Ti 不可见。于是,可重复读的判断逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 对于当前事务Ti,当如下逻辑判断为true时,该版本对当前事务Ti可见

(XMIN == Ti and // 由Ti创建且
(XMAX == NULL) // 尚未被删除
)
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
XMIN < XID and // 这个事务小于Ti且
XMIN is not in SP(Ti) and // 这个事务在Ti开始前提交且
(XMAX == NULL or // 尚未被删除或
(XMAX != Ti and // 由其他事务删除但是
(XMAX is not commited or // 这个事务尚未提交或
XMAX > Ti or // 这个事务在Ti开始之后才开始或
XMAX is in SP(Ti) // 这个事务在Ti开始前还未提交
))))

于是,需要提供一个结构,来抽象一个事务,以保存快照数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Transaction {
public long xid;
public int level;
// snapshot用来保存活跃事务id
public Map<Long, Boolean> snapshot;
public Exception err;
public boolean autoAborted;

// 构造方法,active保存着当前所有状态为active的事务
public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) {
Transaction t = new Transaction();
t.xid = xid;
t.level = level;
// 如果是可重复读隔离级别,会将当前处于active的事务id保存到snapshot中
if(level != 0) {
t.snapshot = new HashMap<>();
for(Long x : active.keySet()) {
t.snapshot.put(x, true);
}
}
return t;
}

public boolean isInSnapshot(long xid) {
if(xid == TransactionManagerImpl.SUPER_XID) {
return false;
}
return snapshot.containsKey(xid);
}
}

于是,可重复读的隔离级别下,一个版本是否对事务可见的判断如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static boolean repeatableRead(TransactionManager tm, Transaction t, Entry e) {
// 当前事务id
long xid = t.xid;
// 创建该记录的事务id
long xmin = e.getXmin();
// 删除该记录的事务id
long xmax = e.getXmax();
// 当前事务创建了该记录 且 该记录还没被删除
if(xmin == xid && xmax == 0) return true;

// 创建该记录的事务已提交 且 创建该记录的事务比当前事务早 且 当前事务创建时,创建该记录的事务不处于active状态
if(tm.isCommitted(xmin) && xmin < xid && !t.isInSnapshot(xmin)) {
// 该记录还未被删除
if(xmax == 0) return true;
// 该记录被删除,但不是当前事务删除的
if(xmax != xid) {
// 删除该记录的事务未提交 或 删除该记录的事务比当前事务晚 或 当前事务创建时,删除该记录的事务处于active状态
if(!tm.isCommitted(xmax) || xmax > xid || t.isInSnapshot(xmax)) {
return true;
}
}
}
return false;
}

版本跳跃问题

版本跳跃问题,考虑如下的情况,假设 X 最初只有 x0 版本,T1 和 T2 都是可重复读的隔离级别:

1
2
3
4
5
6
7
8
T1 begin
T2 begin
R1(X) // T1读取x0
R2(X) // T2读取x0
U1(X) // T1将X更新到x1
T1 commit
U2(X) // T2将X更新到x2
T2 commit

这种情况实际运行起来是没问题的,但是逻辑上不太正确。T1 将 X 从 x0 更新为了 x1,这是没错的。但是 T2 则是将 X 从 x0 更新成了 x2,跳过了 x1 版本

**读提交是允许版本跳跃的,而可重复读则是不允许版本跳跃的。**解决版本跳跃的思路也很简单:如果 Ti 需要修改 X,而 X 已经被 Ti 不可见的事务 Tj 修改了,那么要求 Ti 回滚

上一节中就总结了,Ti 不可见的 Tj,有两种情况:

  1. XID(Tj) > XID(Ti)
  2. Tj in SP(Ti)

于是版本跳跃的检查也就很简单了,取出要修改的数据 X 的最新提交版本,并检查该最新版本的创建者对当前事务是否可见

IMG_3883 2
1
2
3
4
5
6
7
8
9
10
11
12
// 检查版本跳跃 -> 检查当前事务要修改的数据最新版本的创建者是否对当前事务可见
public static boolean isVersionSkip(TransactionManager tm, Transaction t, Entry e) {
long xmax = e.getXmax();
// level = 0是读提交,可以接受版本跳跃
if(t.level == 0) {
return false;
} else {
// 创建该数据的最新版本的事务已提交 且 (这个事务比当前事务创建晚 或 这个事务在当前事务创建时处于active状态),而我们当前事务想要更改这个数据,就会发生版本跳跃
return tm.isCommitted(xmax) &&
(xmax > t.xid || t.isInSnapshot(xmax));
}
}

死锁检测

上一节提到了 2PL 会阻塞事务,直至持有锁的线程释放锁。可以将这种等待关系抽象成有向边,例如 Tj 在等待 Ti,就可以表示为 Tj –> Ti。这样,无数有向边就可以形成一个图(不一定是连通图)。检测死锁也就简单了,只需要查看这个图中是否有环即可

MYDB 使用一个 LockTable 对象,在内存中维护这张图。维护结构如下:

1
2
3
4
5
6
7
8
9
10
11
public class LockTable {

private Map<Long, List<Long>> x2u; // 某个XID已经获得的资源的UID列表
private Map<Long, Long> u2x; // UID被某个XID持有
private Map<Long, List<Long>> wait; // 正在等待UID的XID列表
private Map<Long, Lock> waitLock; // 正在等待资源的XID的锁
private Map<Long, Long> waitU; // XID正在等待的UID
private Lock lock;

...
}

每次出现等待的情况时,就尝试向图中增加一条边,并进行死锁检测。如果检测到死锁,就撤销这条边,不允许添加,并撤销该事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// xid事务想要获得uid资源
// 不需要等待则返回null,否则返回锁对象
// 会造成死锁则抛出异常
public Lock add(long xid, long uid) throws Exception {
lock.lock();
try {
// xid事务已获得uid资源
if(isInList(x2u, xid, uid)) {
return null;
}
// 没有事务持有uid资源,则xid事务可以获取
if(!u2x.containsKey(uid)) {
u2x.put(uid, xid);
putIntoList(x2u, xid, uid);
return null;
}
// xid事务正在等待uid资源
waitU.put(xid, uid);
// 将xid事务加入到等待uid资源的事务列表里
putIntoList(wait, xid, uid);
// 发生死锁,抛出异常
if(hasDeadLock()) {
waitU.remove(xid);
removeFromList(wait, uid, xid);
throw Error.DeadlockException;
}
// 没发生死锁,则将一个上了锁的Lock对象加入到xid事务的锁map里,返回这个锁
Lock l = new ReentrantLock();
l.lock();
waitLock.put(xid, l);
return l;
} finally {
lock.unlock();
}
}

调用 add,如果需要等待的话,会返回一个上了锁的 Lock 对象。调用方在获取到该对象时,需要尝试获取该对象的锁,由此实现阻塞线程的目的,例如:

1
2
3
4
5
6
// ⚠️
Lock l = lt.add(xid, uid);
if(l != null) {
l.lock(); // 阻塞在这一步? l是一个可重入锁,同一个线程再执行加锁不会阻塞吧?
l.unlock();
}

查找图中是否有环的算法也非常简单,就是一个深搜,只是需要注意这个图不一定是连通图。思路就是为每个节点设置一个访问戳,都初始化为 -1,随后遍历所有节点,以每个非 -1 的节点作为根进行深搜,并将深搜该连通图中遇到的所有节点都设置为同一个数字,不同的连通图数字不同。这样,如果在遍历某个图时,遇到了之前遍历过的节点,说明出现了环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private boolean hasDeadLock() {
xidStamp = new HashMap<>();
stamp = 1;
for(long xid : x2u.keySet()) {
Integer s = xidStamp.get(xid);
if(s != null && s > 0) {
continue;
}
stamp ++;
if(dfs(xid)) {
return true;
}
}
return false;
}

private boolean dfs(long xid) {
Integer stp = xidStamp.get(xid);
if(stp != null && stp == stamp) {
return true;
}
if(stp != null && stp < stamp) {
return false;
}
xidStamp.put(xid, stamp);

Long uid = waitU.get(xid);
if(uid == null) return false;
Long x = u2x.get(uid);
assert x != null;
return dfs(x);
}

在一个事务 commit 或者 abort 时,就可以释放所有它持有的锁,并将自身从等待图中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void remove(long xid) {
lock.lock();
try {
List<Long> l = x2u.get(xid);
if(l != null) {
// 把所有资源都释放
while(l.size() > 0) {
Long uid = l.remove(0);
// 被释放的资源,从它的等待队列中选一个xid事务来占用该uid资源
selectNewXID(uid);
}
}

// abort时可能还在等别的资源,abort后就不用等了,故移除waitU中的东西
waitU.remove(xid);
// xid事务已释放所有资源,将xid从获得资源map中移除
x2u.remove(xid);
// 因为abort时可能还在等资源,故waitLock中还有东西,要移除
waitLock.remove(xid);
} finally {
lock.unlock();
}
}

while 循环释放掉了这个线程所有持有的资源的锁,这些资源可以被等待的线程所获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 从等待队列中选择一个xid来占用uid
private void selectNewXID(long uid) {
// 当前uid已不被占有
u2x.remove(uid);
// 去uid资源的等待列表中找一个xid事务来占用
List<Long> l = wait.get(uid);
// 没有xid想要uid
if(l == null) return;
assert l.size() > 0;
while(l.size() > 0) {
// 选取等待列表中最前面的xid,并将其从wait中移除
long xid = l.remove(0);
// 若xid没有等待资源锁,说明xid已获得过了资源,跳过这个xid
// 至于为什么获得过了还会在wait中,因为在remove那里xid释放所有资源时并没有将wait的(uid, xid)记录删除,故导致wait中xid还在等uid,但是实际上xid已经获取并释放过uid了;但是如下方所示,获取资源后会将xid从waitLock中删除,故用waitLock来判断这个xid是否还需要资源。
if(!waitLock.containsKey(xid)) {
continue;
} else {
// xid占有uid
u2x.put(uid, xid);
// xid的资源列表要加上uid
// putIntoList(x2u, xid, uid);

// xid的等待资源锁要移除
Lock lo = waitLock.remove(xid);
// 将(xid,uid)从waitU中移除
waitU.remove(xid);
// xid的等待资源锁解锁,才能进行获得资源后的操作
lo.unlock();
break;
}
}
// uid资源分配出去后,发现没有事务在等它了,将它从wait中移除
if(l.size() == 0) wait.remove(uid);
}

VM的实现

VM 层通过 VersionManager 接口,向上层提供功能,如下:

1
2
3
4
5
6
7
8
9
public interface VersionManager {
byte[] read(long xid, long uid) throws Exception;
long insert(long xid, byte[] data) throws Exception;
boolean delete(long xid, long uid) throws Exception;

long begin(int level);
void commit(long xid) throws Exception;
void abort(long xid);
}

同时,VM 的实现类还**被设计为 Entry 的缓存,需要继承 AbstractCache<Entry>。**需要实现的获取到缓存和从缓存释放的方法很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
protected Entry getForCache(long uid) throws Exception {
Entry entry = Entry.loadEntry(this, uid);
if(entry == null) {
throw Error.NullEntryException;
}
return entry;
}

@Override
protected void releaseForCache(Entry entry) {
entry.remove();
}

public static Entry loadEntry(VersionManager vm, long uid) throws Exception {
DataItem di = ((VersionManagerImpl)vm).dm.read(uid);
return newEntry(vm, di, uid);
}

public void remove() {
dataItem.release();
}

begin() 开启一个事务,并初始化事务的结构,将其存放在 activeTransaction,用于检查和快照使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public long begin(int level) {
lock.lock();
try {
// 创建一个新事务,事务id自增并返回
long xid = tm.begin();
// 创建事务时传入activeTransaction是为了创造快照
Transaction t = Transaction.newTransaction(xid, level, activeTransaction);
// 创建完将这个事务也加入active事务组中
activeTransaction.put(xid, t);
return xid;
} finally {
lock.unlock();
}
}

commit() 方法提交一个事务,主要就是 free 掉相关的结构,并且释放持有的锁,并修改 TM 状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void commit(long xid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();
try {
if(t.err != null) {
throw t.err;
}
} catch(NullPointerException n) {
System.out.println(xid);
System.out.println(activeTransaction.keySet());
Panic.panic(n);
}
lock.lock();
// 将xid事务从active事务中中移除
activeTransaction.remove(xid);
lock.unlock();
// 释放xid事务所拥有的所有资源
lt.remove(xid);
// 提交xid事务
tm.commit(xid);
}

abort 事务的方法则有两种,手动和自动。手动指的是调用 abort() 方法;而自动,则是在事务被检测出出现死锁时,会自动撤销回滚事务;或者出现版本跳跃时,也会自动回滚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void internAbort(long xid, boolean autoAborted) {
lock.lock();
Transaction t = activeTransaction.get(xid);
if(!autoAborted) {
activeTransaction.remove(xid);
}
lock.unlock();
if(t.autoAborted) return;
lt.remove(xid);
tm.abort(xid);
}

// 手动abort
@Override
public void abort(long xid) {
internAbort(xid, false);
}
// 自动abort,t.autoAborted默认值为false
internAbort(xid, true); 然后-> t.autoAborted = true;

read() 方法读取一个 entry,注意判断下可见性即可:

image-20250222193536630
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public byte[] read(long xid, long uid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();
if(t.err != null) {
throw t.err;
}
Entry entry = super.get(uid);
try {
// 想要读取的资源是对于当前事务是可见的
if(Visibility.isVisible(tm, t, entry)) {
return entry.data();
} else {
return null;
}
} finally {
entry.release();
}
}

public static boolean isVisible(TransactionManager tm, Transaction t, Entry e) {
if(t.level == 0) {
// 读提交隔离级别下查看是否可见
return readCommitted(tm, t, e);
} else {
// 可重复读隔离级别下查看是否可见
return repeatableRead(tm, t, e);
}
}

insert() 则是将数据包裹成 Entry,无脑交给 DM 插入即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public long insert(long xid, byte[] data) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();
if(t.err != null) {
throw t.err;
}
// 将数据包裹成Entry格式:[XMIN] [XMAX] [data],然后交给DM层
byte[] raw = Entry.wrapEntryRaw(xid, data);
// 插入时DM会再将entry包裹成DataItem格式:[ValidFlag] [DataSize] [Data]
// Entry就是这里的Data
return dm.insert(xid, raw);
}

delete() 方法看起来略为复杂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public boolean delete(long xid, long uid) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();

if(t.err != null) {
throw t.err;
}
Entry entry = super.get(uid);
try {
// 要删除的资源对这个事务不可见,无法删除
if(!Visibility.isVisible(tm, t, entry)) {
return false;
}
Lock l = null;
try {
// 可见,就让xid去获取uid
l = lt.add(xid, uid);
} catch(Exception e) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// 获取非空,是一把锁
if(l != null) {
// ⚠️由于l是一把可重入锁,当获取到一把已上锁的锁时,再在这里上锁也能上锁成功,不会发生阻塞。。。
// 而我们这段代码的思路就是要让程序阻塞在这里。。。
l.lock();
l.unlock();
}
// 本事务已经删除过了(同一个事务,前面删除过了,这次再删当然删不了)
if(entry.getXmax() == xid) {
return false;
}
// 发生版本跳跃,抛出异常
if(Visibility.isVersionSkip(tm, t, entry)) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// 设置XMAX为本事务的xid,表明事务xid已删除这条数据
entry.setXmax(xid);
return true;
} finally {
entry.release();
}
}

实际上主要是前置的三件事:一是可见性判断,二是获取资源的锁,三是版本跳跃判断。删除的操作只有一个设置 XMAX

5. Index Manager

IM,即 Index Manager,索引管理器,为 MYDB 提供了基于 B+ 树的聚簇索引。目前 MYDB 只支持基于索引查找数据,不支持全表扫描。

二叉树索引

二叉树由一个个 Node 组成,每个 Node 都存储在一条 DataItem 中。结构如下:

1
2
[LeafFlag][KeyNumber][SiblingUid]
[Son0][Key0][Son1][Key1]...[SonN][KeyN]

其中 LeafFlag 标记了该节点是否是个叶子节点;KeyNumber 为该节点中 key 的个数;SiblingUid 是其兄弟节点存储在 DM 中的 UID。后续是穿插的子节点(SonN)和 KeyN。最后的一个 KeyN 始终为 MAX_VALUE,以此方便查找。

IMG_3885

Node 类持有了其 B+ 树结构的引用,DataItem 的引用和 SubArray 的引用,用于方便快速修改数据和释放数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Node {

// LeafFlag起始位置:0
static final int IS_LEAF_OFFSET = 0;
// KeyNumber起始位置:1
static final int NO_KEYS_OFFSET = IS_LEAF_OFFSET+1;
// SiblingUid起始位置:3
static final int SIBLING_OFFSET = NO_KEYS_OFFSET+2;
// Node头文件大小:[LeafFlag][KeyNumber][SiblingUid]
static final int NODE_HEADER_SIZE = SIBLING_OFFSET+8;

static final int BALANCE_NUMBER = 32;
static final int NODE_SIZE = NODE_HEADER_SIZE + (2*8)*(BALANCE_NUMBER*2+2);

BPlusTree tree;
DataItem dataItem;
SubArray raw;
long uid;
...
}

于是生成一个根节点的数据可以写成如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 该根节点的初始两个子节点为 left 和 right, 初始键值为 key。
static byte[] newRootRaw(long left, long right, long key) {
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);
// 根结点为非叶节点,LeafFlag = 0
setRawIsLeaf(raw, false);
// 有left、right两个子节点,KeyNumber = 2
setRawNoKeys(raw, 2);
// 根结点没有兄弟节点,SiblingUid = 0
setRawSibling(raw, 0);
// left子节点为第一个子节点
setRawKthSon(raw, left, 0);
// 给left子节点设置初始键值:key
setRawKthKey(raw, key, 0);
// right子节点为第二个子节点
setRawKthSon(raw, right, 1);
// 给right子节点设置键值MAX_VALUE
setRawKthKey(raw, Long.MAX_VALUE, 1);
return raw.raw;
}

类似的,生成一个空的根节点数据:

1
2
3
4
5
6
7
static byte[] newNilRootRaw()  {
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);
setRawIsLeaf(raw, true);
setRawNoKeys(raw, 0);
setRawSibling(raw, 0);
return raw.raw;
}

Node 类有两个方法,用于辅助 B+ 树做插入和搜索操作,分别是 searchNext 方法和 leafSearchRange 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class SearchNextRes {
long uid;
long siblingUid;
}

// searchNext 寻找对应 key 的 UID, 如果找不到, 则返回兄弟节点的 UID。
public SearchNextRes searchNext(long key) {
dataItem.rLock();
try {
SearchNextRes res = new SearchNextRes();
// 根据raw数据,获取当前Node节点的key数量
int noKeys = getRawNoKeys(raw);
// 遍历所有key
for(int kth = 0; kth < noKeys; kth ++) {
// 获取第kth个key
long ik = getRawKthKey(raw, kth);
if(key < ik) {
res.uid = getRawKthSon(raw, kth);
res.siblingUid = 0;
return res;
}
}
res.uid = 0;
res.siblingUid = getRawSibling(raw);
return res;

} finally {
dataItem.rUnLock();
}
}

leafSearchRange 方法在当前节点进行范围查找,范围是 [leftKey, rightKey],这里约定如果 rightKey 大于等于该节点的最大的 key, 则还同时返回兄弟节点的 UID,方便继续搜索下一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public LeafSearchRangeRes leafSearchRange(long leftKey, long rightKey) {
dataItem.rLock();
try {
int noKeys = getRawNoKeys(raw);
int kth = 0;
while(kth < noKeys) {
long ik = getRawKthKey(raw, kth);
// 从leftKey开始找起
if(ik >= leftKey) {
break;
}
kth ++;
}
List<Long> uids = new ArrayList<>();
// 在不超出key数量的前提下
while(kth < noKeys) {
long ik = getRawKthKey(raw, kth);
// 若key在[leftKey, rightKey]范围内
if(ik <= rightKey) {
// 将符合条件的节点的uid加入
uids.add(getRawKthSon(raw, kth));
kth ++;
} else {
// 在key数量范围内找完了 [leftKey, rightKey]范围的节点
break;
}
}
long siblingUid = 0;
// 发现rightKey大于等于该节点的最大的 key
if(kth == noKeys) {
// 返回兄弟节点的uid
siblingUid = getRawSibling(raw);
}
LeafSearchRangeRes res = new LeafSearchRangeRes();
res.uids = uids;
res.siblingUid = siblingUid;
return res;
} finally {
dataItem.rUnLock();
}
}

由于 B+ 树在插入删除时,会动态调整,根节点不是固定节点,于是设置一个 bootDataItem,该 DataItem 中存储了根节点的 UID。可以注意到,IM 在操作 DM 时,使用的事务都是 SUPER_XID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class BPlusTree {
DataItem bootDataItem;

private long rootUid() {
bootLock.lock();
try {
SubArray sa = bootDataItem.data();
return Parser.parseLong(Arrays.copyOfRange(sa.raw, sa.start, sa.start+8));
} finally {
bootLock.unlock();
}
}

private void updateRootUid(long left, long right, long rightKey) throws Exception {
bootLock.lock();
try {
byte[] rootRaw = Node.newRootRaw(left, right, rightKey);
long newRootUid = dm.insert(TransactionManagerImpl.SUPER_XID, rootRaw);
bootDataItem.before();
SubArray diRaw = bootDataItem.data();
System.arraycopy(Parser.long2Byte(newRootUid), 0, diRaw.raw, diRaw.start, 8);
bootDataItem.after(TransactionManagerImpl.SUPER_XID);
} finally {
bootLock.unlock();
}
}
}

这里可能会有疑问,IM 为什么不提供删除索引的能力。当上层模块通过 VM 删除某个 Entry,实际的操作是设置其 XMAX。如果不去删除对应索引的话,当后续再次尝试读取该 Entry 时,是可以通过索引寻找到的,但是由于设置了 XMAX,寻找不到合适的版本而返回一个找不到内容的错误。

6. Table Manager

本章概述 TBM,即表管理器的实现。TBM 实现了对字段结构和表结构的管理。同时简要介绍 MYDB 使用的类 SQL 语句的解析。

SQL解析器

Parser 实现了对类 SQL 语句的结构化解析,将语句中包含的信息封装为对应语句的类,这些类可见 top.guoziyang.mydb.backend.parser.statement 包。

MYDB 实现的 SQL 语句语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 开启事务,isolation level设置事务隔离级别,默认为RC
<begin statement>
begin [isolation level (read committedrepeatable read)]
begin isolation level read committed

// 提交事务
<commit statement>
commit

// 撤销事务
<abort statement>
abort

// 建表语句
<create statement>
create table <table name>
<field name> <field type>
<field name> <field type>
...
<field name> <field type>
[(index <field name list>)]
create table students
id int32,
name string,
age int32,
(index id name)

// 删表语句
<drop statement>
drop table <table name>
drop table students

// 查询语句
<select statement>
select (*<field name list>) from <table name> [<where statement>]
select * from student where id = 1
select name from student where id > 1 and id < 4
select name, age, id from student where id = 12

// 插入语句
<insert statement>
insert into <table name> values <value list>
insert into student values 5 "Zhang Yuanjia" 22

// 删除语句
<delete statement>
delete from <table name> <where statement>
delete from student where name = "Zhang Yuanjia"

// 更新语句
<update statement>
update <table name> set <field name>=<value> [<where statement>]
update student set name = "ZYJ" where id = 5

// where查询
<where statement>
where <field name> (><=) <value> [(andor) <field name> (><=) <value>]
where age > 10 or age < 3

// 字段名命名规则
<field name> <table name>
[a-zA-Z][a-zA-Z0-9_]*

// 字段类型只有下面三种
<field type>
int32 int64 string

<value>
.*

parser 包的 Tokenizer 类,对语句进行逐字节解析,根据空白符或者上述词法规则,将语句切割成多个 token。对外提供了 peek()pop() 方法方便取出 Token 进行解析。

Parser 类则直接对外提供了 Parse(byte[] statement) 方法,核心就是一个调用 Tokenizer 类分割 Token,并根据词法规则包装成具体的 Statement 类并返回。解析过程很简单,仅仅是根据第一个 Token 来区分语句类型,并分别处理,不再赘述。

字段和表管理

由于 TBM 基于 VM,单个字段信息和表信息都是直接保存在 Entry 中。字段的二进制表示如下:

1
[FieldName][TypeName][IndexUid]

这里 FieldNameTypeName,以及后面的表名,存储的都是字节形式的字符串。这里规定一个字符串的存储方式,以明确其存储边界。

1
[StringLength][StringData]

TypeName 为字段的类型,限定为 int32、int64 和 string 类型如果这个字段有索引,那个 IndexUID 指向了索引二叉树的根,否则该字段为 0。

根据这个结构,通过一个 UID 从 VM 中读取并解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 读取字段信息
public static Field loadField(Table tb, long uid) {
byte[] raw = null;
try {
raw = ((TableManagerImpl)tb.tbm).vm.read(TransactionManagerImpl.SUPER_XID, uid);
} catch (Exception e) {
Panic.panic(e);
}
assert raw != null;
return new Field(uid, tb).parseSelf(raw);
}

// 解析字段信息
private Field parseSelf(byte[] raw) {
int position = 0;
// 第一个res,包含FieldName的[StringLength][StringData]
// 返回的res.str 是 FieldName,res.next 是 [StringLength][StringData]的长度
ParseStringRes res = Parser.parseString(raw);
fieldName = res.str;
position += res.next;

// 第二个res,包含TypeName的[StringLength][StringData]
// 返回的res.str 是 TypeName,res.next 是 [StringLength][StringData]的长度
res = Parser.parseString(Arrays.copyOfRange(raw, position, raw.length));
fieldType = res.str;
position += res.next;

// 若有index,则继续解析index
this.index = Parser.parseLong(Arrays.copyOfRange(raw, position, position+8));
if(index != 0) {
try {
bt = BPlusTree.load(index, ((TableManagerImpl)tb.tbm).dm);
} catch(Exception e) {
Panic.panic(e);
}
}
return this;
}

创建一个字段的方法类似,将相关的信息通过 VM 持久化即可:

1
2
3
4
5
6
7
8
9
10
11
12
private void persistSelf(long xid) throws Exception {
byte[] nameRaw = Parser.string2Byte(fieldName);
byte[] typeRaw = Parser.string2Byte(fieldType);
byte[] indexRaw = Parser.long2Byte(index);
this.uid = ((TableManagerImpl)tb.tbm).vm.insert(xid, Bytes.concat(nameRaw, typeRaw, indexRaw));
}

// [StringLength] + [StringData]
public static byte[] string2Byte(String str) {
byte[] l = int2Byte(str.length());
return Bytes.concat(l, str.getBytes());
}

一个数据库中存在多张表,TBM 使用链表的形式将其组织起来,每一张表都保存一个指向下一张表的 UID。表的二进制结构如下:

1
2
[TableName][NextTable]
[Field1Uid][Field2Uid]...[FieldNUid]

这里由于每个 Entry 中的数据,字节数是确定的,于是无需保存字段的个数。根据 UID 从 Entry 中读取表数据的过程和读取字段的过程类似。

对表和字段的操作,有一个很重要的步骤,就是计算 Where 条件的范围,目前 MYDB 的 Where 只支持两个条件的与和或。MYDB 只支持已索引字段作为 Where 的条件。。计算 Where 的范围,具体可以查看 TableparseWhere()calWhere() 方法,以及 Field 类的 calExp() 方法。

  • parseWhere():解析Where语句,返回一个WhereWhere.singleExp1表示第一个条件,Where.logicOp表示两个条件之间是and还是or,Where.singleExp2表示第二个条件
    • 其中条件singleExp所属类SingleExpression包含属性:字段field、条件比较符号compareOp、和字段比较的数值value
  • calWhere():处理逻辑连接词and和or
  • calExp():根据Where的条件,返回一个区间:
    • 如果compareOp<,表示字段 < value的一个区间。区间left = 0,区间right = 条件的value - 1;
    • 如果compareOp>,表示字段> value的一个区间。区间left = value,区间right = Long.MAX_VALUE
    • 如果如果compareOp=,表示字段= value的一个区间。区间left = right = value

由于 TBM 的表管理,使用的是链表串起的 Table 结构,所以就必须保存一个链表的头节点,即第一个表的 UID,这样在 MYDB 启动时,才能快速找到表信息。

MYDB 使用 Booter 类和 bt 文件,来管理 MYDB 的启动信息,虽然现在所需的启动信息,只有一个:头表的 UID。Booter 类对外提供了两个方法:load 和 update,并保证了其原子性。update 在修改 bt 文件内容时,没有直接对 bt 文件进行修改,而是首先将内容写入一个 bt_tmp 文件中,随后将这个文件重命名为 bt 文件。以期通过操作系统重命名文件的原子性,来保证操作的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void update(byte[] data) {
File tmp = new File(path + BOOTER_TMP_SUFFIX);
try {
tmp.createNewFile();
} catch (Exception e) {
Panic.panic(e);
}
if(!tmp.canRead() || !tmp.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
try(FileOutputStream out = new FileOutputStream(tmp)) {
out.write(data);
out.flush();
} catch(IOException e) {
Panic.panic(e);
}
try {
Files.move(tmp.toPath(), new File(path+BOOTER_SUFFIX).toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch(IOException e) {
Panic.panic(e);
}
file = new File(path+BOOTER_SUFFIX);
if(!file.canRead() || !file.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
}

TableManager

TBM 层对外提供服务的是 TableManager 接口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface TableManager {
BeginRes begin(Begin begin);
byte[] commit(long xid) throws Exception;
byte[] abort(long xid);

byte[] show(long xid);
byte[] create(long xid, Create create) throws Exception;

byte[] insert(long xid, Insert insert) throws Exception;
byte[] read(long xid, Select select) throws Exception;
byte[] update(long xid, Update update) throws Exception;
byte[] delete(long xid, Delete delete) throws Exception;
}

由于 TableManager 已经是直接被最外层 Server 调用(MYDB 是 C/S 结构),这些方法直接返回执行的结果,例如错误信息或者结果信息的字节数组(可读)。

7. 服务端客户端的实现及其通信规则

MYDB 被设计为 C/S 结构,类似于 MySQL。支持启动一个服务器,并有多个客户端去连接,通过 socket 通信,执行 SQL 返回结果。

C/S通信

MYDB 使用了一种特殊的二进制格式,用于客户端和服务端通信。传输的最基本结构,是 Package

1
2
3
4
public class Package {
byte[] data;
Exception err;
}

每个 Package 在发送前,由 Encoder 编码为字节数组,在对方收到后同样会由 EncoderPackage 对象。编码和解码的规则如下:

1
[Flag][data]

若 flag 为 0,表示发送的是数据,那么 data 即为这份数据本身;如果 flag 为 1,表示发送的是错误,data 是 Exception.getMessage() 的错误提示信息。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Encoder {
public byte[] encode(Package pkg) {
if(pkg.getErr() != null) {
// 发送错误信息
Exception err = pkg.getErr();
String msg = "Intern server error!";
if(err.getMessage() != null) {
msg = err.getMessage();
}
return Bytes.concat(new byte[]{1}, msg.getBytes());
} else {
// 发送数据信息
return Bytes.concat(new byte[]{0}, pkg.getData());
}
}

public Package decode(byte[] data) throws Exception {
if(data.length < 1) {
throw Error.InvalidPkgDataException;
}
if(data[0] == 0) {
// 接受数据
return new Package(Arrays.copyOfRange(data, 1, data.length), null);
} else if(data[0] == 1) {
// 接受错误信息
return new Package(null, new RuntimeException(new String(Arrays.copyOfRange(data, 1, data.length))));
} else {
throw Error.InvalidPkgDataException;
}
}
}

编码之后的信息会通过 Transporter 类,写入输出流发送出去。为了避免特殊字符造成问题,这里会将数据转成十六进制字符串(Hex String),并为信息末尾加上换行符。这样在发送和接收数据时,就可以很简单地使用 BufferedReaderWriter 来直接按行读写了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Transporter {
private Socket socket;
private BufferedReader reader;
private BufferedWriter writer;

public Transporter(Socket socket) throws IOException {
this.socket = socket;
this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
}

public void send(byte[] data) throws Exception {
String raw = hexEncode(data);
writer.write(raw);
writer.flush();
}

public byte[] receive() throws Exception {
// 按行读取
String line = reader.readLine();
if(line == null) {
close();
}
return hexDecode(line);
}

public void close() throws IOException {
writer.close();
reader.close();
socket.close();
}

// 编码后的数据转成:16进制字符串+换行符
// 换行符方便接收方收到数据后直接按行读取,一行就是一条数据
private String hexEncode(byte[] buf) {
return Hex.encodeHexString(buf, true)+"n";
}

private byte[] hexDecode(String buf) throws DecoderException {
return Hex.decodeHex(buf);
}
}

Packager 则是 EncoderTransporter 的结合体,直接对外提供 sendreceive 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Packager {
private Transporter transpoter;
private Encoder encoder;

public Packager(Transporter transpoter, Encoder encoder) {
this.transpoter = transpoter;
this.encoder = encoder;
}

public void send(Package pkg) throws Exception {
byte[] data = encoder.encode(pkg);
transpoter.send(data);
}

public Package receive() throws Exception {
byte[] data = transpoter.receive();
return encoder.decode(data);
}

public void close() throws Exception {
transpoter.close();
}
}

Server和Client的实现

ServerClient,偷懒直接使用了 Java 的 socket。

Server 启动一个 ServerSocket 监听端口,当有请求到来时直接把请求丢给一个新线程处理

HandleSocket 类实现了 Runnable 接口,在建立连接后初始化 Packager,随后就循环接收来自客户端的数据并处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class HandleSocket implements Runnable {
private Socket socket;
private TableManager tbm;

public HandleSocket(Socket socket, TableManager tbm) {
this.socket = socket;
this.tbm = tbm;
}

@Override
public void run() {
InetSocketAddress address = (InetSocketAddress)socket.getRemoteSocketAddress();
System.out.println("Establish connection: " + address.getAddress().getHostAddress()+":"+address.getPort());
Packager packager = null;
try {
Transporter t = new Transporter(socket);
Encoder e = new Encoder();
packager = new Packager(t, e);
} catch(IOException e) {
e.printStackTrace();
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
return;
}
Executor exe = new Executor(tbm);
while(true) {
Package pkg = null;
try {
// 收数据
pkg = packager.receive();
} catch(Exception e) {
break;
}
byte[] sql = pkg.getData();
byte[] result = null;
Exception e = null;
try {
result = exe.execute(sql);
} catch (Exception e1) {
e = e1;
e.printStackTrace();
}
pkg = new Package(result, e);
try {
// 发回去
packager.send(pkg);
} catch (Exception e1) {
e1.printStackTrace();
break;
}
}
exe.close();
try {
packager.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

处理的核心是 Executor 类,Executor 调用 Parser 获取到对应语句的结构化信息对象,并根据对象的类型,调用 TBM 的不同方法进行处理

top.guoziyang.mydb.backend.Launcher 类,则是服务器的启动入口。这个类解析了命令行参数。很重要的参数就是 -open 或者 -create。Launcher 根据两个参数,来决定是创建数据库文件,还是启动一个已有的数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void createDB(String path) {
TransactionManager tm = TransactionManager.create(path);
DataManager dm = DataManager.create(path, DEFALUT_MEM, tm);
VersionManager vm = new VersionManagerImpl(tm, dm);
TableManager.create(path, vm, dm);
tm.close();
dm.close();
}

private static void openDB(String path, long mem) {
TransactionManager tm = TransactionManager.open(path);
DataManager dm = DataManager.open(path, mem, tm);
VersionManager vm = new VersionManagerImpl(tm, dm);
TableManager tbm = TableManager.open(path, vm, dm);
new Server(port, tbm).start();
}

客户端有一个简单的 Shell,实际上只是读入用户的输入,并调用 Client.execute()。

1
2
3
4
5
6
7
8
public byte[] execute(byte[] stat) throws Exception {
Package pkg = new Package(stat, null);
Package resPkg = rt.roundTrip(pkg);
if(resPkg.getErr() != null) {
throw resPkg.getErr();
}
return resPkg.getData();
}

RoundTripper 类实际上实现了单次收发动作:

1
2
3
4
public Package roundTrip(Package pkg) throws Exception {
packager.send(pkg);
return packager.receive();
}

最后附上客户端的启动入口,很简单,把 Shell run 起来即可:

1
2
3
4
5
6
7
8
9
10
11
12
public class Launcher {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("127.0.0.1", 9999);
Encoder e = new Encoder();
Transporter t = new Transporter(socket);
Packager packager = new Packager(t, e);

Client client = new Client(packager);
Shell shell = new Shell(client);
shell.run();
}
}

8. 项目总结

为充分理解数据库知识,本项目参考MySQL数据库的设计原理,基于Java实现了简易的数据库MYDB,实现的功能如下:

  • 事务状态:active、committed、aborted
  • 数据库日志管理,保证数据的可靠性和数据恢复
  • 基本的缓存框架,用于缓存数据、日志等信息
  • 2PL和MVCC
  • 两种事务隔离级别(读提交和可重复读)和死锁处理
  • 基于B+树的聚簇索引,支持基于索引查找数据
  • 简单的表管理器和SQL解析器,支持SQL语句操作表
  • 基于socket的Server和Client

MYDB record
http://example.com/2025/02/21/MYDB-record/
作者
Kon4tsu
发布于
2025年2月21日
许可协议