rookie db

Rookie DB 实现
cs186 project实现
评价:这个project强度有点高,略微有些折磨。
B+ Tree
Task 1: LeafNode::fromBytes
本题要实现将叶子节点的信息从字节流中读取出来,首先阅读LeafNode::toBytes
了解叶子节点的字节流构成
- 第1个字节用于表示结点类型,如果为1,代表该结点是叶子节点。
- 之后8个字节用于表示右兄弟结点的page id,如果为-1则表示没有兄弟结点。
- 然后4个字节用于表示该结点中的记录数
- 之后的所有字节用于表示每条记录的信息。
只要按照以上顺序将信息解释出来即可。public static LeafNode fromBytes(BPlusTreeMetadata metadata, BufferManager bufferManager, LockContext treeContext, long pageNum) { // TODO(proj2): implement // Note: LeafNode has two constructors. To implement fromBytes be sure to // use the constructor that reuses an existing page instead of fetching a // brand new one. Page page = bufferManager.fetchPage(treeContext, pageNum); Buffer buf = page.getBuffer(); byte nodeType = buf.get(); assert (nodeType == (byte) 1); long rightSibling = buf.getLong(); int size = buf.getInt(); List<DataBox> keys = new ArrayList<>(); List<RecordId> rids = new ArrayList<>(); for (int i = 0; i < size; ++i) { keys.add(DataBox.fromBytes(buf, metadata.getKeySchema())); rids.add(RecordId.fromBytes(buf)); } return new LeafNode(metadata, bufferManager, page, keys, rids, Optional.ofNullable(rightSibling == -1 ? null : rightSibling), treeContext); }
Task 2: get, getLeftmostLeaf, put, remove
接下来就是实现LeafNode
, InnerNode
, and BPlusTree
的基本方法
LeafNode
首先从最简单的叶子结点入手,对与叶子结点来说,无论是get还是getLeftmostLeaf都只要返回自身
@Override
public LeafNode get(DataBox key) {
return this;
}
// See BPlusNode.getLeftmostLeaf.
@Override
public LeafNode getLeftmostLeaf() {
return this;
}
put()
的执行逻辑大体上为:首先将传入的键值对插入有序数组中的对应位置。如果插入完成后数组的大小 < 2d (d表示B+树结点大小的一半),则直接返回。否则就要进行扩容操作。
扩容:将数组对半分成两份,第一份的大小为d,第二份为d+1,第一份的数据仍然保留在当前结点。将第二份的数据存入新建的结点中,并将该新结点作为当前结点的右兄弟。最后返回右兄弟结点中的第一条记录,用于上一级结点做范围划分。
注意这里调用的构造方法应当创建一个全新的叶子结点,而非上一问中创建指定数据页的叶子结点。
@Override
public Optional<Pair<DataBox, Long>> put(DataBox key, RecordId rid) {
// TODO(proj2): implement
if (getKey(key).isPresent()) {
throw new BPlusTreeException("error");
}
int d = metadata.getOrder();
int index = InnerNode.numLessThanEqual(key, keys);
keys.add(index, key);
rids.add(index, rid);
if (keys.size() < 2*d+1) {
sync();
return Optional.empty();
}
List<DataBox> leftKeys = keys.subList(0, d);
List<RecordId> leftRids = rids.subList(0, d);
List<DataBox> rightKeys = keys.subList(d, 2*d+1);
List<RecordId> rightRids = rids.subList(d, 2*d+1);
LeafNode right = new LeafNode(metadata, bufferManager, rightKeys, rightRids, rightSibling, treeContext);
this.keys = leftKeys;
this.rids = leftRids;
rightSibling = Optional.of(right.getPage().getPageNum());
sync();
return Optional.of(new Pair<DataBox, Long>(rightKeys.get(0), right.getPage().getPageNum()));
}
remove()
执行逻辑也非常简单,只要找到记录在数组中的位置然后删除即可。
@Override
public void remove(DataBox key) {
Optional<RecordId> r = getKey(key);
if (r.isEmpty()) {
return;
}
// TODO(proj2): implement
int index = InnerNode.numLessThan(key, keys);
keys.remove(index);
rids.remove(index);
sync();
return;
}
InnerNode
接下来处理内部结点的基本方法。get()
方法需要先根据传入的key,找出该key位于哪一棵子树,然后对子树递归调用get方法直到抵达叶子节点并返回。
@Override
public LeafNode get(DataBox key) {
// TODO(proj2): implement
int index = numLessThanEqual(key, keys);
BPlusNode child = getChild(index);
return child.get(key);
}
getLeftmostLeaf()
则是获取首个子树(即最左边的子树)然后递归调用getLeftmostLeaf()
方法
@Override
public LeafNode getLeftmostLeaf() {
assert(children.size() > 0);
return getChild(0).getLeftmostLeaf();
}
put()
方法首先要获取传入的参数要插入在哪一棵子树,然后递归调用put
。如果返回值为空,则代表底层没有做扩容操作,直接返回。否则就要将返回的参数插入结点维护的数组,并扩充子结点。
这里的扩容操作与叶子结点大体上没有区别,唯一要注意的是,结点分裂时将数组拆成[0, d-1],[d+1, 2d]两部分,第d条记录作为本次分裂的返回值。re:叶子节点中存储的是真实的记录信息,不能丢弃,而上层结点的记录都是用于范围锁定,交给上层可以节省空间。
@Override
public Optional<Pair<DataBox, Long>> put(DataBox key, RecordId rid) {
// TODO(proj2): implement
int index = numLessThanEqual(key, keys);
BPlusNode child = getChild(index);
Optional<Pair<DataBox, Long>> op = child.put(key, rid);
if (op.isEmpty()) {
return Optional.empty();
}
DataBox newKey = op.get().getFirst();
Long newPage = op.get().getSecond();
int d = metadata.getOrder();
keys.add(index, newKey);
// 返回的是右边的孩子,因此要放置在索引的右侧
children.add(index + 1, newPage);
if (keys.size() < 2*d + 1) {
sync();
return Optional.empty();
}
return split(d);
}
private Optional<Pair<DataBox, Long>> split(int d) {
List<DataBox> leftKeys = keys.subList(0, d);
List<DataBox> rightKeys = keys.subList(d + 1, 2 * d + 1);
List<Long> leftChildren = children.subList(0, d + 1);
List<Long> rightChildren = children.subList(d + 1, 2 * d + 2);
DataBox retKey = keys.get(d);
InnerNode right = new InnerNode(metadata, bufferManager, rightKeys, rightChildren, treeContext);
this.keys = leftKeys;
this.children = leftChildren;
sync();
return Optional.of(new Pair<DataBox, Long>(retKey, right.getPage().getPageNum()));
}
remove()
方法的思路和上述的方法都差不多,就不多赘述。
@Override
public void remove(DataBox key) {
// TODO(proj2): implement
LeafNode leaf = get(key);
leaf.remove(key);
sync();
return;
}
BPlusTree
最后是B+树的数据结构,它的基本方法的思路也没有太大差别,依旧是递归的思想。
public Optional<RecordId> get(DataBox key) {
typecheck(key);
// TODO(proj4_integration): Update the following line
LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);
// TODO(proj2): implement
LeafNode leafNode = root.get(key);
return leafNode.getKey(key);
}
put
方法有一些微小的变化,这里先明确一个点xx.put()如果有返回值,代表的是xx出现溢出而非其子结点出现溢出,InnerNode
是对子结点调用put方法,而在BPlusTree
中是直接对root调用put方法,因此这里一旦出现返回值,代表的是根节点的溢出。
对于根节点的溢出,将返回值作为新的索引列表中的元素,将现有的root数据页作为新索引键的左结点,新的数据页作为右结点。
public void put(DataBox key, RecordId rid) {
typecheck(key);
// TODO(proj4_integration): Update the following line
LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);
// TODO(proj2): implement
// Note: You should NOT update the root variable directly.
// Use the provided updateRoot() helper method to change
// the tree's root if the old root splits.
Optional<Pair<DataBox, Long>> op = root.put(key, rid);
if (op.isEmpty()) {
return;
}
List<DataBox> keys = new ArrayList<>();
List<Long> children = new ArrayList<>();
keys.add(op.get().getFirst());
children.add(root.getPage().getPageNum());
children.add(op.get().getSecond());
updateRoot(new InnerNode(metadata, bufferManager, keys, children, lockContext));
return;
}
Task 3: Scans
这里要实现BPlusTree
中的两个方法 scanAll
,scanGreaterEqual
在此之前需要先实现迭代器的方法,先观察一下迭代器的结构
LeafNode curNode; // 当前所处的叶子节点
Iterator<RecordId> curIterator; // 该叶子结点中记录的迭代器
byte type; // 迭代器类型
DataBox cmpKey; // 用于比较的键
对于hasNext()
,如果curIterator.hasNext()
返回true,则直接返回。否则就代表当前叶子结点的记录已经被遍历完了,检查是否还有右兄弟结点,没有就返回false。有就根据迭代器的类型获取指定的迭代器,然后返回迭代器的情况。
@Override
public boolean hasNext() {
if (!curIterator.hasNext()) {
if (curNode.getRightSibling().isPresent()) {
curNode = curNode.getRightSibling().get();
if (type == 0) {
curIterator = curNode.scanAll();
} else {
curIterator = curNode.scanGreaterEqual(cmpKey);
}
} else {
return false;
}
}
return curIterator.hasNext();
}
@Override
public RecordId next() {
return curIterator.next();
}
之后的操作就很简单了,根据方法要求,new出指定的迭代器即可。
Task 4: Bulk Load
bulk load的基本思路和普通的插入大体相同,区别在于bulk load在叶子节点的填充因子超过3/4就会分裂。同时bulk load也省略了查找插入位置的步骤,只需要不断往最后的叶子结点插入。
LeafNode
对于叶子结点来说,循环插入数据,如果超出阈值,就将最后插入的数据删除并返回。
@Override
public Optional<Pair<DataBox, Long>> bulkLoad(Iterator<Pair<DataBox, RecordId>> data,
float fillFactor) {
// TODO(proj2): implement
int d = metadata.getOrder();
float limit = 2 * d * fillFactor;
while (data.hasNext()) {
Pair<DataBox, RecordId> pair = data.next();
DataBox key = pair.getFirst();
RecordId rid = pair.getSecond();
keys.add(key);
rids.add(rid);
if (keys.size() - limit <= 0.001) {
continue;
}
DataBox ret = keys.remove(keys.size() - 1);
RecordId recordId = rids.remove(rids.size() - 1);
LeafNode right = new LeafNode(metadata, bufferManager, new ArrayList<>(List.of(ret)), new ArrayList<>(List.of(recordId)), rightSibling, treeContext);
rightSibling = Optional.of(right.getPage().getPageNum());
sync();
return Optional.of(new Pair<>(ret, right.getPage().getPageNum()));
}
sync();
return Optional.empty();
}
InnerNode
获取最右侧的孩子结点,并且对它进行bulk load,如果返回值为空,则代表所有记录都插入完毕。否则就代表目前的孩子结点已经满了,将新返回的结点记录到子结点数组中,更新当前子结点,并继续进行批量导入。
当该结点存储的索引键达到上限时就进行分裂操作,具体操作与上文提到的相同。
@Override
public Optional<Pair<DataBox, Long>> bulkLoad(Iterator<Pair<DataBox, RecordId>> data,
float fillFactor) {
// TODO(proj2): implement
BPlusNode child = getChild(children.size() - 1);
Optional<Pair<DataBox, Long>> op = child.bulkLoad(data, fillFactor);
int d = metadata.getOrder();
while (op.isPresent()) {
DataBox newKey = op.get().getFirst();
Long pageNum = op.get().getSecond();
keys.add(newKey);
children.add(pageNum);
if (keys.size() >= 2 * d + 1) {
return split(d);
}
child = getChild(children.size() - 1);
op = child.bulkLoad(data, fillFactor);
}
sync();
return Optional.empty();
}
BPlusTree
对于根节点来说,批量导入和单独插入差别并不大,只是将单一操作变成了根据插入结果判断是否继续的循环操作。分裂操作也跟上文提到的相同。
public void bulkLoad(Iterator<Pair<DataBox, RecordId>> data, float fillFactor) {
// TODO(proj4_integration): Update the following line
LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);
// TODO(proj2): implement
// Note: You should NOT update the root variable directly.
// Use the provided updateRoot() helper method to change
// the tree's root if the old root splits.
Optional<Pair<DataBox, Long>> pair = root.bulkLoad(data, fillFactor);
while (pair.isPresent()) {
List<DataBox> keys = new ArrayList<>();
List<Long> children = new ArrayList<>();
keys.add(pair.get().getFirst());
children.add(root.getPage().getPageNum());
children.add(pair.get().getSecond());
updateRoot(new InnerNode(metadata, bufferManager, keys, children, lockContext));
pair = root.bulkLoad(data, fillFactor);
}
return;
}
Joins and Query Optimization
Join Algorithm
Task 1:BNLJ
本阶段要实现的方法 fetchNextRecord
, fetchNextLeftBlock
, fetchNextRightBlock
获取左表下一部分数据块
private void fetchNextLeftBlock() {
// 左表遍历完毕,退出
if (!leftSourceIterator.hasNext()) {
return;
}
int limit = numBuffers - 2;
this.leftBlockIterator = getBlockIterator(leftSourceIterator, getLeftSource().getSchema(), limit);
this.leftBlockIterator.markNext();
leftRecord = leftBlockIterator.next();
}
左表可用缓冲大小为总缓冲数 - 2,每次获取新的数据块迭代器,都要标记其中的第一个数据,用于之后的回溯。
获取右表的下一部分数据
private void fetchNextRightPage() {
if (!rightSourceIterator.hasNext()) {
return;
}
this.rightPageIterator = getBlockIterator(rightSourceIterator, getRightSource().getSchema(), 1);
this.rightPageIterator.markNext();
}
使用的缓冲大小为1,即每次只读取被驱动表的一页数据到内存进行匹配,同样地,获取到的迭代器也要进行标记。
获取下一跳满足条件的记录
private Record fetchNextRecord() {
while (true) {
if (rightPageIterator.hasNext()) {
Record rightRecord = rightPageIterator.next();
if (compare(leftRecord, rightRecord) == 0) {
return leftRecord.concat(rightRecord);
}
} else if (leftBlockIterator.hasNext()) {
leftRecord = leftBlockIterator.next();
rightPageIterator.reset();
rightPageIterator.markNext();
} else if (rightSourceIterator.hasNext()) {
fetchNextRightPage();
leftBlockIterator.reset();
leftBlockIterator.markNext();
leftRecord = leftBlockIterator.next();
} else if (leftSourceIterator.hasNext()) {
fetchNextLeftBlock();
rightSourceIterator.reset();
rightSourceIterator.markNext();
fetchNextRightPage();
} else {
return null;
}
}
}
总共有四种情况
约定: L为驱动表, R为被驱动表
case 1:right page迭代器还有值可用,表明R在内存中的内容还没有完全被探索完毕。此时继续获取迭代器中的值,并进行匹配。
case 2:R的页迭代器被耗尽,但L的块迭代器还有值,代表L当前记录已经探索完了R在内存中的记录,此时需要获取L的下一条记录,并重置R的页迭代器。
case 3:L的块迭代器耗尽,但R的源迭代器还有值,代表L放在内存中的数据已经探索完了R在内存中的记录,此时要获取新的R的数据页,并将L的块迭代器重置,开始新一轮匹配。
case 4:R的源迭代器耗尽,L的源迭代器还有值,代表L在内存中的记录已经探索完了整张R,此时获取L新的数据块,并重置R的源迭代器,开始新一轮匹配。
当L的迭代器耗尽时,匹配完毕。
错误示范:
private Record fetchNextRecord() {
while (leftRecord != null || leftSourceIterator.hasNext()) {
// 左表数据块迭代器与右表所有的页迭代器的循环
while (rightSourceIterator.hasNext() || rightPageIterator.hasNext()) {
// 针对某一块左表记录的循环
while (leftRecord != null) {
// 针对某一左表记录的循环
// 扫描右表当前页的记录,寻找匹配
while (rightPageIterator.hasNext()) {
Record rightRecord = rightPageIterator.next();
if (compare(this.leftRecord, rightRecord) == 0) {
return leftRecord.concat(rightRecord);
}
}
// 换到下一个左表记录
leftRecord = leftBlockIterator.hasNext() ? leftBlockIterator.next() : null;
// 重置右表迭代器
rightPageIterator.reset();
rightPageIterator.markNext();
}
// 重置左表迭代器
leftBlockIterator.reset();
leftBlockIterator.markNext();
leftRecord = leftBlockIterator.hasNext() ? leftBlockIterator.next() : null;
// 获取新的右表迭代器
fetchNextRightPage();
}
// 获取新的左表块迭代器
fetchNextLeftBlock();
// 重置整个右表迭代器
rightSourceIterator.reset();
rightSourceIterator.markNext();
// 重新获取右表迭代器
fetchNextRightPage();
}
return null;
}
上方的写法看上去差不多,但是如果被驱动表被读取完毕,而驱动表块迭代器没有迭代完,就会直接去fetch下一个驱动表块迭代器
其判断逻辑为:先看驱动表有没有剩余数据,再看被驱动表有无剩余数据,然后检查驱动表块有无剩余数据,最后匹配,与题目要求恰好相反。因为被驱动表必定会先被读完,因此每块只有第一个数据能完整探索被驱动表
Task 2:GHJ
本阶段要实现partition
, buildAndProbe
, run
以及思考SHJ与GHJ失效的可能情况。
分区实现
比较简单,根据记录参与join的字段以及当前分区的轮次生成对应的哈希值,在哈希对应的分区下添加记录。
private void partition(Partition[] partitions, Iterable<Record> records, boolean left, int pass) {
for (Record record : records) {
int columnIndex = left ? getLeftColumnIndex() : getRightColumnIndex();
DataBox columnValue = record.getValue(columnIndex);
int hash = HashFunc.hashDataBox(columnValue, pass);
int index = hash % partitions.length;
if (index < 0) {
index += partitions.length;
}
partitions[index].add(record);
}
return;
}
左右分区的连接实现
首先选择驱动表,只有当左右分区中有一个分区的数据页数小于缓冲区数-2(剩下两个缓冲区,一个用于输入被驱动表,一个输出结果),该方法才能进行。
然后将驱动表的数据哈希过后写入哈希表中。然后从被驱动表中读取数据,探索整张哈希表进行匹配。
private void buildAndProbe(Partition leftPartition, Partition rightPartition) {
// true if the probe records come from the left partition, false otherwise
boolean probeFirst;
// We'll build our in memory hash table with these records
Iterable<Record> buildRecords;
// We'll probe the table with these records
Iterable<Record> probeRecords;
// The index of the join column for the build records
int buildColumnIndex;
// The index of the join column for the probe records
int probeColumnIndex;
if (leftPartition.getNumPages() <= this.numBuffers - 2) {
buildRecords = leftPartition;
buildColumnIndex = getLeftColumnIndex();
probeRecords = rightPartition;
probeColumnIndex = getRightColumnIndex();
probeFirst = false;
} else if (rightPartition.getNumPages() <= this.numBuffers - 2) {
buildRecords = rightPartition;
buildColumnIndex = getRightColumnIndex();
probeRecords = leftPartition;
probeColumnIndex = getLeftColumnIndex();
probeFirst = true;
} else {
throw new IllegalArgumentException(
"Neither the left nor the right records in this partition " +
"fit in B-2 pages of memory."
);
}
Map<DataBox, List<Record>> hashTable = new HashMap<>();
for (Record buildRecord : buildRecords) {
DataBox buildJoinValue = buildRecord.getValue(buildColumnIndex);
if (!hashTable.containsKey(buildJoinValue)) {
hashTable.put(buildJoinValue, new ArrayList<>());
}
hashTable.get(buildJoinValue).add(buildRecord);
}
for (Record probeRecord : probeRecords) {
DataBox probeJoinValue = probeRecord.getValue(probeColumnIndex);
if (!hashTable.containsKey(probeJoinValue)) {
continue;
}
List<Record> matchRecords = hashTable.get(probeJoinValue);
for (Record matchRecord : matchRecords) {
Record record;
if (!probeFirst) {
record = matchRecord.concat(probeRecord);
} else {
record = probeRecord.concat(matchRecord);
}
this.joinedRecords.add(record);
}
}
}
运行实现
首先获取左右表的分区,在遍历分区时,两张表共用下标索引(分区使用的是同一个哈希函数,如果左右表分区数不一致,那可以直接判断不一致的分区必然不存在匹配,因此不必考虑)。
如果两个分区有任意一个满足探索条件,即可调用对应方法,否则递归调用run,将这两个分区进行进一步的划分然后连接。
private void run(Iterable<Record> leftRecords, Iterable<Record> rightRecords, int pass) {
assert pass >= 1;
if (pass > 5)
throw new IllegalStateException("Reached the max number of passes");
// Create empty partitions
Partition[] leftPartitions = createPartitions(true);
Partition[] rightPartitions = createPartitions(false);
// Partition records into left and right
this.partition(leftPartitions, leftRecords, true, pass);
this.partition(rightPartitions, rightRecords, false, pass);
for (int i = 0; i < leftPartitions.length; i++) {
Partition leftPartition = leftPartitions[i];
Partition rightPartition = rightPartitions[i];
int limit = this.numBuffers - 2;
if (leftPartition.getNumPages() <= limit || rightPartition.getNumPages() <= limit) {
buildAndProbe(leftPartition, rightPartition);
} else {
run(leftPartition, rightPartition, pass + 1);
}
}
}
在有SHJ参照的情况下GHJ的实现可以说相当简单。
SHJ失效,GHJ保持有效的情况
SHJ在驱动表的分区大小大于B-2时会失效,因此只要驱动表中有足够多的冗余数据,便可使其失效。
但由于GHJ要保持有效,因此pass不能过深,保持被驱动表的大小满足条件即可。
由题知,缓冲区大小为6,每个数据页存储8个数据,因此只需要往一个分区中填入(6-2)*8+1=33个数据即可
public static Pair<List<Record>, List<Record>> getBreakSHJInputs() {
ArrayList<Record> leftRecords = new ArrayList<>();
ArrayList<Record> rightRecords = new ArrayList<>();
for(int i = 0; i < 33; i++) {
leftRecords.add(createRecord(1));
}
// GHJ只要有一张表的分区数据页小于B-2就不会加深pass
// 因此这里在右表只放1个数据,确保不需要多次递归
rightRecords.add(createRecord(1));
return new Pair<>(leftRecords, rightRecords);
}
GHJ失效
GHJ的生效条件是两个分区中有一个满足条件,且pass不能过大,因此让其失效只要破坏这两个条件即可。在本题中,只需让被驱动表也不满足探索条件,且有大量冗余数据即可完成。
因为冗余数据会带来不断的无效分区,导致pass数急速上升。
public static Pair<List<Record>, List<Record>> getBreakGHJInputs() {
ArrayList<Record> leftRecords = new ArrayList<>();
ArrayList<Record> rightRecords = new ArrayList<>();
// 首先同样要让表数据页大小超过缓冲区数-2
// 但在本题中,但要让GHJ提高pass,必须让两张表都满足上述条件
// 而对于GHJ来说,如果表中均为冗余数据,则会不断造成无效分区,pass急速增加
// 因此,在两表中添加大量冗余数据即可达成目标。
for(int i = 0; i < 33; i++) {
leftRecords.add(createRecord(1));
rightRecords.add(createRecord(1));
}
return new Pair<>(leftRecords, rightRecords);
}
Task 3:External Sort
本题中要实现sortRun
, mergeSortedRuns
, mergePass
, and sort
sortRun
pass0的操作,将排序段内的数据排序,实现很简单。
public Run sortRun(Iterator<Record> records) {
List<Record> recordList = new ArrayList<>();
while (records.hasNext()) {
recordList.add(records.next());
}
recordList.sort(comparator);
return makeRun(recordList);
}
mergeSortedRuns
对一系列有序的排序段做归并操作
此处使用优先队列来提取要进入输出排序段的记录,同时在优先队列中,每条记录以<Record, Integer>的形式存储,整型数追踪该记录来源的排序段,每当一个记录被送到输出段,就从该记录来源的排序段再提取一个记录放入队列。
public Run mergeSortedRuns(List<Run> runs) {
assert (runs.size() <= this.numBuffers - 1);
// TODO(proj3_part/1): implement
PriorityQueue<Pair<Record, Integer>> pq = new PriorityQueue<>(new RecordPairComparat
List<Iterator<Record>> iterators = new ArrayList<>();
// 先获取每一个排序段的迭代器
for (Run run : runs) {
iterators.add(run.iterator());
}
int i = 0;
// 获取每个排序段中的最小元素,并将其填入优先队列
for (Iterator<Record> iterator : iterators) {
if (iterator.hasNext()) {
pq.add(new Pair<>(iterator.next(), i));
}
i++;
}
List<Record> output = new ArrayList<>();
while (!pq.isEmpty()) {
Pair<Record, Integer> pair = pq.poll();
Record record = pair.getFirst();
output.add(record);
Iterator<Record> it = iterators.get(pair.getSecond());
if (it.hasNext()) {
pq.add(new Pair<>(it.next(), pair.getSecond()));
}
}
return makeRun(output);
}
mergePass
一轮pass要做的归并操作,将排序段B-1个一组进行归并,生成一个新的有序排序段
主要工作为计算归并次数并给排序段分组,实现也很简单。
public List<Run> mergePass(List<Run> runs) {
List<Run> output = new ArrayList<>();
// 每次归并都有n-1个缓冲区用于输入,1个缓冲区用于输出
int runsNum = numBuffers - 1;
// 计算归并次数
int N = (int) Math.ceil(runs.size() / runsNum);
for (int i = 0; i < N; i++) {
// 划分出与缓冲区大小相同的run
List<Run> curRuns;
if ((i + 1) * runsNum > runs.size()) {
curRuns = runs.subList(i * runsNum, runs.size());
} else {
curRuns = runs.subList(i * runsNum, (i + 1) * runsNum);
}
// 调用归并有序排序段方法
Run out = mergeSortedRuns(curRuns);
output.add(out);
}
return output;
}
sort
外部排序的驱动方法,只需要先按缓冲区大小划分出排序段,然后为每个排序段排序(调用sortRun),再不断归并,直到只剩下一个排序段。
public Run sort() {
// Iterator over the records of the relation we want to sort
Iterator<Record> sourceIterator = getSource().iterator();
List<Iterator<Record>> initialStates = new ArrayList<>();
while (sourceIterator.hasNext()) {
// 将程序划分为若干个排序段,每一个排序段的大小与缓冲区一致
BacktrackingIterator<Record> it = getBlockIterator(sourceI
initialStates.add(it);
}
List<Run> runs = new ArrayList<>();
// 对每一个排序段进行排序
for (Iterator<Record> state : initialStates) {
Run run = sortRun(state);
runs.add(run);
}
// 不断归并,直到列表中只剩下唯一一个排序段
while (runs.size() > 1) {
runs = mergePass(runs);
}
assert runs.size() == 1;
return runs.get(0);
}
Task 4:Sort Merge Join
本题要实现该Join方法的迭代器的获取下一条记录的部分
用marked来标记当前是处于匹配阶段还是寻找阶段。
若处于匹配阶段,则不断移动左右迭代器,保证每一次移动都能确保最后左表记录<=右表。然后标记右迭代器当前元素,开始进入匹配阶段。
如果左右记录相等,则匹配成功,继续移动右表记录,若右表无记录可用,则移动左表,并重置右表,重新计入寻找阶段。
如果左右记录不等,代表匹配阶段结束,重置右表并移动左表,重新进入寻找阶段。
private Record fetchNextRecord() {
if (leftRecord == null) {
return null;
}
while (leftRecord != null) {
if (!marked) {
while (compare(leftRecord, rightRecord) < 0) {
if (leftIterator.hasNext()) {
leftRecord = leftIterator.next();
} else {
// 若左表已经用完,则代表join结束,直接终止
leftRecord = null;
return null;
}
}
while (compare(leftRecord, rightRecord) > 0) {
if (rightIterator.hasNext()) {
rightRecord = rightIterator.next();
} else {
// 若右表记录恒小于左表当前记录,则代表左表自此之后的记录无法匹配,也可终止。
leftRecord = null;
return null;
}
}
marked = true;
rightIterator.markPrev();
}
if (compare(leftRecord, rightRecord) == 0) {
Record ret = leftRecord.concat(rightRecord);
if (rightIterator.hasNext()) {
rightRecord = rightIterator.next();
} else {
// 若右表记录已经用完,则代表左表当前记录已经完成匹配,前进左表,并重置右表
if (leftIterator.hasNext()) {
marked = false;
rightIterator.reset();
rightRecord = rightIterator.next();
leftRecord = leftIterator.next();
} else {
// 左表耗尽,停止
leftRecord = null;
}
}
return ret;
} else {
// 左右表记录不匹配,重置右表后,重新进入查询阶段
marked = false;
rightIterator.reset();
rightRecord = rightIterator.next();
if (leftIterator.hasNext()) {
leftRecord = leftIterator.next();
} else {
leftRecord = null;
return null;
}
}
}
return null;
}
Query Optimization
Task 5: Single Table Access Selection (Pass 1)
读取算法选择,在顺序遍历,以及索引遍历两种方法中,选取IO开销最小的来读取指定的表。
public QueryOperator minCostSingleAccess(String table) {
QueryOperator minOp = new SequentialScanOperator(this.transaction, table);
// 1. 获取sequential scan的io开销
int cost = minOp.estimateIOCost();
// 2. 获取这张表上的索引
List<Integer> indexColumns = getEligibleIndexColumns(table);
// 在使用索引查询时,标记此刻使用的索引
int skipIndex = -1;
// 遍历所有的索引
for (Integer indexColumn : indexColumns) {
SelectPredicate pre = selectPredicates.get(indexColumn);
// 新建一个索引查找并估算它的io开销,与现有最小的io开销对比
IndexScanOperator indexScanOperator = new IndexScanOperator(transaction, table, pre.column, pre.operator, pre.value);
int ioCost = indexScanOperator.estimateIOCost();
if (ioCost < cost) {
minOp = indexScanOperator;
cost = ioCost;
skipIndex = indexColumn;
}
}
// push down所有的除索引外的选择,因为可能已经有了用户索引扫描的谓词,再下放没有意义
minOp = addEligibleSelections(minOp, skipIndex);
return minOp;
}
Task 6: Join Selection (Pass i > 1)
此阶段要进行join操作,由于使用的是左深树,因此本题中使用prevMap,以及pass1Map存储表与对应操作的映射。
prevMap<Set<String>, QueryOperator>:存储上一轮pass的join结果,字符串集合用于存储已经被拼接在一起的表。QueryOperator存储最后一次操作的操作符。
pass1Map<Set<String>, QueryOperator>:存储第一轮读取表的结果,集合中只有一个表名,操作符代表读取这张表的操作符。
解题思路:
针对prevMap中的每个表集合,如果包含某个join操作需要用到的表,且该join操作的另一张表尚未被拼接进来,便寻找本次join开销最小的算法,并加入结果集合中。
注意如果是左表还没有被join的情况,则在调用minCostJoinType时要交换左右表参数,避免违背左深树原则。(debug 4小时的血泪教训)
public Map<Set<String>, QueryOperator> minCostJoins(
Map<Set<String>, QueryOperator> prevMap,
Map<Set<String>, QueryOperator> pass1Map) {
Map<Set<String>, QueryOperator> result = new HashMap<>();
// pass1Map存储的是一张单一的表和它的遍历方式
// 遍历已经做过join操作的表,即左深树的左边结点
for (Set<String> tables : prevMap.keySet()) {
// 遍历join谓词
for (JoinPredicate joinPredicate : joinPredicates) {
String leftColumn = joinPredicate.leftColumn;
String leftTable = joinPredicate.leftTable;
String rightColumn = joinPredicate.rightColumn;
String rightTable = joinPredicate.rightTable;
QueryOperator leftOp = null;
QueryOperator rightOp = null;
HashSet<String> set = new HashSet<>(tables);
QueryOperator op = null;
// 如果右表为还没有被join的表,则将其从pass1Map中提取出来
if (tables.contains(leftTable) && !tables.contains(rightTable)) {
leftOp = prevMap.get(tables);
for (Set<String> rTables : pass1Map.keySet()) {
if (rTables.contains(rightTable)) {
rightOp = pass1Map.get(rTables);
}
}
set.add(rightTable);
op = minCostJoinType(leftOp, rightOp, leftColumn, rightColumn);
} else if (!tables.contains(leftTable) && tables.contains(rightTable)) {
rightOp = prevMap.get(tables);
for (Set<String> lTables : pass1Map.keySet()) {
if (lTables.contains(leftTable)) {
leftOp = pass1Map.get(lTables);
}
}
set.add(leftTable);
op = minCostJoinType(rightOp, leftOp, rightColumn, leftColumn);
} else {
continue;
}
// QueryOperator op = minCostJoinType(leftOp, rightOp, leftColumn, rightColumn);
// System.out.println(op.toString());
int oldCost = Integer.MAX_VALUE;
if (result.containsKey(set)) {
QueryOperator oldOp = result.get(set);
oldCost = oldOp.estimateIOCost();
}
if (op.estimateIOCost() < oldCost) {
result.put(set, op);
}
}
}
return result;
}
Task 7: Optimal Plan Selection
外部驱动方法实现
首先获取读取数据页的最快速方法,接着循环调用Join,直到结果集中只剩下一个元素。
本题dp思想的体现:每一轮Pass,都会计算出拼接i
张表需要的最小IO开销,因此在Pass i+1 时,只需要求解出进行本次join最小的开销即可。例如:在Pass 2中求出 t1,t2,t3 | t1,t2,t4的最小开销,则在Pass 3时会计算Pass 2中的两种情况,从哪一种可以用更低的开销实现四表拼接。
public Iterator<Record> execute() {
this.transaction.setAliasMap(this.aliases);
Map<Set<String>, QueryOperator> pass1Map = new HashMap<>();
for (String tableName : tableNames) {
QueryOperator op = minCostSingleAccess(tableName);
Set<String> tables = new HashSet<>();
tables.add(tableName);
pass1Map.put(tables, op);
}
Map<Set<String>, QueryOperator> prevMap = Map.copyOf(pass1Map);
while (prevMap.size() != 1) {
prevMap = minCostJoins(prevMap, pass1Map);
}
// Set the final operator to the lowest cost operator from the last
// pass, add group by, project, sort and limit operators, and return an
// iterator over the final operator.
this.finalOperator = minCostOperator(prevMap);
this.addGroupBy();
this.addProject();
this.addSort();
this.addLimit();
return finalOperator.iterator();
}
Concurrency
Queuing
Task1:LockType
首先要实现数据库对各种锁类型的支持,需要实现的方法非常简单。
首先是判断两种锁类型是否兼容。有手就行
public static boolean compatible(LockType a, LockType b) {
if (a == null || b == null) {
throw new NullPointerException("null lock type");
}
if (a == LockType.NL || b == LockType.NL) {
return true;
}
if (a == LockType.X || b == LockType.X) {
return false;
}
if ((a == LockType.SIX || b == LockType.SIX) && a != b) {
return false;
}
if ((a == LockType.S && b == LockType.IX) || (a == LockType.IX && b == LockType.S)) {
return false;
}
return true;
}
然后要实现 判断某一种锁是否能被另一把锁替代 的方法,也非常简单
public static boolean substitutable(LockType substitute, LockType required) {
if (required == null || substitute == null) {
throw new NullPointerException("null lock type");
}
// TODO(proj4_part1): implement
if (required == substitute) {
return true;
}
if (required == LockType.X) {
return false;
}
if (required == LockType.SIX && LockType.X != substitute) {
return false;
}
if (required == LockType.S && LockType.SIX != substitute
&& LockType.X != substitute) {
return false;
}
if (required == LockType.IX && (substitute == LockType.IS || substitute == LockType.NL)) {
return false;
}
if (required == LockType.IS && substitute == LockType.NL) {
return false;
}
return true;
}
最后实现 判断某一把锁是否可以作为另一把锁的父级锁
先获取对各类锁来说最基本的父级锁。然后调用上一问实现的方法,判断提供的parentLockType
是否为基本的父级锁类型,而基本类型的替代类型也可以返回true。
public static boolean canBeParentLock(LockType parentLockType, LockType childLockType) {
if (parentLockType == null || childLockType == null) {
throw new NullPointerException("null lock type");
}
return substitutable(parentLockType, parentLock(childLockType));
}
// 提供的辅助方法
public static LockType parentLock(LockType a) {
if (a == null) {
throw new NullPointerException("null lock type");
}
switch (a) {
case S: return IS;
case X: return IX;
case IS: return IS;
case IX: return IX;
case SIX: return IX;
case NL: return NL;
default: throw new UnsupportedOperationException("bad lock type");
}
}
Task 2: LockManager
本题中主要实现以下方法acquireAndRelease()
, acquire()
, release()
, promote()
LockManager
负责管理所有的锁对象,以及各种资源的分配。该级别负责排队逻辑,根据需要阻塞/解除阻塞事务,并且判断事务是否具有某个锁的来源。
在实现LockManager的主要方法之前,先实现资源管理类的有关方法。
首先是检查当前事务向该资源添加的锁是否兼容。有手就行。
public boolean checkCompatible(LockType lockType, long except) {
for (Lock lock : locks) {
if (lock.transactionNum == except) {
continue;
}
if (!LockType.compatible(lock.lockType, lockType)) {
return false;
}
}
return true;
}
接下来是向该资源上锁,或是修改锁的类型。
实现思路:遍历当前资源中的锁,如果有事务编号和插入锁一致的,就代表该事务原本就持有涉及该资源的锁(由于每个事务最多给一个资源上一把锁,因此这里我们可以用事务编号来查找当前事务是否持有该资源的锁)。如果已经持有锁,那么就修改锁的类型之后返回。否则就进行插入操作。
public void grantOrUpdateLock(Lock lock) {
// 如果这把锁对应的事务已经在该资源上有了锁,就进行更新
for (Lock existLock : locks) {
if (existLock.transactionNum.equals(lock.transactionNum)) {
existLock.lockType = lock.lockType;
return;
}
}
// 否则就是新建锁,但由于该事务可能是头回加入,所以要进行额外的处理
locks.add(lock);
if (transactionLocks.containsKey(lock.transactionNum)) {
transactionLocks.get(lock.transactionNum).add(lock);
} else {
ArrayList<Lock> tLocks = new ArrayList<>();
tLocks.add(lock);
transactionLocks.put(lock.transactionNum, tLocks);
}
}
将被阻塞的锁请求添加到阻塞队列中。需要注意的只有一点,在将请求添加到队列之后,要让涉及的事务做好阻塞准备(题目要求)。
public void addToQueue(LockRequest request, boolean addFront) {
if (addFront) {
waitingQueue.addFirst(request);
} else {
waitingQueue.addLast(request);
}
request.transaction.prepareBlock();
return;
}
接着实现阻塞队列释放的方法
实现思路:不断获取下一条请求,直至阻塞队列为空或者需要的锁仍然无法被获取。对于可以实现的请求,将需要的锁授予对应的事务,并解除事务的阻塞状态。如果该请求中还包含要释放的锁,则在此处调用对应的释放方法。
private void processQueue() {
Iterator<LockRequest> requests = waitingQueue.iterator();
while (requests.hasNext()) {
LockRequest req = requests.next();
if (checkCompatible(req.lock.lockType, req.transaction.getTransNum())) {
if (LockType.substitutable(req.lock.lockType,
getTransactionLockType(req.transaction.getTransNum()))
) {
grantOrUpdateLock(req.lock);
if (req.releasedLocks != null && !req.releasedLocks.isEmpty()) {
for (Lock releasedLock : req.releasedLocks) {
ResourceEntry resourceEntry = getResourceEntry(releasedLock.name);
resourceEntry.releaseLock(releasedLock);
}
}
}
waitingQueue.remove(req);
req.transaction.unblock();
} else {
return;
}
}
return;
}
释放锁的方法。将锁从当前资源以及指定事务的锁集合中移除,如果事务已经不再持有锁,就将该事务从锁记录中移除,避免OOM。然后释放队列中的请求。
public void releaseLock(Lock lock) {
locks.remove(lock);
transactionLocks.get(lock.transactionNum).remove(lock);
// 如果事务内已经没有锁,就释放空间防止OOM
if (transactionLocks.get(lock.transactionNum).isEmpty()) {
transactionLocks.remove(lock.transactionNum);
}
processQueue();
return;
}
获取指定资源中指定事务的锁类型。非常简单。
public LockType getTransactionLockType(long transaction) {
for (Lock lock : locks) {
if (lock.transactionNum.equals(transaction)) {
return lock.lockType;
}
}
return LockType.NL;
}
有了以上基础,后面的实现就比较简单了,4个方法的实现同质化比较严重。acquireAndRelease()
实现,用同步代码块包裹涉及锁请求的操作,避免并发请求锁带来问题。
先进行异常状态的处理,如果要获取的锁类型和当前已经持有的相同则抛出异常,避免处理无用请求。接着,如果要释放的锁根本不存在,则抛出异常,期间可以收集一会儿要释放的锁,存储到集合中。
异常处理完毕后,检查锁的兼容性,如果不兼容,则代表当前请求需要被阻塞,修改标记变量,将请求添加到队列头部(高优先级操作),该请求中应当额外包含当锁被请求到时,要释放的锁列表。如果不需要阻塞,则直接授予事务对应的锁,并释放指定的锁。
public void acquireAndRelease(TransactionContext transaction, ResourceName name,
LockType lockType, List<ResourceName> releaseNames)
throws DuplicateLockRequestException, NoLockHeldException {
boolean shouldBlock = false;
synchronized (this) {
LockType exist = getLockType(transaction, name);
ResourceEntry resourceEntry = getResourceEntry(name);
if (exist == lockType) {
throw new DuplicateLockRequestException("duplicate request");
}
List<Lock> releaseLocks = new ArrayList<>();
boolean noLock = false;
for (ResourceName releaseName : releaseNames) {
LockType type = getLockType(transaction, releaseName);
if (type == LockType.NL) {
noLock = true;
} else {
releaseLocks.add(new Lock(releaseName, type, transaction.getTransNum()));
}
}
if (noLock) {
throw new NoLockHeldException("no lock");
}
Lock newLock = new Lock(name, lockType, transaction.getTransNum());
if (!resourceEntry.checkCompatible(lockType, transaction.getTransNum())) {
shouldBlock = true;
LockRequest req = new LockRequest(transaction, newLock, releaseLocks);
resourceEntry.addToQueue(req, true);
} else {
resourceEntry.grantOrUpdateLock(newLock);
for (Lock releaseLock : releaseLocks) {
getResourceEntry(releaseLock.name).releaseLock(releaseLock);
}
}
}
if (shouldBlock) {
transaction.block();
}
}
acquire
的实现和acquireAndRelease
没有太大区别,仅仅只是不包含释放锁的列表,以及阻塞时将请求插在队尾。不过由于该方法的优先级不高,请求插入在队尾,因此除了锁不兼容,当阻塞队列中有内容时也会被阻塞。
public void acquire(TransactionContext transaction, ResourceName name,
LockType lockType) throws DuplicateLockRequestException {
boolean shouldBlock = false;
synchronized (this) {
LockType exist = getLockType(transaction, name);
ResourceEntry resourceEntry = getResourceEntry(name);
if (exist == lockType) {
throw new DuplicateLockRequestException("duplicate request");
}
Lock newLock = new Lock(name, lockType, transaction.getTransNum());
if (!resourceEntry.checkCompatible(lockType, transaction.getTransNum())
|| !resourceEntry.waitingQueue.isEmpty()) {
shouldBlock = true;
LockRequest req = new LockRequest(transaction, newLock);
resourceEntry.addToQueue(req, false);
} else {
resourceEntry.grantOrUpdateLock(newLock);
}
}
if (shouldBlock) {
transaction.block();
}
}
release
方法的实现非常简单,没什么好说的。
public void release(TransactionContext transaction, ResourceName name)
throws NoLockHeldException {
synchronized (this) {
LockType exist = getLockType(transaction, name);
if (exist == LockType.NL) {
throw new NoLockHeldException("no lock");
}
ResourceEntry resourceEntry = getResourceEntry(name);
resourceEntry.releaseLock(new Lock(name, exist, transaction.getTransNum()));
}
}
promote
方法用于更新锁的类型,属于高优先级操作,阻塞时将请求添加到队首,处理异常时要额外处理锁类型非法的情况,即当新的所类型不是原来锁类型的加强版时,抛出异常。
public void promote(TransactionContext transaction, ResourceName name,
LockType newLockType)
throws DuplicateLockRequestException, NoLockHeldException, InvalidLockException {
boolean shouldBlock = false;
synchronized (this) {
LockType exist = getLockType(transaction, name);
ResourceEntry resourceEntry = getResourceEntry(name);
if (exist == newLockType) {
throw new DuplicateLockRequestException("duplicate request");
} else if (exist == LockType.NL) {
throw new NoLockHeldException("no lock");
} else if (!LockType.substitutable(newLockType, exist)) {
throw new InvalidLockException("invalid lock type");
}
Lock newLock = new Lock(name, newLockType, transaction.getTransNum());
if (!resourceEntry.checkCompatible(newLockType, transaction.getTransNum())) {
shouldBlock = true;
LockRequest req = new LockRequest(transaction, newLock);
resourceEntry.addToQueue(req, true);
} else {
resourceEntry.grantOrUpdateLock(newLock);
}
}
if (shouldBlock) {
transaction.block();
}
}
Multigranularity
Task 3: LockContext
本题中要实现LockContext的有关方法,它在本数据库架构中负责对指定的某一资源进行操作,也就是树状结构中的结点。
接下来我们要实现acquire()
, promote()
, release()
, escalte()
, getExplicitLockType()
, getEffectiveLockType()
在实现涉及锁操作的方法前,先实现有关的获取锁类型的方法会更高效一些。
getExplicitLockType()
就是获取当前context中,对应事务的锁类型,可以直接调用之前在LockManager中实现的方法实现。
public LockType getExplicitLockType(TransactionContext transaction) {
if (transaction == null) return LockType.NL;
return lockman.getLockType(transaction, name);
}
getEffectiveLockType()
首先会获取当前上下文中对应事务的锁类型,操作和getExplicitLockType()
无异。如果当前上下文中没有锁,就开始考虑受到父节点影响的,隐式的锁类型(例如父节点获取了SIX锁,则相当于子节点都隐式地获取了S锁),需要注意的是,意向锁并不会隐式传递给子节点。
需要额外处理的就是SIX锁与意向锁。
public LockType getEffectiveLockType(TransactionContext transaction) {
if (transaction == null) return LockType.NL;
LockType lockType = LockType.NL;
LockContext ctx = this;
do {
lockType = lockman.getLockType(transaction, ctx.name);
if (lockType == LockType.SIX) {
return LockType.S;
}
if (lockType.isIntent()) {
lockType = LockType.NL;
}
ctx = ctx.parent;
} while (ctx != null && lockType == LockType.NL);
return lockType;
}
hasSIXAncestor()
会判断当前上下文的祖先结点是否持有SIX锁,实现与getEffectiveLockType()
仅在循环中的判断。
private boolean hasSIXAncestor(TransactionContext transaction) {
LockType lockType = LockType.NL;
LockContext ctx = this;
do {
lockType = lockman.getLockType(transaction, ctx.name);
if (lockType == LockType.SIX) {
return true;
}
ctx = ctx.parent;
} while (ctx != null && lockType == LockType.NL);
return false;
}
实现完锁类型获取方法后,我们再实现操作子结点锁数量的方法。由于锁的一部分操作需要子结点锁数量的记录,因此必须确保相关操作后,对应的数量都能实时更新。
因为数据库的层级结构为树状结构,所以很容易能想到要使用递归来处理。递归的终止条件为到达根节点,期间将每一层结点记录的子结点锁数量修改成指定的值。注意在添加时,事务的锁信息可能还未曾被context记录过,因此使用getOrDefault()
private void increaseChildLockNum(TransactionContext transaction, LockContext parent, int delta) {
if (parent == null) return;
Integer num = parent.numChildLocks.getOrDefault(transaction.getTransNum(), 0);
num += delta;
parent.numChildLocks.put(transaction.getTransNum(), num);
increaseChildLockNum(transaction, parent.parent, delta);
}
private void decreaseChildLockNum(TransactionContext transaction, LockContext parent, int delta) {
if (parent == null) return;
Integer num = parent.numChildLocks.getOrDefault(transaction.getTransNum(), 0);
if (num == 0) return;
num -= delta;
parent.numChildLocks.put(transaction.getTransNum(), num);
decreaseChildLockNum(transaction, parent.parent, delta);
}
有了以上辅助方法,可以开始实现一部分锁操作方法。acquire()
要处理的异常有:
- 只读资源被尝试上锁
- 重复请求同类型锁
- 祖先已持有SIX锁,仍旧尝试请求S锁或IS锁,带来冗余。
- 获取的新锁无法将已有的锁作为前置,例如,无法从IS锁获取到X锁。
前3种非常好处理,最后一种要使用之前在LockType实现的canBeParentLock()
方法来判断新锁能否将旧锁作为前置,注意要剔除没有锁的情况,无锁肯定不满足成为任意一把锁的父级锁的条件,但能作为任何一把锁的前置。初次之外,由于getEffectiveLockType()
方法会将意向锁处理为无锁,因此需要再添加一个帮助方法,它会获取离自己最近的一把锁作为影响锁。
private LockType acquireEffectiveLockType(TransactionContext transaction) {
if (transaction == null) return LockType.NL;
LockType lockType = LockType.NL;
LockContext ctx = this;
do {
lockType = lockman.getLockType(transaction, ctx.name);
ctx = ctx.parent;
} while (ctx != null && lockType == LockType.NL);
return lockType;
}
public void acquire(TransactionContext transaction, LockType lockType)
throws InvalidLockException, DuplicateLockRequestException {
if (readonly) {
throw new UnsupportedOperationException("the context is readonly");
}
LockType existLock = acquireEffectiveLockType(transaction);
System.out.println(existLock);
if ((lockType == LockType.S || lockType == LockType.IS) && hasSIXAncestor(transaction)) {
throw new InvalidLockException("invalid request");
}
if (existLock != LockType.NL && !LockType.canBeParentLock(existLock, lockType)) {
throw new InvalidLockException("invalid lock");
}
lockman.acquire(transaction, name, lockType);
increaseChildLockNum(transaction, parent, 1);
return;
}
release()
会释放指定事务在当前资源持有的锁,他要处理的异常有:
- 要释放的锁不存在
- 子结点还有锁没有被释放 在多粒度锁协议中,每次锁定都是直接锁定一条路径,因此如果要释放锁,则当前结点所能到达的子结点都不能持有锁。
- 只读异常
public void release(TransactionContext transaction)
throws NoLockHeldException, InvalidLockException {
if (readonly) {
throw new UnsupportedOperationException("readonly");
}
LockType curExist = getExplicitLockType(transaction);
if (curExist == LockType.NL) {
throw new NoLockHeldException("no lock");
}
if (numChildLocks.getOrDefault(transaction.getTransNum(), 0) != 0) {
throw new InvalidLockException("invalid");
}
lockman.release(transaction, name);
decreaseChildLockNum(transaction, parent, 1);
return;
}
promotion()
方法可以将当前事务在资源中持有的锁进行更新,需要注意的是,如果目标是SIX锁,则必须将当前结点的子结点锁持有的S锁和IS锁全部释放,因此我们先实现一个辅助方法来获取持有S锁和IS锁的所有子结点。
private List<ResourceName> sisDescendants(TransactionContext transaction) {
List<ResourceName> names = new ArrayList<>();
List<Lock> locks = lockman.getLocks(transaction);
for (Lock lock : locks) {
if (lock.lockType == LockType.S || lock.lockType == LockType.IS) {
if (lock.name.isDescendantOf(this.name)) {
names.add(lock.name);
}
}
}
return names;
}
接下来我们就可以开始实现promotion()
方法了,他要处理的异常有:
- 只读异常
- 重复请求异常
- 锁不存在异常
- 请求的锁不是原锁的强化版,用LockType的
substituable()
判断
如果目标是SIX锁,就先获取持有S锁和IS锁的所有子结点,然后将他们全部释放,注意此时decreaseChildLockNum()
传入的不是父节点而是自身,因为删除操作并非针对当前资源,而是它的子结点。public void promote(TransactionContext transaction, LockType newLockType) throws DuplicateLockRequestException, NoLockHeldException, InvalidLockException { if (readonly) { throw new UnsupportedOperationException("readonly"); } LockType existLock = getExplicitLockType(transaction); if (existLock == newLockType) { throw new DuplicateLockRequestException("duplicate"); } else if (existLock == LockType.NL) { throw new NoLockHeldException("no lock"); } if (!LockType.substitutable(newLockType, existLock)) { throw new InvalidLockException("invalid lock"); } if (newLockType == LockType.SIX) { List<ResourceName> names = sisDescendants(transaction); for (ResourceName resourceName : names) { lockman.release(transaction, resourceName); } decreaseChildLockNum(transaction, this, names.size()); } lockman.promote(transaction, name, newLockType); return; }
最后就是escalate()
方法,它会将某个事务在指定资源中所有的锁都提升到指定的粒度,从此以下的锁全部释放。因此我们需要实现一个辅助方法来获取所有持有锁的子结点。
基本逻辑和sisDescendants
类似
private List<ResourceName> allChildrenNames(TransactionContext transaction) {
List<ResourceName> names = new ArrayList<>();
List<Lock> locks = lockman.getLocks(transaction);
for (Lock lock : locks) {
if (lock.name.isDescendantOf(name))
names.add(lock.name);
}
return names;
}
escalate在本题中只会将某一粒度的锁提升为S锁或X锁,IS->S, IX/SIX->X。这里可以使用之前实现的LockManager的acquireAndRelease()
,这样就无需再一遍遍调用release()
。
public void escalate(TransactionContext transaction) throws NoLockHeldException {
if (readonly) {
throw new UnsupportedOperationException("unsupported");
}
LockType existLockType = getExplicitLockType(transaction);
if (existLockType == LockType.NL) {
throw new NoLockHeldException("no lock");
}
List<ResourceName> names = allChildrenNames(transaction);
decreaseChildLockNum(transaction, this, names.size());
if (existLockType == LockType.IS) {
lockman.acquireAndRelease(transaction, name, LockType.S, names);
} else if (existLockType == LockType.IX || existLockType == LockType.SIX) {
lockman.acquireAndRelease(transaction, name, LockType.X, names);
}
return;
}
Task 4: LockUtil
拥有了以上基础,我们就可以着手将请求锁的方法封装为一个工具类。该工具类需要具备如下功能:
- 永远只在已有锁的基础上,做最小程度的升级来满足新的要求。
- 只能升级锁,不能降级。
- 如果父级结点不满足新锁的前置条件,则为父级结点请求需要的前置锁。
因此我们可以将本题划分为两个部分:
- 确保所有的父级结点持有的锁都满足了前置条件。
- 根据现有的锁,最小幅度地提升锁的粒度来满足条件。
我们可以将碰到的情况分为如下几种:
- 现有的锁已经满足了请求锁的要求,函数不做任何处理。
- 现有的锁为IX锁,请求锁为S锁,由于本方法不能让锁的粒度降低,但又要满足新的要求,所以此时分配SIX锁。
- 当前的锁为意向锁,继续请求锁必然意味着粒度提高,不再是意向锁(特殊情况就是第2种情况),此时需要子结点放弃持有的锁,然后升级。
- 其余情况,如果没有当前结点没有锁,就请求新锁,已有锁就升级。
分析完毕之后,先开始实现本题的第一部分,确保所有的父级结点都满足条件。鉴于粒度结构属于树状结构,因此使用递归来实现。
递归的终止条件设定为遍历到达根节点,或者当前结点的条件已经满足要求,否则就请求需要的锁并继续递归访问父节点。
private static void ensureAncestorLock(LockContext parent, LockType requestType, TransactionContext transaction) {
if (parent == null || requestType == null) return;
LockType effectiveLockType = parent.getEffectiveLockType(transaction);
LockType explicitLockType = parent.getExplicitLockType(transaction);
if (LockType.substitutable(explicitLockType, requestType) || LockType.substitutable(effectiveLockType, requestType))
return;
LockContext grandContext = parent.parentContext();
LockType grandLock = LockType.parentLock(requestType);
ensureAncestorLock(grandContext, grandLock, transaction);
if (explicitLockType == LockType.NL) {
parent.acquire(transaction, requestType);
} else {
parent.promote(transaction, requestType);
}
}
接下来就可以轻松实现ensureSufficientLockHeld()
public static void ensureSufficientLockHeld(LockContext lockContext, LockType requestType) {
// requestType must be S, X, or NL
assert (requestType == LockType.S || requestType == LockType.X || requestType == LockType.NL);
// Do nothing if the transaction or lockContext is null
TransactionContext transaction = TransactionContext.getTransaction();
if (transaction == null || lockContext == null) return;
// You may find these variables useful
LockContext parentContext = lockContext.parentContext();
LockType effectiveLockType = lockContext.getEffectiveLockType(transaction);
LockType explicitLockType = lockContext.getExplicitLockType(transaction);
if (LockType.substitutable(explicitLockType, requestType)) {
return;
}
// 如果当前隐式的锁类型已经满足需要,就不做任何操作
if (LockType.substitutable(effectiveLockType, requestType)) {
return;
}
// 确保父级已经获取了前置锁
if (parentContext != null) {
LockType parentLock = LockType.parentLock(requestType);
ensureAncestorLock(parentContext, parentLock, transaction);
}
if (explicitLockType == LockType.IX && requestType == LockType.S) {
lockContext.promote(transaction, LockType.SIX);
return;
}
if (explicitLockType.isIntent()) {
lockContext.escalate(transaction);
return;
}
if (explicitLockType == LockType.NL) {
lockContext.acquire(transaction, requestType);
} else {
lockContext.promote(transaction, requestType);
}
return;
}
Task5非常简单,就不多赘述了。
Recovery
Forward Processing
Task 1:Transaction Status
本题要实现事务状态的转变,要实现的方法为commit()
, abort()
, end()
需要注意的点有:
- commit方法在返回结果前,要将对应的日志刷入磁盘来保证数据的持久性。
- end方法在面对已经abort了的事务时,要将该事务做的所有操作回滚。
commit和abort实现方法类似,都是先获取事务的最后一条日志编号,然后根据他生成新的日志,最后修改事务状态,以及事务对应的最后一条记录编号
public long commit(long transNum) {
TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);
long lastLSN = transactionTableEntry.lastLSN;
long curLSN = logManager.appendToLog(new CommitTransactionLogRecord(transNum, lastLSN));
logManager.flushToLSN(curLSN);
transactionTableEntry.transaction.setStatus(Transaction.Status.COMMITTING);
transactionTableEntry.lastLSN = curLSN;
return curLSN;
}
public long abort(long transNum) {
TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);
long lastLSN = transactionTableEntry.lastLSN;
long abortLSN = logManager.appendToLog(new AbortTransactionLogRecord(transNum, lastLSN));
transactionTableEntry.lastLSN = abortLSN;
transactionTableEntry.transaction.setStatus(Transaction.Status.ABORTING);
return abortLSN;
}
end方法相较以上两者,多了回滚的处理。首先需要找出回滚到那一条日志编号,显然对于一个被抛弃的事务来说,他所做的所有操作都要被回滚,因此可以从最后一条记录开始一步一步往回,直到没有前一条日志。在回滚结束后,添加End日志并将该事务从事务表中移除。
public long end(long transNum) {
TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);
if (transactionTableEntry.transaction.getStatus().equals(Transaction.Status.ABORTING)) {
LogRecord logRecord = logManager.fetchLogRecord(transactionTableEntry.lastLSN);
while (logRecord != null && logRecord.getPrevLSN().isPresent()) {
Long prevLSN = logRecord.getPrevLSN().get();
logRecord = logManager.fetchLogRecord(prevLSN);
}
rollbackToLSN(transNum, logRecord.getLSN());
}
long lastLSN = transactionTableEntry.lastLSN;
EndTransactionLogRecord endRecord = new EndTransactionLogRecord(transNum, lastLSN);
long l = logManager.appendToLog(endRecord);
transactionTableEntry.lastLSN = l;
transactionTableEntry.transaction.setStatus(Transaction.Status.COMPLETE);
transactionTable.remove(transNum);
return l;
}
具体的回滚操作步骤如下:
- 如果当前所处的日志编号大于目标编号,则继续,否则退出
- 如果该条日志记录的是可undo的操作,则添加一条clr日志,并执行undo操作
- 向之前的日志记录移动
private void rollbackToLSN(long transNum, long LSN) { TransactionTableEntry transactionEntry = transactionTable.get(transNum); LogRecord lastRecord = logManager.fetchLogRecord(transactionEntry.lastLSN); long lastRecordLSN = lastRecord.getLSN(); long currentLSN = lastRecord.getUndoNextLSN().orElse(lastRecordLSN); while (currentLSN > LSN) { LogRecord logRecord = logManager.fetchLogRecord(currentLSN); if (logRecord.isUndoable()) { // 获取clr日志 LogRecord clr = logRecord.undo(transactionEntry.lastLSN); long lsn = logManager.appendToLog(clr); // 更新entry信息 transactionEntry.lastLSN = lsn; // 调用redo来执行undo操作(题目要求) clr.redo(this, diskSpaceManager, bufferManager); } currentLSN = logRecord.getPrevLSN().orElse(-1L); } }
Task 2:Logging
本题需要实现日志功能,logAllocPart
, logFreePart
, logAllocPage
, logFreePage
均由题目实现,我们需要实现logPageWrite()
,用于记录数据页写入操作的日志。实现与abort类似,唯一要注意的是,如果dirtyPage Table中已经有了这张表的修改信息,那就不要在往里面put,因为该表记录的LSN属于首个记录对该表的修改操作的日志。
public long logPageWrite(long transNum, long pageNum, short pageOffset, byte[] before,
byte[] after) {
assert (before.length == after.length);
assert (before.length <= BufferManager.EFFECTIVE_PAGE_SIZE / 2);
TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);
long lastLSN = transactionTableEntry.lastLSN;
UpdatePageLogRecord updatePageLogRecord = new UpdatePageLogRecord(transNum, pageNum, lastLSN, pageOffset, before, after);
long lsn = logManager.appendToLog(updatePageLogRecord);
transactionTableEntry.lastLSN = lsn;
dirtyPageTable.putIfAbsent(pageNum, lsn);
return lsn;
}
Task 3: Savepoints
本题要实现根据存档点回滚,存档点有它对应的LSN,因此我们可以直接调用实现过的rollBackToLSN()
。
public void rollbackToSavepoint(long transNum, String name) {
TransactionTableEntry transactionEntry = transactionTable.get(transNum);
assert (transactionEntry != null);
// All of the transaction's changes strictly after the record at LSN should be undone.
long savepointLSN = transactionEntry.getSavepoint(name);
rollbackToLSN(transNum, savepointLSN);
return;
}
Task 4: Checkpoints
在之前的课程中,我们了解到,数据库会周期性的记录一些模糊检查点,来提升恢复操作的效率。本题我们就是要实现检查点的功能。
设定检查点的要求为:如果目前脏页表和事务表的大小超过某一阈值,就将这些部分交给一个endcheckpoint管理,之后开始重新记录,直至下一个检查点达到要求。
实现思路:
- 遍历脏页表,如果当前两表大小达到要求,就新建一个检查点。然后清空用于记录的临时表,重新记录。
- 遍历事务表,继续和上一步一样的操作。
- 建立最后一个检查点。
tips:EndCheckpointLogRecord.fitsInOneRecord()
是用于判断当前两表大小是否符合要求的方法。public synchronized void checkpoint() { // Create begin checkpoint log record and write to log LogRecord beginRecord = new BeginCheckpointLogRecord(); long beginLSN = logManager.appendToLog(beginRecord); Map<Long, Long> chkptDPT = new HashMap<>(); Map<Long, Pair<Transaction.Status, Long>> chkptTxnTable = new HashMap<>(); // TODO(proj5): generate end checkpoint record(s) for DPT and transaction table int pageNumber = 0; int transactionNumber = 0; for (Long pageNum : dirtyPageTable.keySet()) { Long recLSN = dirtyPageTable.get(pageNum); // 如果再加新的一页不能满足条件,就新建一个checkpoint if (!EndCheckpointLogRecord.fitsInOneRecord(pageNumber + 1, transactionNumber)) { EndCheckpointLogRecord endpoint = new EndCheckpointLogRecord(new HashMap<>(chkptDPT), new HashMap<>(chkptTxnTable)); logManager.appendToLog(endpoint); flushToLSN(endpoint.getLSN()); chkptDPT.clear(); pageNumber = 0; } pageNumber += 1; chkptDPT.put(pageNum, recLSN); } for (Long transNum : transactionTable.keySet()) { TransactionTableEntry transactionTableEntry = transactionTable.get(transNum); if (!EndCheckpointLogRecord.fitsInOneRecord(pageNumber, transactionNumber + 1)) { EndCheckpointLogRecord endpoint = new EndCheckpointLogRecord(new HashMap<>(chkptDPT), new HashMap<>(chkptTxnTable)); logManager.appendToLog(endpoint); flushToLSN(endpoint.getLSN()); chkptDPT.clear(); chkptTxnTable.clear(); pageNumber = 0; transactionNumber = 0; } Transaction transaction = transactionTableEntry.transaction; chkptTxnTable.put(transNum, new Pair<>(transaction.getStatus(), transactionTableEntry.lastLSN)); transactionNumber += 1; } // Last end checkpoint record LogRecord endRecord = new EndCheckpointLogRecord(chkptDPT, chkptTxnTable); logManager.appendToLog(endRecord); // Ensure checkpoint is fully flushed before updating the master record flushToLSN(endRecord.getLSN()); // Update master record MasterLogRecord masterRecord = new MasterLogRecord(beginLSN); logManager.rewriteMasterRecord(masterRecord); }
Restart Recovery
Task 5: Analysis
在恢复操作开始时,首先需要进行分析。来重建系统崩溃时的事务表和脏页表
- 获取日志的Master record,借此来获取begincheckpoint(在初始化时,这两个对象会被一并创建)
- 遍历之后的所有日志,接下来讲每种日志都怎么处理。
- 事务操作日志:如果对应的事务没有在事务表里,就将其添加进去,然后更新事务的lastLSN
- 数据页操作日志:
UpdatePage/UndoUpdatePage
会修改内存中的数据页,讲其添加进脏页表,但不需要刷入磁盘。FreePage/UndoAllocPage
会将数据持久化到磁盘,将对应的数据页从脏页表删除。AllocPage/UndoFreePage
不需要我们处理。 - 事务状态日志:将对应的事务状态调为
COMMITTING
,RECOVERY_ABORTING
, orCOMPLETE
。如果是end日志,首先要将事务清理掉(调用cleanup()
),然后从事务表中移除,接着将事务编号记录到endedTransactions
集合中,最后修改事务状态。 - 检查点日志:对于
BeginCheckpoint
不需要任何操作。对于EndCheckpoint
,将检查点记录的脏页表全部加入到脏页表中,即使脏页表中存在某条记录(考虑到检查点提供的信息的准确性是高于日志的)。对于事务表中的每一条事务,如果已经处于endedTransactions
,则不用管,如果有未被记录的,就进行添加。并且,只有当检查点中的事务的lastLSN大于等于目前内存中事务的lastLSN时才修改内存中的日志编号。最后,如果检查点中事务状态优先于内存中事务状态,则进行修改。 - 最后遍历一遍当前的事务表,将处于
COMMITING
状态的事务结束,处于RUNNING
状态的调整为RECOVERY_ABORTING
。要注意操作的先后顺序。
定义的用于判断事务状态优先级的方法。遵循:
- running -> committing -> complete
- running -> aborting -> complete
/** 判断状态1是否比状态2领先 */ private boolean judgeAdvance(Transaction.Status type1, Transaction.Status type2) { if (type1.equals(Transaction.Status.RUNNING)) return false; if (type1.equals(Transaction.Status.ABORTING) || type1.equals(Transaction.Status.COMMITTING)) { if (type2.equals(Transaction.Status.RUNNING)) return true; else return false; } if (type1.equals(Transaction.Status.COMPLETE)) return true; return true; }
void restartAnalysis() {
// Read master record
LogRecord record = logManager.fetchLogRecord(0L);
// Type checking
assert (record != null && record.getType() == LogType.MASTER);
MasterLogRecord masterRecord = (MasterLogRecord) record;
// Get start checkpoint LSN
long LSN = masterRecord.lastCheckpointLSN;
// Set of transactions that have completed
Set<Long> endedTransactions = new HashSet<>();
// TODO(proj5): implement
Iterator<LogRecord> iterator = logManager.scanFrom(LSN);
while (iterator.hasNext()) {
LogRecord logRecord = iterator.next();
// 如果这是一次事务操作
if (logRecord.getTransNum().isPresent()) {
Long transNum = logRecord.getTransNum().get();
// 如果是一个新事务,就将其添加到事务表里
if (transactionTable.get(transNum) == null) {
Transaction transaction = newTransaction.apply(transNum);
startTransaction(transaction);
}
// 更新事务的LSN
TransactionTableEntry tableEntry = transactionTable.get(transNum);
tableEntry.lastLSN = logRecord.getLSN();
}
// 如果是数据页操作
if (logRecord.getPageNum().isPresent()) {
LogType type = logRecord.getType();
// 对于更新涉及写入的更新操作,将该页加入脏页表中
if (type.equals(LogType.UPDATE_PAGE) || type.equals(LogType.UNDO_UPDATE_PAGE)) {
dirtyPage(logRecord.getPageNum().get(), logRecord.getLSN());
}
// 对于内存管理,先将更改写入磁盘,再从脏页表中移除
if (type.equals(LogType.FREE_PAGE) || type.equals(LogType.UNDO_ALLOC_PAGE)) {
pageFlushHook(logRecord.getLSN());
dirtyPageTable.remove(logRecord.getPageNum().get());
}
// 其余操作不需要管理
}
if (logRecord.getType().equals(LogType.COMMIT_TRANSACTION)) {
Long transNum = logRecord.getTransNum().get();
TransactionTableEntry tableEntry = transactionTable.get(transNum);
tableEntry.transaction.setStatus(Transaction.Status.COMMITTING);
}
if (logRecord.getType().equals(LogType.ABORT_TRANSACTION)) {
Long transNum = logRecord.getTransNum().get();
TransactionTableEntry tableEntry = transactionTable.get(transNum);
tableEntry.transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);
}
// 如果是结束事务的日志,则需要先清除事务,从事务表移除,添加到endedTransactions,最后修改状态
if (logRecord.getType().equals(LogType.END_TRANSACTION)) {
Long transNum = logRecord.getTransNum().get();
TransactionTableEntry tableEntry = transactionTable.get(transNum);
tableEntry.transaction.cleanup();
transactionTable.remove(transNum);
endedTransactions.add(transNum);
tableEntry.transaction.setStatus(Transaction.Status.COMPLETE);
}
if (logRecord.getType().equals(LogType.END_CHECKPOINT)) {
// 对于检查点脏页表的数据,将其全部加入
Map<Long, Long> dpt = logRecord.getDirtyPageTable();
dirtyPageTable.putAll(dpt);
Map<Long, Pair<Transaction.Status, Long>> tTable = logRecord.getTransactionTable();
// 如果检查点的事务表中有事务不在恢复事务表中,就将其添加
// 更新事务表的LSN
for (Long transNum : tTable.keySet()) {
if (endedTransactions.contains(transNum)) continue;
if (!transactionTable.containsKey(transNum)) {
startTransaction(newTransaction.apply(transNum));
}
Pair<Transaction.Status, Long> pair = tTable.get(transNum);
TransactionTableEntry tableEntry = transactionTable.get(transNum);
Long lsn = pair.getSecond();
if (lsn >= tableEntry.lastLSN) {
tableEntry.lastLSN = lsn;
}
// 如果存档点中的事务状态领先,则要进行修改
if (judgeAdvance(pair.getFirst(), tableEntry.transaction.getStatus())) {
// 如果是aborting则需要修改为recovery_abort
if (pair.getFirst().equals(Transaction.Status.ABORTING)) {
tableEntry.transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);
} else {
tableEntry.transaction.setStatus(pair.getFirst());
}
}
}
}
}
for (Long transNum : transactionTable.keySet()) {
TransactionTableEntry tableEntry = transactionTable.get(transNum);
Transaction transaction = tableEntry.transaction;
// 这里cleanup必须在end前调用,因为end会修改状态,导致cleanup报错
if (transaction.getStatus().equals(Transaction.Status.COMMITTING)) {
transaction.cleanup();
end(transNum);
}
// 这里要注意以下顺序,abort()会将事务修改为abort状态,但我们这时候需要的是RECOVERY_ABORTING
// 因此abort方法要在前面。不过日志也理应在事务操作之前添加
if (transaction.getStatus().equals(Transaction.Status.RUNNING)) {
abort(transNum);
transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);
}
}
return;
}
Task 6: Redo
这一步进行redo操作,使得事务原本做出的修改全部生效。而进行redo的操作要满足以下条件之一:
- 是与分区有关的日志记录(AllocPart, UndoAllocPart, FreePart, UndoFreePart)。
- 分配页面的记录(AllocPage, UndoFreePage)
- 修改数据页的记录(UpdatePage, UndoUpdatePage, UndoAllocPage, FreePage),并且满足:对应的数据页处于脏页表,记录的LSN大于等于脏页表中的recLSN,数据页记录的LSN小于记录的LSN。
由于redo是从前往后进行的,并且只涉及写入操作(等于只操作脏页表),因此我们先要获取脏页表中最小的LSN。/** 从脏页表中获取recLSN最小的值 */ private long getLowestRecLSN() { return dirtyPageTable.values().stream() .sorted() .limit(1) .collect(Collectors.toList()) .get(0); }
void restartRedo() {
// TODO(proj5): implement
long lowestRecLSN = getLowestRecLSN();
Iterator<LogRecord> iterator = logManager.scanFrom(lowestRecLSN);
while (iterator.hasNext()) {
LogRecord logRecord = iterator.next();
LogType type = logRecord.getType();
if (type.equals(LogType.ALLOC_PART)
|| type.equals(LogType.FREE_PART)
|| type.equals(LogType.UNDO_FREE_PART)
|| type.equals(LogType.UNDO_ALLOC_PART)) {
logRecord.redo(this, diskSpaceManager, bufferManager);
}
if (type.equals(LogType.ALLOC_PAGE)
|| type.equals(LogType.UNDO_FREE_PAGE)) {
logRecord.redo(this, diskSpaceManager, bufferManager);
}
if (type.equals(LogType.UPDATE_PAGE)
|| type.equals(LogType.UNDO_UPDATE_PAGE)
|| type.equals(LogType.FREE_PAGE)
|| type.equals(LogType.UNDO_ALLOC_PAGE)) {
Long pageNum = logRecord.getPageNum().get();
// 指定的数据页在脏页表中,并且日志的LSN大于等于表中的recLSN
if (dirtyPageTable.containsKey(pageNum)
&& logRecord.getLSN() >= dirtyPageTable.get(pageNum)) {
Page page = bufferManager.fetchPage(new DummyLockContext(), pageNum);
try {
if (page.getPageLSN() < logRecord.getLSN()) {
logRecord.redo(this, diskSpaceManager, bufferManager);
}
} finally {
page.unpin();
}
}
}
}
return;
}
Task 7: Undo
这一部分要将被抛弃的事务做出的修改撤销。为了避免一个一个撤销事务操作带来的随机细碎的IO消耗(没撤销一个事务都要重新遍历一遍日志)。我们用优先队列存储各事务接下来的日志编号,使用一次遍历撤销所有事务。
循环撤销事务步骤如下:
- 如果该操作可以撤销,就进行撤销并且添加CLR日志。
- 如果该日志有undoNextLSN就添加改编号到队列,否则添加当前日志的prevLSN。
- 如果新的LSN编号为0,就结束该事务。
这里我们先要获取所有事务的最终日志编号,并将其与事务捆绑,放到大顶堆中。/** 获取abort状态的事务编号与lastLSN */ private List<Pair<Long, Long>> getAbortedTransactionLastLSN() { List<Pair<Long, Long>> ret = new ArrayList<>(); for (Long transNum : transactionTable.keySet()) { Transaction transaction = transactionTable.get(transNum).transaction; if (transaction.getStatus().equals(Transaction.Status.RECOVERY_ABORTING)) { ret.add(new Pair<>(transNum, transactionTable.get(transNum).lastLSN)); } } return ret; }
void restartUndo() {
// TODO(proj5): implement
PriorityQueue<Pair<Long, Long>> pq = new PriorityQueue<>((a, b) -> (int)(b.getSecond() - a.getSecond()));
pq.addAll(getAbortedTransactionLastLSN());
while (!pq.isEmpty()) {
Pair<Long, Long> pair = pq.poll();
Long transNum = pair.getFirst();
Long lastLSN = pair.getSecond();
LogRecord logRecord = logManager.fetchLogRecord(lastLSN);
TransactionTableEntry tableEntry = transactionTable.get(transNum);
if (logRecord.isUndoable()) {
LogRecord CLR = logRecord.undo(tableEntry.lastLSN);
long lsn = logManager.appendToLog(CLR);
tableEntry.lastLSN = lsn;
CLR.redo(this, diskSpaceManager, bufferManager);
}
long newLSN;
if (logRecord.getUndoNextLSN().isPresent()) {
newLSN = logRecord.getUndoNextLSN().get();
} else {
newLSN = logRecord.getPrevLSN().orElse(0L);
}
if (newLSN == 0) {
tableEntry.transaction.cleanup();
end(transNum);
continue;
}
pq.add(new Pair<>(transNum, newLSN));
}
return;
}
- 标题: rookie db
- 作者: Zephyr
- 创建于 : 2022-07-05 19:02:18
- 更新于 : 2023-01-26 12:32:42
- 链接: https://faustpromaxpx.github.io/2022/07/05/rookie-db/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。