rookie db

Zephyr Lv3

Rookie DB 实现

cs186 project实现
评价:这个project强度有点高,略微有些折磨。

B+ Tree

Task 1: LeafNode::fromBytes

本题要实现将叶子节点的信息从字节流中读取出来,首先阅读LeafNode::toBytes了解叶子节点的字节流构成

  1. 第1个字节用于表示结点类型,如果为1,代表该结点是叶子节点。
  2. 之后8个字节用于表示右兄弟结点的page id,如果为-1则表示没有兄弟结点。
  3. 然后4个字节用于表示该结点中的记录数
  4. 之后的所有字节用于表示每条记录的信息。
    只要按照以上顺序将信息解释出来即可。
    public static LeafNode fromBytes(BPlusTreeMetadata metadata, BufferManager bufferManager,
                                         LockContext treeContext, long pageNum) {
    	// TODO(proj2): implement
    	// Note: LeafNode has two constructors. To implement fromBytes be sure to
    	// use the constructor that reuses an existing page instead of fetching a
    	// brand new one.
    	Page page = bufferManager.fetchPage(treeContext, pageNum);
    	Buffer buf = page.getBuffer();
    
    	byte nodeType = buf.get();
    	assert (nodeType == (byte) 1);
    	long rightSibling = buf.getLong();
    	int size = buf.getInt();
    
    	List<DataBox> keys = new ArrayList<>();
    	List<RecordId> rids = new ArrayList<>();
    	for (int i = 0; i < size; ++i) {
    		keys.add(DataBox.fromBytes(buf, metadata.getKeySchema()));
    		rids.add(RecordId.fromBytes(buf));
    	}
    	return new LeafNode(metadata, bufferManager, page, keys, rids, Optional.ofNullable(rightSibling == -1 ? null : rightSibling), treeContext);
    }

Task 2: get, getLeftmostLeaf, put, remove

接下来就是实现LeafNode, InnerNode, and BPlusTree的基本方法

LeafNode

首先从最简单的叶子结点入手,对与叶子结点来说,无论是get还是getLeftmostLeaf都只要返回自身

 @Override
public LeafNode get(DataBox key) {
	return this;
}
// See BPlusNode.getLeftmostLeaf.
@Override
public LeafNode getLeftmostLeaf() {
	return this;
}

put()的执行逻辑大体上为:首先将传入的键值对插入有序数组中的对应位置。如果插入完成后数组的大小 < 2d (d表示B+树结点大小的一半),则直接返回。否则就要进行扩容操作。
扩容:将数组对半分成两份,第一份的大小为d,第二份为d+1,第一份的数据仍然保留在当前结点。将第二份的数据存入新建的结点中,并将该新结点作为当前结点的右兄弟。最后返回右兄弟结点中的第一条记录,用于上一级结点做范围划分。
注意这里调用的构造方法应当创建一个全新的叶子结点,而非上一问中创建指定数据页的叶子结点。

@Override
public Optional<Pair<DataBox, Long>> put(DataBox key, RecordId rid) {
	// TODO(proj2): implement

	if (getKey(key).isPresent()) {
		throw new BPlusTreeException("error");
	}
	int d = metadata.getOrder();
	int index = InnerNode.numLessThanEqual(key, keys);
	keys.add(index, key);
	rids.add(index, rid);
	if (keys.size() < 2*d+1) {
		sync();
		return Optional.empty();
	}
	List<DataBox> leftKeys = keys.subList(0, d);
	List<RecordId> leftRids = rids.subList(0, d);
	List<DataBox> rightKeys = keys.subList(d, 2*d+1);
	List<RecordId> rightRids = rids.subList(d, 2*d+1);

	LeafNode right = new LeafNode(metadata, bufferManager, rightKeys, rightRids, rightSibling, treeContext);
	this.keys = leftKeys;
	this.rids = leftRids;
	rightSibling = Optional.of(right.getPage().getPageNum());
	sync();
	return Optional.of(new Pair<DataBox, Long>(rightKeys.get(0), right.getPage().getPageNum()));
}

remove()执行逻辑也非常简单,只要找到记录在数组中的位置然后删除即可。

@Override
public void remove(DataBox key) {
	Optional<RecordId> r = getKey(key);
	if (r.isEmpty()) {
		return;
	}
	// TODO(proj2): implement
	int index = InnerNode.numLessThan(key, keys);
	keys.remove(index);
	rids.remove(index);
	sync();
	return;
}

InnerNode

接下来处理内部结点的基本方法。
get()方法需要先根据传入的key,找出该key位于哪一棵子树,然后对子树递归调用get方法直到抵达叶子节点并返回。

@Override
public LeafNode get(DataBox key) {
	// TODO(proj2): implement
	int index = numLessThanEqual(key, keys);
	BPlusNode child = getChild(index);
	return child.get(key);
}

getLeftmostLeaf()则是获取首个子树(即最左边的子树)然后递归调用getLeftmostLeaf()方法

@Override
public LeafNode getLeftmostLeaf() {
	assert(children.size() > 0);
	return getChild(0).getLeftmostLeaf();
}

put()方法首先要获取传入的参数要插入在哪一棵子树,然后递归调用put。如果返回值为空,则代表底层没有做扩容操作,直接返回。否则就要将返回的参数插入结点维护的数组,并扩充子结点。
这里的扩容操作与叶子结点大体上没有区别,唯一要注意的是,结点分裂时将数组拆成[0, d-1],[d+1, 2d]两部分,第d条记录作为本次分裂的返回值。re:叶子节点中存储的是真实的记录信息,不能丢弃,而上层结点的记录都是用于范围锁定,交给上层可以节省空间。

 @Override
public Optional<Pair<DataBox, Long>> put(DataBox key, RecordId rid) {
	// TODO(proj2): implement
	int index = numLessThanEqual(key, keys);
	BPlusNode child = getChild(index);
	Optional<Pair<DataBox, Long>> op = child.put(key, rid);
	if (op.isEmpty()) {
		return Optional.empty();
	}
	DataBox newKey = op.get().getFirst();
	Long newPage = op.get().getSecond();
	int d = metadata.getOrder();
	keys.add(index, newKey);
	// 返回的是右边的孩子,因此要放置在索引的右侧
	children.add(index + 1, newPage);

	if (keys.size() < 2*d + 1) {
		sync();
		return Optional.empty();
	}
	
	return split(d);
}

private Optional<Pair<DataBox, Long>> split(int d) {
	List<DataBox> leftKeys = keys.subList(0, d);
	List<DataBox> rightKeys = keys.subList(d + 1, 2 * d + 1);
	List<Long> leftChildren = children.subList(0, d + 1);
	List<Long> rightChildren = children.subList(d + 1, 2 * d + 2);

	DataBox retKey = keys.get(d);

	InnerNode right = new InnerNode(metadata, bufferManager, rightKeys, rightChildren, treeContext);
	this.keys = leftKeys;
	this.children = leftChildren;
	sync();
	return Optional.of(new Pair<DataBox, Long>(retKey, right.getPage().getPageNum()));
}

remove()方法的思路和上述的方法都差不多,就不多赘述。

@Override
    public void remove(DataBox key) {
        // TODO(proj2): implement
        LeafNode leaf = get(key);
        leaf.remove(key);
        sync();
        return;
    }

BPlusTree

最后是B+树的数据结构,它的基本方法的思路也没有太大差别,依旧是递归的思想。

public Optional<RecordId> get(DataBox key) {
	typecheck(key);
	// TODO(proj4_integration): Update the following line
	LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);
	// TODO(proj2): implement
	LeafNode leafNode = root.get(key);
	return leafNode.getKey(key);

}

put方法有一些微小的变化,这里先明确一个点xx.put()如果有返回值,代表的是xx出现溢出而非其子结点出现溢出,InnerNode是对子结点调用put方法,而在BPlusTree中是直接对root调用put方法,因此这里一旦出现返回值,代表的是根节点的溢出。
对于根节点的溢出,将返回值作为新的索引列表中的元素,将现有的root数据页作为新索引键的左结点,新的数据页作为右结点。

public void put(DataBox key, RecordId rid) {
	typecheck(key);
	// TODO(proj4_integration): Update the following line
	LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);

	// TODO(proj2): implement
	// Note: You should NOT update the root variable directly.
	// Use the provided updateRoot() helper method to change
	// the tree's root if the old root splits.

	Optional<Pair<DataBox, Long>> op = root.put(key, rid);
	if (op.isEmpty()) {
		return;
	}
	List<DataBox> keys = new ArrayList<>();
	List<Long> children = new ArrayList<>();
	keys.add(op.get().getFirst());
	children.add(root.getPage().getPageNum());
	children.add(op.get().getSecond());
	updateRoot(new InnerNode(metadata, bufferManager, keys, children, lockContext));
	return;
}

Task 3: Scans

这里要实现BPlusTree中的两个方法 scanAllscanGreaterEqual
在此之前需要先实现迭代器的方法,先观察一下迭代器的结构

LeafNode curNode; // 当前所处的叶子节点
Iterator<RecordId> curIterator; // 该叶子结点中记录的迭代器
byte type; // 迭代器类型
DataBox cmpKey; // 用于比较的键

对于hasNext(),如果curIterator.hasNext()返回true,则直接返回。否则就代表当前叶子结点的记录已经被遍历完了,检查是否还有右兄弟结点,没有就返回false。有就根据迭代器的类型获取指定的迭代器,然后返回迭代器的情况。

@Override
public boolean hasNext() {
	if (!curIterator.hasNext()) {
		if (curNode.getRightSibling().isPresent()) {
			curNode = curNode.getRightSibling().get();
			if (type == 0) {
				curIterator = curNode.scanAll();
			} else {
				curIterator = curNode.scanGreaterEqual(cmpKey);
			}
		} else {
			return false;
		}
	}
	return curIterator.hasNext();
}

@Override
public RecordId next() {
	return curIterator.next();
}

之后的操作就很简单了,根据方法要求,new出指定的迭代器即可。

Task 4: Bulk Load

bulk load的基本思路和普通的插入大体相同,区别在于bulk load在叶子节点的填充因子超过3/4就会分裂。同时bulk load也省略了查找插入位置的步骤,只需要不断往最后的叶子结点插入。

LeafNode

对于叶子结点来说,循环插入数据,如果超出阈值,就将最后插入的数据删除并返回。

@Override
public Optional<Pair<DataBox, Long>> bulkLoad(Iterator<Pair<DataBox, RecordId>> data,
		float fillFactor) {
	// TODO(proj2): implement
	int d = metadata.getOrder();
	float limit = 2 * d * fillFactor;
	while (data.hasNext()) {
		Pair<DataBox, RecordId> pair = data.next();
		DataBox key = pair.getFirst();
		RecordId rid = pair.getSecond();
		keys.add(key);
		rids.add(rid);
		if (keys.size() - limit <= 0.001) {
			continue;
		}
		DataBox ret = keys.remove(keys.size() - 1);
		RecordId recordId = rids.remove(rids.size() - 1);
		LeafNode right = new LeafNode(metadata, bufferManager, new ArrayList<>(List.of(ret)), new ArrayList<>(List.of(recordId)), rightSibling, treeContext);
		rightSibling = Optional.of(right.getPage().getPageNum());
		sync();
		return Optional.of(new Pair<>(ret, right.getPage().getPageNum()));
	}
	sync();
	return Optional.empty();
}

InnerNode

获取最右侧的孩子结点,并且对它进行bulk load,如果返回值为空,则代表所有记录都插入完毕。否则就代表目前的孩子结点已经满了,将新返回的结点记录到子结点数组中,更新当前子结点,并继续进行批量导入。
当该结点存储的索引键达到上限时就进行分裂操作,具体操作与上文提到的相同。

@Override
public Optional<Pair<DataBox, Long>> bulkLoad(Iterator<Pair<DataBox, RecordId>> data,
		float fillFactor) {
	// TODO(proj2): implement
	BPlusNode child = getChild(children.size() - 1);
	Optional<Pair<DataBox, Long>> op = child.bulkLoad(data, fillFactor);
	int d = metadata.getOrder();
	while (op.isPresent()) {
		DataBox newKey = op.get().getFirst();
		Long pageNum = op.get().getSecond();
		keys.add(newKey);
		children.add(pageNum);
		if (keys.size() >= 2 * d + 1) {
			return split(d);
		}
		child = getChild(children.size() - 1);
		op = child.bulkLoad(data, fillFactor);
	}
	sync();
	return Optional.empty();
}

BPlusTree

对于根节点来说,批量导入和单独插入差别并不大,只是将单一操作变成了根据插入结果判断是否继续的循环操作。分裂操作也跟上文提到的相同。

public void bulkLoad(Iterator<Pair<DataBox, RecordId>> data, float fillFactor) {
        // TODO(proj4_integration): Update the following line
        LockUtil.ensureSufficientLockHeld(lockContext, LockType.NL);

        // TODO(proj2): implement
        // Note: You should NOT update the root variable directly.
        // Use the provided updateRoot() helper method to change
        // the tree's root if the old root splits.
        Optional<Pair<DataBox, Long>> pair = root.bulkLoad(data, fillFactor);
        while (pair.isPresent()) {
            List<DataBox> keys = new ArrayList<>();
            List<Long> children = new ArrayList<>();
            keys.add(pair.get().getFirst());
            children.add(root.getPage().getPageNum());
            children.add(pair.get().getSecond());
            updateRoot(new InnerNode(metadata, bufferManager, keys, children, lockContext));
            pair = root.bulkLoad(data, fillFactor);
        }
        return;
    }

Joins and Query Optimization

Join Algorithm

Task 1:BNLJ

本阶段要实现的方法 fetchNextRecord, fetchNextLeftBlock, fetchNextRightBlock

获取左表下一部分数据块

private void fetchNextLeftBlock() {
            // 左表遍历完毕,退出
    if (!leftSourceIterator.hasNext()) {
        return;
    }
    int limit = numBuffers - 2;
    this.leftBlockIterator = getBlockIterator(leftSourceIterator, getLeftSource().getSchema(), limit);
    this.leftBlockIterator.markNext();
    leftRecord = leftBlockIterator.next();
            
}

左表可用缓冲大小为总缓冲数 - 2,每次获取新的数据块迭代器,都要标记其中的第一个数据,用于之后的回溯。

获取右表的下一部分数据

private void fetchNextRightPage() {
    if (!rightSourceIterator.hasNext()) {
        return;
    }

	this.rightPageIterator = getBlockIterator(rightSourceIterator, getRightSource().getSchema(), 1);
	this.rightPageIterator.markNext();
}

使用的缓冲大小为1,即每次只读取被驱动表的一页数据到内存进行匹配,同样地,获取到的迭代器也要进行标记。

获取下一跳满足条件的记录

private Record fetchNextRecord() {
	while (true) {
        if (rightPageIterator.hasNext()) {
            Record rightRecord = rightPageIterator.next();
            if (compare(leftRecord, rightRecord) == 0) {
                return leftRecord.concat(rightRecord);
            }
        } else if (leftBlockIterator.hasNext()) {
            leftRecord = leftBlockIterator.next();
            rightPageIterator.reset();
            rightPageIterator.markNext();
        } else if (rightSourceIterator.hasNext()) {
            fetchNextRightPage();
            leftBlockIterator.reset();
            leftBlockIterator.markNext();
            leftRecord = leftBlockIterator.next();
        } else if (leftSourceIterator.hasNext()) {
            fetchNextLeftBlock();
            rightSourceIterator.reset();
            rightSourceIterator.markNext();
            fetchNextRightPage();
        } else {
            return null;
        }
    }
}

总共有四种情况

约定: L为驱动表, R为被驱动表

case 1:right page迭代器还有值可用,表明R在内存中的内容还没有完全被探索完毕。此时继续获取迭代器中的值,并进行匹配。

case 2:R的页迭代器被耗尽,但L的块迭代器还有值,代表L当前记录已经探索完了R在内存中的记录,此时需要获取L的下一条记录,并重置R的页迭代器。

case 3:L的块迭代器耗尽,但R的源迭代器还有值,代表L放在内存中的数据已经探索完了R在内存中的记录,此时要获取新的R的数据页,并将L的块迭代器重置,开始新一轮匹配。

case 4:R的源迭代器耗尽,L的源迭代器还有值,代表L在内存中的记录已经探索完了整张R,此时获取L新的数据块,并重置R的源迭代器,开始新一轮匹配。

当L的迭代器耗尽时,匹配完毕。

错误示范:

private Record fetchNextRecord() {
    while (leftRecord != null || leftSourceIterator.hasNext()) {
        // 左表数据块迭代器与右表所有的页迭代器的循环
        while (rightSourceIterator.hasNext() || rightPageIterator.hasNext()) {
            // 针对某一块左表记录的循环
            while (leftRecord != null) {
                // 针对某一左表记录的循环
                // 扫描右表当前页的记录,寻找匹配
                while (rightPageIterator.hasNext()) {
                    Record rightRecord = rightPageIterator.next();
                    if (compare(this.leftRecord, rightRecord) == 0) {
                    	return leftRecord.concat(rightRecord);
                    }
                }
                // 换到下一个左表记录
                leftRecord = leftBlockIterator.hasNext() ? leftBlockIterator.next() : null;
                // 重置右表迭代器
                rightPageIterator.reset();
                rightPageIterator.markNext();
            }
            // 重置左表迭代器
            leftBlockIterator.reset();
            leftBlockIterator.markNext();
            leftRecord = leftBlockIterator.hasNext() ? leftBlockIterator.next() : null;
            // 获取新的右表迭代器
            fetchNextRightPage();
            }
        // 获取新的左表块迭代器
        fetchNextLeftBlock();
        // 重置整个右表迭代器
        rightSourceIterator.reset();
        rightSourceIterator.markNext();
        // 重新获取右表迭代器
        fetchNextRightPage();
    }
    return null;
}

上方的写法看上去差不多,但是如果被驱动表被读取完毕,而驱动表块迭代器没有迭代完,就会直接去fetch下一个驱动表块迭代器
其判断逻辑为:先看驱动表有没有剩余数据,再看被驱动表有无剩余数据,然后检查驱动表块有无剩余数据,最后匹配,与题目要求恰好相反。因为被驱动表必定会先被读完,因此每块只有第一个数据能完整探索被驱动表

Task 2:GHJ

本阶段要实现partition, buildAndProbe, run以及思考SHJ与GHJ失效的可能情况。

分区实现

比较简单,根据记录参与join的字段以及当前分区的轮次生成对应的哈希值,在哈希对应的分区下添加记录。

private void partition(Partition[] partitions, Iterable<Record> records, boolean left, int pass) {

    for (Record record : records) {
        int columnIndex = left ? getLeftColumnIndex() : getRightColumnIndex();
        DataBox columnValue = record.getValue(columnIndex);
        int hash = HashFunc.hashDataBox(columnValue, pass);

        int index = hash % partitions.length;
        if (index < 0) {
        	index += partitions.length;
        }
        partitions[index].add(record);
    }
    return;
}

左右分区的连接实现

首先选择驱动表,只有当左右分区中有一个分区的数据页数小于缓冲区数-2(剩下两个缓冲区,一个用于输入被驱动表,一个输出结果),该方法才能进行。

然后将驱动表的数据哈希过后写入哈希表中。然后从被驱动表中读取数据,探索整张哈希表进行匹配。

private void buildAndProbe(Partition leftPartition, Partition rightPartition) {
	// true if the probe records come from the left partition, false otherwise
	boolean probeFirst;
    // We'll build our in memory hash table with these records
    Iterable<Record> buildRecords;
    // We'll probe the table with these records
    Iterable<Record> probeRecords;
    // The index of the join column for the build records
    int buildColumnIndex;
    // The index of the join column for the probe records
    int probeColumnIndex;

	if (leftPartition.getNumPages() <= this.numBuffers - 2) {
		buildRecords = leftPartition;
        buildColumnIndex = getLeftColumnIndex();
        probeRecords = rightPartition;
        probeColumnIndex = getRightColumnIndex();
        probeFirst = false;
    } else if (rightPartition.getNumPages() <= this.numBuffers - 2) {
        buildRecords = rightPartition;
        buildColumnIndex = getRightColumnIndex();
        probeRecords = leftPartition;
        probeColumnIndex = getLeftColumnIndex();
        probeFirst = true;
    } else {
        throw new IllegalArgumentException(
        "Neither the left nor the right records in this partition " +
        "fit in B-2 pages of memory."
        );
    }

    Map<DataBox, List<Record>> hashTable = new HashMap<>();
    for (Record buildRecord : buildRecords) {
        DataBox buildJoinValue = buildRecord.getValue(buildColumnIndex);
        if (!hashTable.containsKey(buildJoinValue)) {
        	hashTable.put(buildJoinValue, new ArrayList<>());
        }
        hashTable.get(buildJoinValue).add(buildRecord);
    }

    for (Record probeRecord : probeRecords) {
        DataBox probeJoinValue = probeRecord.getValue(probeColumnIndex);
        if (!hashTable.containsKey(probeJoinValue)) {
        	continue;
        }
        List<Record> matchRecords = hashTable.get(probeJoinValue);
        for (Record matchRecord : matchRecords) {
            Record record;
            if (!probeFirst) {
                record = matchRecord.concat(probeRecord);
            } else {
                record = probeRecord.concat(matchRecord);
            }
            this.joinedRecords.add(record);
        }
    }
}

运行实现

首先获取左右表的分区,在遍历分区时,两张表共用下标索引(分区使用的是同一个哈希函数,如果左右表分区数不一致,那可以直接判断不一致的分区必然不存在匹配,因此不必考虑)。

如果两个分区有任意一个满足探索条件,即可调用对应方法,否则递归调用run,将这两个分区进行进一步的划分然后连接。

private void run(Iterable<Record> leftRecords, Iterable<Record> rightRecords, int pass) {
    assert pass >= 1;
    if (pass > 5) 
        throw new IllegalStateException("Reached the max number of passes");

    // Create empty partitions
    Partition[] leftPartitions = createPartitions(true);
    Partition[] rightPartitions = createPartitions(false);

    // Partition records into left and right
    this.partition(leftPartitions, leftRecords, true, pass);
    this.partition(rightPartitions, rightRecords, false, pass);

    for (int i = 0; i < leftPartitions.length; i++) {
        Partition leftPartition = leftPartitions[i];
        Partition rightPartition = rightPartitions[i];
        int limit = this.numBuffers - 2;
        if (leftPartition.getNumPages() <= limit || rightPartition.getNumPages() <= limit) {
        	buildAndProbe(leftPartition, rightPartition);
        } else {
        	run(leftPartition, rightPartition, pass + 1);
        }
    }
}

在有SHJ参照的情况下GHJ的实现可以说相当简单。

SHJ失效,GHJ保持有效的情况

SHJ在驱动表的分区大小大于B-2时会失效,因此只要驱动表中有足够多的冗余数据,便可使其失效。

但由于GHJ要保持有效,因此pass不能过深,保持被驱动表的大小满足条件即可。

由题知,缓冲区大小为6,每个数据页存储8个数据,因此只需要往一个分区中填入(6-2)*8+1=33个数据即可

public static Pair<List<Record>, List<Record>> getBreakSHJInputs() {
    ArrayList<Record> leftRecords = new ArrayList<>();
    ArrayList<Record> rightRecords = new ArrayList<>();
    
    for(int i = 0; i < 33; i++) {
        leftRecords.add(createRecord(1));
    }
    // GHJ只要有一张表的分区数据页小于B-2就不会加深pass
    // 因此这里在右表只放1个数据,确保不需要多次递归
    rightRecords.add(createRecord(1));
    return new Pair<>(leftRecords, rightRecords);
}

GHJ失效

GHJ的生效条件是两个分区中有一个满足条件,且pass不能过大,因此让其失效只要破坏这两个条件即可。在本题中,只需让被驱动表也不满足探索条件,且有大量冗余数据即可完成。

因为冗余数据会带来不断的无效分区,导致pass数急速上升。

public static Pair<List<Record>, List<Record>> getBreakGHJInputs() {
    ArrayList<Record> leftRecords = new ArrayList<>();
    ArrayList<Record> rightRecords = new ArrayList<>();
    // 首先同样要让表数据页大小超过缓冲区数-2
    // 但在本题中,但要让GHJ提高pass,必须让两张表都满足上述条件
    // 而对于GHJ来说,如果表中均为冗余数据,则会不断造成无效分区,pass急速增加
    // 因此,在两表中添加大量冗余数据即可达成目标。
    for(int i = 0; i < 33; i++) {
        leftRecords.add(createRecord(1));
        rightRecords.add(createRecord(1));
    }
    return new Pair<>(leftRecords, rightRecords);
}

Task 3:External Sort

本题中要实现sortRun, mergeSortedRuns, mergePass, and sort

sortRun

pass0的操作,将排序段内的数据排序,实现很简单。

public Run sortRun(Iterator<Record> records) {
    List<Record> recordList = new ArrayList<>();
    while (records.hasNext()) {
        recordList.add(records.next());
    }
    recordList.sort(comparator);
    return makeRun(recordList);
}

mergeSortedRuns

对一系列有序的排序段做归并操作

此处使用优先队列来提取要进入输出排序段的记录,同时在优先队列中,每条记录以<Record, Integer>的形式存储,整型数追踪该记录来源的排序段,每当一个记录被送到输出段,就从该记录来源的排序段再提取一个记录放入队列。

public Run mergeSortedRuns(List<Run> runs) {
    assert (runs.size() <= this.numBuffers - 1);
    // TODO(proj3_part/1): implement
    PriorityQueue<Pair<Record, Integer>> pq = new PriorityQueue<>(new RecordPairComparat
    List<Iterator<Record>> iterators = new ArrayList<>();
    // 先获取每一个排序段的迭代器
    for (Run run : runs) {
        iterators.add(run.iterator());
    }
    int i = 0;
    // 获取每个排序段中的最小元素,并将其填入优先队列
    for (Iterator<Record> iterator : iterators) {
        if (iterator.hasNext()) {
            pq.add(new Pair<>(iterator.next(), i));
        }
        i++;
    }
    List<Record> output = new ArrayList<>();
    while (!pq.isEmpty()) {
        Pair<Record, Integer> pair = pq.poll();
        Record record = pair.getFirst();
        output.add(record);
        Iterator<Record> it = iterators.get(pair.getSecond());
        if (it.hasNext()) {
            pq.add(new Pair<>(it.next(), pair.getSecond()));
        }
    }
    return makeRun(output);
}

mergePass

一轮pass要做的归并操作,将排序段B-1个一组进行归并,生成一个新的有序排序段

主要工作为计算归并次数并给排序段分组,实现也很简单。

public List<Run> mergePass(List<Run> runs) {
    List<Run> output = new ArrayList<>();
    // 每次归并都有n-1个缓冲区用于输入,1个缓冲区用于输出
    int runsNum = numBuffers - 1;
    // 计算归并次数
    int N = (int) Math.ceil(runs.size() / runsNum);
    for (int i = 0; i < N; i++) {
        // 划分出与缓冲区大小相同的run
        List<Run> curRuns;
        if ((i + 1) * runsNum > runs.size()) {
            curRuns = runs.subList(i * runsNum, runs.size());
        } else {
            curRuns = runs.subList(i * runsNum, (i + 1) * runsNum);
        }
        // 调用归并有序排序段方法
        Run out = mergeSortedRuns(curRuns);
        output.add(out);
    }
    return output;
}

sort

外部排序的驱动方法,只需要先按缓冲区大小划分出排序段,然后为每个排序段排序(调用sortRun),再不断归并,直到只剩下一个排序段。

public Run sort() {
    // Iterator over the records of the relation we want to sort
    Iterator<Record> sourceIterator = getSource().iterator();
    List<Iterator<Record>> initialStates = new ArrayList<>();
    while (sourceIterator.hasNext()) {
        // 将程序划分为若干个排序段,每一个排序段的大小与缓冲区一致
        BacktrackingIterator<Record> it = getBlockIterator(sourceI
        initialStates.add(it);
    }
    List<Run> runs = new ArrayList<>();
    // 对每一个排序段进行排序
    for (Iterator<Record> state : initialStates) {
        Run run = sortRun(state);
        runs.add(run);
    }
    // 不断归并,直到列表中只剩下唯一一个排序段
    while (runs.size() > 1) {
        runs = mergePass(runs);
    }
    assert runs.size() == 1;
    return runs.get(0); 
}

Task 4:Sort Merge Join

本题要实现该Join方法的迭代器的获取下一条记录的部分

用marked来标记当前是处于匹配阶段还是寻找阶段。

若处于匹配阶段,则不断移动左右迭代器,保证每一次移动都能确保最后左表记录<=右表。然后标记右迭代器当前元素,开始进入匹配阶段。

如果左右记录相等,则匹配成功,继续移动右表记录,若右表无记录可用,则移动左表,并重置右表,重新计入寻找阶段。

如果左右记录不等,代表匹配阶段结束,重置右表并移动左表,重新进入寻找阶段。

private Record fetchNextRecord() {
	if (leftRecord == null) {
    	return null;
	}
	while (leftRecord != null) {
        if (!marked) {
            while (compare(leftRecord, rightRecord) < 0) {
                if (leftIterator.hasNext()) {
                    leftRecord = leftIterator.next();
                } else {
                    // 若左表已经用完,则代表join结束,直接终止
                    leftRecord = null;
                    return null;
                }
            }
            while (compare(leftRecord, rightRecord) > 0) {
                if (rightIterator.hasNext()) {
                    rightRecord = rightIterator.next();
                } else {
                    // 若右表记录恒小于左表当前记录,则代表左表自此之后的记录无法匹配,也可终止。
                    leftRecord = null;
                    return null;
                }
            }
            marked = true;
            rightIterator.markPrev();
    	}
        if (compare(leftRecord, rightRecord) == 0) {
            Record ret = leftRecord.concat(rightRecord);
            if (rightIterator.hasNext()) {
                rightRecord = rightIterator.next();
            } else {
                // 若右表记录已经用完,则代表左表当前记录已经完成匹配,前进左表,并重置右表
                if (leftIterator.hasNext()) {
                    marked = false;
                    rightIterator.reset();
                    rightRecord = rightIterator.next();
                    leftRecord = leftIterator.next();
                } else {
                    // 左表耗尽,停止
                    leftRecord = null;
                }
            }
            return ret;
        } else {
            // 左右表记录不匹配,重置右表后,重新进入查询阶段
            marked = false;
            rightIterator.reset();
            rightRecord = rightIterator.next();
            if (leftIterator.hasNext()) {
                leftRecord = leftIterator.next();
            } else {
                leftRecord = null;
                return null;
            }
        }
    }
    return null;
}

Query Optimization

Task 5: Single Table Access Selection (Pass 1)

读取算法选择,在顺序遍历,以及索引遍历两种方法中,选取IO开销最小的来读取指定的表。

public QueryOperator minCostSingleAccess(String table) {
    QueryOperator minOp = new SequentialScanOperator(this.transaction, table);

    // 1. 获取sequential scan的io开销
    int cost = minOp.estimateIOCost();
    // 2. 获取这张表上的索引
    List<Integer> indexColumns = getEligibleIndexColumns(table);
    // 在使用索引查询时,标记此刻使用的索引
    int skipIndex = -1;
    // 遍历所有的索引
    for (Integer indexColumn : indexColumns) {
        SelectPredicate pre = selectPredicates.get(indexColumn);
        // 新建一个索引查找并估算它的io开销,与现有最小的io开销对比
        IndexScanOperator indexScanOperator = new IndexScanOperator(transaction, table, pre.column, pre.operator, pre.value);
        int ioCost = indexScanOperator.estimateIOCost();
        if (ioCost < cost) {
            minOp = indexScanOperator;
            cost = ioCost;
            skipIndex = indexColumn;
        }
    }
    // push down所有的除索引外的选择,因为可能已经有了用户索引扫描的谓词,再下放没有意义
    minOp = addEligibleSelections(minOp, skipIndex);
    return minOp;
}

Task 6: Join Selection (Pass i > 1)

此阶段要进行join操作,由于使用的是左深树,因此本题中使用prevMap,以及pass1Map存储表与对应操作的映射。

prevMap<Set<String>, QueryOperator>:存储上一轮pass的join结果,字符串集合用于存储已经被拼接在一起的表。QueryOperator存储最后一次操作的操作符。

pass1Map<Set<String>, QueryOperator>:存储第一轮读取表的结果,集合中只有一个表名,操作符代表读取这张表的操作符。

解题思路:

针对prevMap中的每个表集合,如果包含某个join操作需要用到的表,且该join操作的另一张表尚未被拼接进来,便寻找本次join开销最小的算法,并加入结果集合中。

注意如果是左表还没有被join的情况,则在调用minCostJoinType时要交换左右表参数,避免违背左深树原则。(debug 4小时的血泪教训)

public Map<Set<String>, QueryOperator> minCostJoins(
    Map<Set<String>, QueryOperator> prevMap,
    Map<Set<String>, QueryOperator> pass1Map) {
    Map<Set<String>, QueryOperator> result = new HashMap<>();

    // pass1Map存储的是一张单一的表和它的遍历方式
    // 遍历已经做过join操作的表,即左深树的左边结点
    for (Set<String> tables : prevMap.keySet()) {
        // 遍历join谓词
        for (JoinPredicate joinPredicate : joinPredicates) {
            String leftColumn = joinPredicate.leftColumn;
            String leftTable = joinPredicate.leftTable;
            String rightColumn = joinPredicate.rightColumn;
            String rightTable = joinPredicate.rightTable;
            QueryOperator leftOp = null;
            QueryOperator rightOp = null;
            HashSet<String> set = new HashSet<>(tables);
            QueryOperator op = null;

            // 如果右表为还没有被join的表,则将其从pass1Map中提取出来
            if (tables.contains(leftTable) && !tables.contains(rightTable)) {
                leftOp = prevMap.get(tables);
                for (Set<String> rTables : pass1Map.keySet()) {
                    if (rTables.contains(rightTable)) {
                        rightOp = pass1Map.get(rTables);
                    }
                }
                set.add(rightTable);
                op = minCostJoinType(leftOp, rightOp, leftColumn, rightColumn);
            } else if (!tables.contains(leftTable) && tables.contains(rightTable)) {
                rightOp = prevMap.get(tables);
                for (Set<String> lTables : pass1Map.keySet()) {
                    if (lTables.contains(leftTable)) {
                        leftOp = pass1Map.get(lTables);
                    }
                }
                set.add(leftTable);
                op = minCostJoinType(rightOp, leftOp, rightColumn, leftColumn);
            } else {
                continue;
            }
            //                QueryOperator op = minCostJoinType(leftOp, rightOp, leftColumn, rightColumn);
            //                System.out.println(op.toString());
            int oldCost = Integer.MAX_VALUE;
            if (result.containsKey(set)) {
                QueryOperator oldOp = result.get(set);
                oldCost = oldOp.estimateIOCost();
            }
            if (op.estimateIOCost() < oldCost) {
                result.put(set, op);
            }
        }
    }
    return result;
}

Task 7: Optimal Plan Selection

外部驱动方法实现

首先获取读取数据页的最快速方法,接着循环调用Join,直到结果集中只剩下一个元素。

本题dp思想的体现:每一轮Pass,都会计算出拼接i张表需要的最小IO开销,因此在Pass i+1 时,只需要求解出进行本次join最小的开销即可。例如:在Pass 2中求出 t1,t2,t3 | t1,t2,t4的最小开销,则在Pass 3时会计算Pass 2中的两种情况,从哪一种可以用更低的开销实现四表拼接。

public Iterator<Record> execute() {
    this.transaction.setAliasMap(this.aliases);
    
    Map<Set<String>, QueryOperator> pass1Map = new HashMap<>();
    for (String tableName : tableNames) {
        QueryOperator op = minCostSingleAccess(tableName);
        Set<String> tables = new HashSet<>();
        tables.add(tableName);
        pass1Map.put(tables, op);
    }
    
    Map<Set<String>, QueryOperator> prevMap = Map.copyOf(pass1Map);
    while (prevMap.size() != 1) {
        prevMap = minCostJoins(prevMap, pass1Map);
    }

    // Set the final operator to the lowest cost operator from the last
    // pass, add group by, project, sort and limit operators, and return an
    // iterator over the final operator.
    this.finalOperator = minCostOperator(prevMap);

    this.addGroupBy();
    this.addProject();
    this.addSort();
    this.addLimit();
    return finalOperator.iterator();
}

Concurrency

Queuing

Task1:LockType

首先要实现数据库对各种锁类型的支持,需要实现的方法非常简单。
首先是判断两种锁类型是否兼容。有手就行

public static boolean compatible(LockType a, LockType b) {
	if (a == null || b == null) {
		throw new NullPointerException("null lock type");
	}
	if (a == LockType.NL || b == LockType.NL) {
		return true;
	}
	if (a == LockType.X || b == LockType.X) {
		return false;
	}
	if ((a == LockType.SIX || b == LockType.SIX) && a != b) {
		return false;
	}
	if ((a == LockType.S && b == LockType.IX) || (a == LockType.IX && b == LockType.S)) {
		return false;
	}
	return true;
}

然后要实现 判断某一种锁是否能被另一把锁替代 的方法,也非常简单

public static boolean substitutable(LockType substitute, LockType required) {
	if (required == null || substitute == null) {
		throw new NullPointerException("null lock type");
	}
	// TODO(proj4_part1): implement
	if (required == substitute) {
		return true;
	}
	if (required == LockType.X) {
		return false;
	}
	if (required == LockType.SIX && LockType.X != substitute) {
		return false;
	}
	if (required == LockType.S && LockType.SIX != substitute 
	&& LockType.X != substitute) {
		return false;
	}
	if (required == LockType.IX && (substitute == LockType.IS || substitute == LockType.NL)) {
		return false;
	}
	if (required == LockType.IS && substitute == LockType.NL) {
		return false;
	}

	return true;
}

最后实现 判断某一把锁是否可以作为另一把锁的父级锁
先获取对各类锁来说最基本的父级锁。然后调用上一问实现的方法,判断提供的parentLockType是否为基本的父级锁类型,而基本类型的替代类型也可以返回true。

public static boolean canBeParentLock(LockType parentLockType, LockType childLockType) {  
    if (parentLockType == null || childLockType == null) {  
        throw new NullPointerException("null lock type");  
    }  
    return substitutable(parentLockType, parentLock(childLockType));  
}

// 提供的辅助方法
public static LockType parentLock(LockType a) {  
    if (a == null) {  
        throw new NullPointerException("null lock type");  
    }  
    switch (a) {  
    case S: return IS;  
    case X: return IX;  
    case IS: return IS;  
    case IX: return IX;  
    case SIX: return IX;  
    case NL: return NL;  
    default: throw new UnsupportedOperationException("bad lock type");  
    }  
}

Task 2: LockManager

本题中主要实现以下方法acquireAndRelease(), acquire(), release(), promote()
LockManager负责管理所有的锁对象,以及各种资源的分配。该级别负责排队逻辑,根据需要阻塞/解除阻塞事务,并且判断事务是否具有某个锁的来源。

在实现LockManager的主要方法之前,先实现资源管理类的有关方法。

首先是检查当前事务向该资源添加的锁是否兼容。有手就行。

public boolean checkCompatible(LockType lockType, long except) {
	for (Lock lock : locks) {
		if (lock.transactionNum == except) {
			continue;
		}
		if (!LockType.compatible(lock.lockType, lockType)) {
			return false;
		}
	}
	return true;
}

接下来是向该资源上锁,或是修改锁的类型。
实现思路:遍历当前资源中的锁,如果有事务编号和插入锁一致的,就代表该事务原本就持有涉及该资源的锁(由于每个事务最多给一个资源上一把锁,因此这里我们可以用事务编号来查找当前事务是否持有该资源的锁)。如果已经持有锁,那么就修改锁的类型之后返回。否则就进行插入操作。

public void grantOrUpdateLock(Lock lock) {
	// 如果这把锁对应的事务已经在该资源上有了锁,就进行更新
	for (Lock existLock : locks) {
		if (existLock.transactionNum.equals(lock.transactionNum)) {
			existLock.lockType = lock.lockType;
			return;
		}
	}
	// 否则就是新建锁,但由于该事务可能是头回加入,所以要进行额外的处理
	locks.add(lock);
	if (transactionLocks.containsKey(lock.transactionNum)) {
		transactionLocks.get(lock.transactionNum).add(lock);
	} else {
		ArrayList<Lock> tLocks = new ArrayList<>();
		tLocks.add(lock);
		transactionLocks.put(lock.transactionNum, tLocks);
	}
}

将被阻塞的锁请求添加到阻塞队列中。需要注意的只有一点,在将请求添加到队列之后,要让涉及的事务做好阻塞准备(题目要求)。

public void addToQueue(LockRequest request, boolean addFront) {
	if (addFront) {
		waitingQueue.addFirst(request);
	} else {
		waitingQueue.addLast(request);
	}
	request.transaction.prepareBlock();
	return;
}

接着实现阻塞队列释放的方法
实现思路:不断获取下一条请求,直至阻塞队列为空或者需要的锁仍然无法被获取。对于可以实现的请求,将需要的锁授予对应的事务,并解除事务的阻塞状态。如果该请求中还包含要释放的锁,则在此处调用对应的释放方法。

private void processQueue() {
	Iterator<LockRequest> requests = waitingQueue.iterator();
	while (requests.hasNext()) {
		LockRequest req = requests.next();
		if (checkCompatible(req.lock.lockType, req.transaction.getTransNum())) {
			if (LockType.substitutable(req.lock.lockType, 
			getTransactionLockType(req.transaction.getTransNum()))
			) {
				grantOrUpdateLock(req.lock);
				if (req.releasedLocks != null && !req.releasedLocks.isEmpty()) {
					for (Lock releasedLock : req.releasedLocks) {
						ResourceEntry resourceEntry = getResourceEntry(releasedLock.name);
						resourceEntry.releaseLock(releasedLock);
					}
				}
			}
			waitingQueue.remove(req);
			req.transaction.unblock();
		} else {
			return;
		}
	}
	return;
}

释放锁的方法。将锁从当前资源以及指定事务的锁集合中移除,如果事务已经不再持有锁,就将该事务从锁记录中移除,避免OOM。然后释放队列中的请求。

public void releaseLock(Lock lock) {
	locks.remove(lock);
	transactionLocks.get(lock.transactionNum).remove(lock);
	// 如果事务内已经没有锁,就释放空间防止OOM
	if (transactionLocks.get(lock.transactionNum).isEmpty()) {
		transactionLocks.remove(lock.transactionNum);
	}
	processQueue();
	return;
}

获取指定资源中指定事务的锁类型。非常简单。

public LockType getTransactionLockType(long transaction) {
	
	for (Lock lock : locks) {
		if (lock.transactionNum.equals(transaction)) {
			return lock.lockType;
		}
	}
	return LockType.NL;
}

有了以上基础,后面的实现就比较简单了,4个方法的实现同质化比较严重。
acquireAndRelease()实现,用同步代码块包裹涉及锁请求的操作,避免并发请求锁带来问题。
先进行异常状态的处理,如果要获取的锁类型和当前已经持有的相同则抛出异常,避免处理无用请求。接着,如果要释放的锁根本不存在,则抛出异常,期间可以收集一会儿要释放的锁,存储到集合中。
异常处理完毕后,检查锁的兼容性,如果不兼容,则代表当前请求需要被阻塞,修改标记变量,将请求添加到队列头部(高优先级操作),该请求中应当额外包含当锁被请求到时,要释放的锁列表。如果不需要阻塞,则直接授予事务对应的锁,并释放指定的锁。

public void acquireAndRelease(TransactionContext transaction, ResourceName name,
                                  LockType lockType, List<ResourceName> releaseNames)
            throws DuplicateLockRequestException, NoLockHeldException {
	boolean shouldBlock = false;
	synchronized (this) {
		LockType exist = getLockType(transaction, name);
		ResourceEntry resourceEntry = getResourceEntry(name);
		if (exist == lockType) {
			throw new DuplicateLockRequestException("duplicate request");
		}
		List<Lock> releaseLocks = new ArrayList<>();
		boolean noLock = false;
		for (ResourceName releaseName : releaseNames) {
			LockType type = getLockType(transaction, releaseName);
			if (type == LockType.NL) {
				noLock = true;
			} else {
				releaseLocks.add(new Lock(releaseName, type, transaction.getTransNum()));
			}
		}
		if (noLock) {
			throw new NoLockHeldException("no lock");
		}

		Lock newLock = new Lock(name, lockType, transaction.getTransNum());
		if (!resourceEntry.checkCompatible(lockType, transaction.getTransNum())) {
			shouldBlock = true;
			LockRequest req = new LockRequest(transaction, newLock, releaseLocks);
			resourceEntry.addToQueue(req, true);
		} else {
			resourceEntry.grantOrUpdateLock(newLock);
			for (Lock releaseLock : releaseLocks) {
				getResourceEntry(releaseLock.name).releaseLock(releaseLock);
			}
		}
	}
	if (shouldBlock) {
		transaction.block();
	}
}

acquire的实现和acquireAndRelease没有太大区别,仅仅只是不包含释放锁的列表,以及阻塞时将请求插在队尾。不过由于该方法的优先级不高,请求插入在队尾,因此除了锁不兼容,当阻塞队列中有内容时也会被阻塞。

public void acquire(TransactionContext transaction, ResourceName name,  
                    LockType lockType) throws DuplicateLockRequestException {  
    boolean shouldBlock = false;  
    synchronized (this) {  
        LockType exist = getLockType(transaction, name);  
        ResourceEntry resourceEntry = getResourceEntry(name);  
        if (exist == lockType) {  
            throw new DuplicateLockRequestException("duplicate request");  
        }  
        Lock newLock = new Lock(name, lockType, transaction.getTransNum());  
        if (!resourceEntry.checkCompatible(lockType, transaction.getTransNum()) 
		        || !resourceEntry.waitingQueue.isEmpty()) {  
            shouldBlock = true;  
            LockRequest req = new LockRequest(transaction, newLock);  
            resourceEntry.addToQueue(req, false);  
        } else {  
            resourceEntry.grantOrUpdateLock(newLock);  
        }  
    }  
    if (shouldBlock) {  
        transaction.block();  
    }  
}

release方法的实现非常简单,没什么好说的。

public void release(TransactionContext transaction, ResourceName name)  
        throws NoLockHeldException {    
    synchronized (this) {  
        LockType exist = getLockType(transaction, name);  
        if (exist == LockType.NL) {  
            throw new NoLockHeldException("no lock");  
        }  
        ResourceEntry resourceEntry = getResourceEntry(name);  
        resourceEntry.releaseLock(new Lock(name, exist, transaction.getTransNum()));  
    }  
}

promote方法用于更新锁的类型,属于高优先级操作,阻塞时将请求添加到队首,处理异常时要额外处理锁类型非法的情况,即当新的所类型不是原来锁类型的加强版时,抛出异常。

public void promote(TransactionContext transaction, ResourceName name,  
                    LockType newLockType)  
        throws DuplicateLockRequestException, NoLockHeldException, InvalidLockException {   
    boolean shouldBlock = false;  
    synchronized (this) {  
        LockType exist = getLockType(transaction, name);  
        ResourceEntry resourceEntry = getResourceEntry(name);  
        if (exist == newLockType) {  
            throw new DuplicateLockRequestException("duplicate request");  
        } else if (exist == LockType.NL) {  
            throw new NoLockHeldException("no lock");  
        } else if (!LockType.substitutable(newLockType, exist)) {  
            throw new InvalidLockException("invalid lock type");  
        }  
        Lock newLock = new Lock(name, newLockType, transaction.getTransNum());  
        if (!resourceEntry.checkCompatible(newLockType, transaction.getTransNum())) {  
            shouldBlock = true;  
            LockRequest req = new LockRequest(transaction, newLock);  
            resourceEntry.addToQueue(req, true);  
        } else {  
            resourceEntry.grantOrUpdateLock(newLock);  
        }  
    }  
    if (shouldBlock) {  
        transaction.block();  
    }  
}

Multigranularity

Task 3: LockContext

本题中要实现LockContext的有关方法,它在本数据库架构中负责对指定的某一资源进行操作,也就是树状结构中的结点。
接下来我们要实现acquire(), promote(), release(), escalte(), getExplicitLockType(), getEffectiveLockType()

在实现涉及锁操作的方法前,先实现有关的获取锁类型的方法会更高效一些。

getExplicitLockType()就是获取当前context中,对应事务的锁类型,可以直接调用之前在LockManager中实现的方法实现。

public LockType getExplicitLockType(TransactionContext transaction) {  
    if (transaction == null) return LockType.NL;  
    return lockman.getLockType(transaction, name);  
}

getEffectiveLockType()首先会获取当前上下文中对应事务的锁类型,操作和getExplicitLockType()无异。如果当前上下文中没有锁,就开始考虑受到父节点影响的,隐式的锁类型(例如父节点获取了SIX锁,则相当于子节点都隐式地获取了S锁),需要注意的是,意向锁并不会隐式传递给子节点。
需要额外处理的就是SIX锁与意向锁。

public LockType getEffectiveLockType(TransactionContext transaction) {  
    if (transaction == null) return LockType.NL;  
    LockType lockType = LockType.NL;  
    LockContext ctx = this;  
    do {  
        lockType = lockman.getLockType(transaction, ctx.name);  
        if (lockType == LockType.SIX) {  
            return LockType.S;  
        }  
        if (lockType.isIntent()) {  
            lockType = LockType.NL;  
        }  
        ctx = ctx.parent;  
    } while (ctx != null && lockType == LockType.NL);  
    return lockType;  
}

hasSIXAncestor()会判断当前上下文的祖先结点是否持有SIX锁,实现与getEffectiveLockType()仅在循环中的判断。

private boolean hasSIXAncestor(TransactionContext transaction) {   
    LockType lockType = LockType.NL;  
    LockContext ctx = this;  
    do {  
        lockType = lockman.getLockType(transaction, ctx.name);  
        if (lockType == LockType.SIX) {  
            return true;  
        }  
        ctx = ctx.parent;  
    } while (ctx != null && lockType == LockType.NL);  
    return false;  
}

实现完锁类型获取方法后,我们再实现操作子结点锁数量的方法。由于锁的一部分操作需要子结点锁数量的记录,因此必须确保相关操作后,对应的数量都能实时更新。
因为数据库的层级结构为树状结构,所以很容易能想到要使用递归来处理。递归的终止条件为到达根节点,期间将每一层结点记录的子结点锁数量修改成指定的值。注意在添加时,事务的锁信息可能还未曾被context记录过,因此使用getOrDefault()

private void increaseChildLockNum(TransactionContext transaction, LockContext parent, int delta) {  
    if (parent == null) return;  
    Integer num = parent.numChildLocks.getOrDefault(transaction.getTransNum(), 0);  
    num += delta;  
    parent.numChildLocks.put(transaction.getTransNum(), num);  
    increaseChildLockNum(transaction, parent.parent, delta);  
}  
  
private void decreaseChildLockNum(TransactionContext transaction, LockContext parent, int delta) {  
    if (parent == null) return;  
    Integer num = parent.numChildLocks.getOrDefault(transaction.getTransNum(), 0);  
    if (num == 0) return;  
    num -= delta;  
    parent.numChildLocks.put(transaction.getTransNum(), num);  
    decreaseChildLockNum(transaction, parent.parent, delta);  
}

有了以上辅助方法,可以开始实现一部分锁操作方法。
acquire()要处理的异常有:

  1. 只读资源被尝试上锁
  2. 重复请求同类型锁
  3. 祖先已持有SIX锁,仍旧尝试请求S锁或IS锁,带来冗余。
  4. 获取的新锁无法将已有的锁作为前置,例如,无法从IS锁获取到X锁。
    前3种非常好处理,最后一种要使用之前在LockType实现的canBeParentLock()方法来判断新锁能否将旧锁作为前置,注意要剔除没有锁的情况,无锁肯定不满足成为任意一把锁的父级锁的条件,但能作为任何一把锁的前置。初次之外,由于getEffectiveLockType()方法会将意向锁处理为无锁,因此需要再添加一个帮助方法,它会获取离自己最近的一把锁作为影响锁。
private LockType acquireEffectiveLockType(TransactionContext transaction) {  
    if (transaction == null) return LockType.NL;  
    LockType lockType = LockType.NL;  
    LockContext ctx = this;  
    do {  
        lockType = lockman.getLockType(transaction, ctx.name);  
        ctx = ctx.parent;  
    } while (ctx != null && lockType == LockType.NL);  
    return lockType;  
}
public void acquire(TransactionContext transaction, LockType lockType)  
        throws InvalidLockException, DuplicateLockRequestException {  
    if (readonly) {  
        throw new UnsupportedOperationException("the context is readonly");  
    }  
    LockType existLock = acquireEffectiveLockType(transaction);  
    System.out.println(existLock);  
    if ((lockType == LockType.S || lockType == LockType.IS) && hasSIXAncestor(transaction)) {  
        throw new InvalidLockException("invalid request");  
    }  
    if (existLock != LockType.NL && !LockType.canBeParentLock(existLock, lockType)) {  
        throw new InvalidLockException("invalid lock");  
    }  
    lockman.acquire(transaction, name, lockType);  
    increaseChildLockNum(transaction, parent, 1);  
    return;  
}

release()会释放指定事务在当前资源持有的锁,他要处理的异常有:

  1. 要释放的锁不存在
  2. 子结点还有锁没有被释放 在多粒度锁协议中,每次锁定都是直接锁定一条路径,因此如果要释放锁,则当前结点所能到达的子结点都不能持有锁。
  3. 只读异常
public void release(TransactionContext transaction)
            throws NoLockHeldException, InvalidLockException {
	if (readonly) {
		throw new UnsupportedOperationException("readonly");
	}
	LockType curExist = getExplicitLockType(transaction);
	if (curExist == LockType.NL) {
		throw new NoLockHeldException("no lock");
	}

	if (numChildLocks.getOrDefault(transaction.getTransNum(), 0) != 0) {
		throw new InvalidLockException("invalid");
	}
	lockman.release(transaction, name);
	decreaseChildLockNum(transaction, parent, 1);
	return;
}

promotion()方法可以将当前事务在资源中持有的锁进行更新,需要注意的是,如果目标是SIX锁,则必须将当前结点的子结点锁持有的S锁和IS锁全部释放,因此我们先实现一个辅助方法来获取持有S锁和IS锁的所有子结点。

private List<ResourceName> sisDescendants(TransactionContext transaction) {  
    List<ResourceName> names = new ArrayList<>();  
    List<Lock> locks = lockman.getLocks(transaction);  
    for (Lock lock : locks) {  
        if (lock.lockType == LockType.S || lock.lockType == LockType.IS) {  
            if (lock.name.isDescendantOf(this.name)) {  
                names.add(lock.name);  
            }  
        }  
    }  
    return names;  
}

接下来我们就可以开始实现promotion()方法了,他要处理的异常有:

  1. 只读异常
  2. 重复请求异常
  3. 锁不存在异常
  4. 请求的锁不是原锁的强化版,用LockType的substituable()判断
    如果目标是SIX锁,就先获取持有S锁和IS锁的所有子结点,然后将他们全部释放,注意此时decreaseChildLockNum()传入的不是父节点而是自身,因为删除操作并非针对当前资源,而是它的子结点。
    public void promote(TransactionContext transaction, LockType newLockType)  
            throws DuplicateLockRequestException, NoLockHeldException, InvalidLockException {   
        if (readonly) {  
            throw new UnsupportedOperationException("readonly");  
        }  
        LockType existLock = getExplicitLockType(transaction);  
        if (existLock == newLockType) {  
            throw new DuplicateLockRequestException("duplicate");  
        } else if (existLock == LockType.NL) {  
            throw new NoLockHeldException("no lock");  
        }  
        if (!LockType.substitutable(newLockType, existLock)) {  
            throw new InvalidLockException("invalid lock");  
        }  
      
        if (newLockType == LockType.SIX) {  
            List<ResourceName> names = sisDescendants(transaction);  
            for (ResourceName resourceName : names) {  
                lockman.release(transaction, resourceName);  
            }  
            decreaseChildLockNum(transaction, this, names.size());  
        }  
        lockman.promote(transaction, name, newLockType);  
      
        return;  
    }

最后就是escalate()方法,它会将某个事务在指定资源中所有的锁都提升到指定的粒度,从此以下的锁全部释放。因此我们需要实现一个辅助方法来获取所有持有锁的子结点。
基本逻辑和sisDescendants类似

private List<ResourceName> allChildrenNames(TransactionContext transaction) {  
    List<ResourceName> names = new ArrayList<>();  
    List<Lock> locks = lockman.getLocks(transaction);  
    for (Lock lock : locks) {  
        if (lock.name.isDescendantOf(name))  
            names.add(lock.name);  
    }  
    return names;  
}

escalate在本题中只会将某一粒度的锁提升为S锁或X锁,IS->S, IX/SIX->X。这里可以使用之前实现的LockManager的acquireAndRelease(),这样就无需再一遍遍调用release()

public void escalate(TransactionContext transaction) throws NoLockHeldException {   
    if (readonly) {  
        throw new UnsupportedOperationException("unsupported");  
    }  
    LockType existLockType = getExplicitLockType(transaction);  
    if (existLockType == LockType.NL) {  
        throw new NoLockHeldException("no lock");  
    }  
  
    List<ResourceName> names = allChildrenNames(transaction);  
  
    decreaseChildLockNum(transaction, this, names.size());  
    if (existLockType == LockType.IS) {  
        lockman.acquireAndRelease(transaction, name, LockType.S, names);  
    } else if (existLockType == LockType.IX || existLockType == LockType.SIX) {  
        lockman.acquireAndRelease(transaction, name, LockType.X, names);  
    }  
    return;  
}

Task 4: LockUtil

拥有了以上基础,我们就可以着手将请求锁的方法封装为一个工具类。该工具类需要具备如下功能:

  1. 永远只在已有锁的基础上,做最小程度的升级来满足新的要求。
  2. 只能升级锁,不能降级。
  3. 如果父级结点不满足新锁的前置条件,则为父级结点请求需要的前置锁。

因此我们可以将本题划分为两个部分:

  1. 确保所有的父级结点持有的锁都满足了前置条件。
  2. 根据现有的锁,最小幅度地提升锁的粒度来满足条件。

我们可以将碰到的情况分为如下几种:

  1. 现有的锁已经满足了请求锁的要求,函数不做任何处理。
  2. 现有的锁为IX锁,请求锁为S锁,由于本方法不能让锁的粒度降低,但又要满足新的要求,所以此时分配SIX锁。
  3. 当前的锁为意向锁,继续请求锁必然意味着粒度提高,不再是意向锁(特殊情况就是第2种情况),此时需要子结点放弃持有的锁,然后升级。
  4. 其余情况,如果没有当前结点没有锁,就请求新锁,已有锁就升级。

分析完毕之后,先开始实现本题的第一部分,确保所有的父级结点都满足条件。鉴于粒度结构属于树状结构,因此使用递归来实现。
递归的终止条件设定为遍历到达根节点,或者当前结点的条件已经满足要求,否则就请求需要的锁并继续递归访问父节点。

private static void ensureAncestorLock(LockContext parent, LockType requestType, TransactionContext transaction) {
	if (parent == null || requestType == null) return;
	LockType effectiveLockType = parent.getEffectiveLockType(transaction);
	LockType explicitLockType = parent.getExplicitLockType(transaction);
	if (LockType.substitutable(explicitLockType, requestType) || LockType.substitutable(effectiveLockType, requestType))
		return;
	LockContext grandContext = parent.parentContext();
	LockType grandLock = LockType.parentLock(requestType);
	ensureAncestorLock(grandContext, grandLock, transaction);
	if (explicitLockType == LockType.NL) {
		parent.acquire(transaction, requestType);
	} else {
		parent.promote(transaction, requestType);
	}
}

接下来就可以轻松实现ensureSufficientLockHeld()

public static void ensureSufficientLockHeld(LockContext lockContext, LockType requestType) {  
    // requestType must be S, X, or NL  
    assert (requestType == LockType.S || requestType == LockType.X || requestType == LockType.NL);  
  
    // Do nothing if the transaction or lockContext is null  
    TransactionContext transaction = TransactionContext.getTransaction();  
    if (transaction == null || lockContext == null) return;  
  
    // You may find these variables useful  
    LockContext parentContext = lockContext.parentContext();  
    LockType effectiveLockType = lockContext.getEffectiveLockType(transaction);  
    LockType explicitLockType = lockContext.getExplicitLockType(transaction);  
  
    if (LockType.substitutable(explicitLockType, requestType)) {  
        return;  
    }  
  
    // 如果当前隐式的锁类型已经满足需要,就不做任何操作  
    if (LockType.substitutable(effectiveLockType, requestType)) {  
        return;  
    }  
  
    // 确保父级已经获取了前置锁  
    if (parentContext != null) {  
        LockType parentLock = LockType.parentLock(requestType);  
        ensureAncestorLock(parentContext, parentLock, transaction);  
    }  
  
    if (explicitLockType == LockType.IX && requestType == LockType.S) {  
        lockContext.promote(transaction, LockType.SIX);  
        return;  
    }  
  
    if (explicitLockType.isIntent()) {  
        lockContext.escalate(transaction);  
        return;  
    }  
  
    if (explicitLockType == LockType.NL) {  
        lockContext.acquire(transaction, requestType);  
    } else {  
        lockContext.promote(transaction, requestType);  
    }   
    return;  
}

Task5非常简单,就不多赘述了。

Recovery

Forward Processing

Task 1:Transaction Status

本题要实现事务状态的转变,要实现的方法为commit(), abort(), end()
需要注意的点有:

  1. commit方法在返回结果前,要将对应的日志刷入磁盘来保证数据的持久性。
  2. end方法在面对已经abort了的事务时,要将该事务做的所有操作回滚。

commit和abort实现方法类似,都是先获取事务的最后一条日志编号,然后根据他生成新的日志,最后修改事务状态,以及事务对应的最后一条记录编号

public long commit(long transNum) {
	TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);
	long lastLSN = transactionTableEntry.lastLSN;
	long curLSN = logManager.appendToLog(new CommitTransactionLogRecord(transNum, lastLSN));
	logManager.flushToLSN(curLSN);
	transactionTableEntry.transaction.setStatus(Transaction.Status.COMMITTING);
	transactionTableEntry.lastLSN = curLSN;
	return curLSN;
}
public long abort(long transNum) {  
    TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);  
    long lastLSN = transactionTableEntry.lastLSN;  
    long abortLSN = logManager.appendToLog(new AbortTransactionLogRecord(transNum, lastLSN));  
    transactionTableEntry.lastLSN = abortLSN;  
    transactionTableEntry.transaction.setStatus(Transaction.Status.ABORTING);  
    return abortLSN;  
}

end方法相较以上两者,多了回滚的处理。首先需要找出回滚到那一条日志编号,显然对于一个被抛弃的事务来说,他所做的所有操作都要被回滚,因此可以从最后一条记录开始一步一步往回,直到没有前一条日志。在回滚结束后,添加End日志并将该事务从事务表中移除。

public long end(long transNum) {  
    TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);  
    if (transactionTableEntry.transaction.getStatus().equals(Transaction.Status.ABORTING)) {  
        LogRecord logRecord = logManager.fetchLogRecord(transactionTableEntry.lastLSN);  
        while (logRecord != null && logRecord.getPrevLSN().isPresent()) {  
            Long prevLSN = logRecord.getPrevLSN().get();  
            logRecord = logManager.fetchLogRecord(prevLSN);  
        }  
        rollbackToLSN(transNum, logRecord.getLSN());  
    }  
    long lastLSN = transactionTableEntry.lastLSN;  
    EndTransactionLogRecord endRecord = new EndTransactionLogRecord(transNum, lastLSN);  
    long l = logManager.appendToLog(endRecord);  
    transactionTableEntry.lastLSN = l;  
    transactionTableEntry.transaction.setStatus(Transaction.Status.COMPLETE);  
    transactionTable.remove(transNum);  
    return l;  
}

具体的回滚操作步骤如下:

  1. 如果当前所处的日志编号大于目标编号,则继续,否则退出
  2. 如果该条日志记录的是可undo的操作,则添加一条clr日志,并执行undo操作
  3. 向之前的日志记录移动
    private void rollbackToLSN(long transNum, long LSN) {  
        TransactionTableEntry transactionEntry = transactionTable.get(transNum);  
        LogRecord lastRecord = logManager.fetchLogRecord(transactionEntry.lastLSN);  
        long lastRecordLSN = lastRecord.getLSN();  
        long currentLSN = lastRecord.getUndoNextLSN().orElse(lastRecordLSN);   
        while (currentLSN > LSN) {  
            LogRecord logRecord = logManager.fetchLogRecord(currentLSN);  
            if (logRecord.isUndoable()) {  
                // 获取clr日志  
                LogRecord clr = logRecord.undo(transactionEntry.lastLSN);  
                long lsn = logManager.appendToLog(clr);  
                // 更新entry信息  
                transactionEntry.lastLSN = lsn;  
                // 调用redo来执行undo操作(题目要求)
                clr.redo(this, diskSpaceManager, bufferManager);  
            }  
            currentLSN = logRecord.getPrevLSN().orElse(-1L);  
        }  
    }

Task 2:Logging

本题需要实现日志功能,logAllocPart, logFreePart, logAllocPage, logFreePage均由题目实现,我们需要实现logPageWrite(),用于记录数据页写入操作的日志。实现与abort类似,唯一要注意的是,如果dirtyPage Table中已经有了这张表的修改信息,那就不要在往里面put,因为该表记录的LSN属于首个记录对该表的修改操作的日志。

public long logPageWrite(long transNum, long pageNum, short pageOffset, byte[] before,  
                         byte[] after) {  
    assert (before.length == after.length);  
    assert (before.length <= BufferManager.EFFECTIVE_PAGE_SIZE / 2);  
    TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);  
    long lastLSN = transactionTableEntry.lastLSN;  
    UpdatePageLogRecord updatePageLogRecord = new UpdatePageLogRecord(transNum, pageNum, lastLSN, pageOffset, before, after);  
    long lsn = logManager.appendToLog(updatePageLogRecord);  
    transactionTableEntry.lastLSN = lsn;  
    dirtyPageTable.putIfAbsent(pageNum, lsn);  
  
    return lsn;  
}

Task 3: Savepoints

本题要实现根据存档点回滚,存档点有它对应的LSN,因此我们可以直接调用实现过的rollBackToLSN()

public void rollbackToSavepoint(long transNum, String name) {  
    TransactionTableEntry transactionEntry = transactionTable.get(transNum);  
    assert (transactionEntry != null);  
  
    // All of the transaction's changes strictly after the record at LSN should be undone.  
    long savepointLSN = transactionEntry.getSavepoint(name);  
    rollbackToLSN(transNum, savepointLSN);  
    return;  
}

Task 4: Checkpoints

在之前的课程中,我们了解到,数据库会周期性的记录一些模糊检查点,来提升恢复操作的效率。本题我们就是要实现检查点的功能。
设定检查点的要求为:如果目前脏页表和事务表的大小超过某一阈值,就将这些部分交给一个endcheckpoint管理,之后开始重新记录,直至下一个检查点达到要求。
实现思路:

  1. 遍历脏页表,如果当前两表大小达到要求,就新建一个检查点。然后清空用于记录的临时表,重新记录。
  2. 遍历事务表,继续和上一步一样的操作。
  3. 建立最后一个检查点。
    tips:EndCheckpointLogRecord.fitsInOneRecord()是用于判断当前两表大小是否符合要求的方法。
    public synchronized void checkpoint() {  
        // Create begin checkpoint log record and write to log  
        LogRecord beginRecord = new BeginCheckpointLogRecord();  
        long beginLSN = logManager.appendToLog(beginRecord);  
      
        Map<Long, Long> chkptDPT = new HashMap<>();  
        Map<Long, Pair<Transaction.Status, Long>> chkptTxnTable = new HashMap<>();  
      
        // TODO(proj5): generate end checkpoint record(s) for DPT and transaction table  
        int pageNumber = 0;  
        int transactionNumber = 0;  
        for (Long pageNum : dirtyPageTable.keySet()) {  
            Long recLSN = dirtyPageTable.get(pageNum);  
            // 如果再加新的一页不能满足条件,就新建一个checkpoint  
            if (!EndCheckpointLogRecord.fitsInOneRecord(pageNumber + 1, transactionNumber)) {  
                EndCheckpointLogRecord endpoint = new EndCheckpointLogRecord(new HashMap<>(chkptDPT), new HashMap<>(chkptTxnTable));  
                logManager.appendToLog(endpoint);  
                flushToLSN(endpoint.getLSN());  
                chkptDPT.clear();  
                pageNumber = 0;  
            }  
            pageNumber += 1;  
            chkptDPT.put(pageNum, recLSN);  
        }  
      
        for (Long transNum : transactionTable.keySet()) {  
            TransactionTableEntry transactionTableEntry = transactionTable.get(transNum);  
            if (!EndCheckpointLogRecord.fitsInOneRecord(pageNumber, transactionNumber + 1)) {  
                EndCheckpointLogRecord endpoint = new EndCheckpointLogRecord(new HashMap<>(chkptDPT), new HashMap<>(chkptTxnTable));  
                logManager.appendToLog(endpoint);  
                flushToLSN(endpoint.getLSN());  
                chkptDPT.clear();  
                chkptTxnTable.clear();  
                pageNumber = 0;  
                transactionNumber = 0;  
            }  
            Transaction transaction = transactionTableEntry.transaction;  
            chkptTxnTable.put(transNum, new Pair<>(transaction.getStatus(), transactionTableEntry.lastLSN));  
            transactionNumber += 1;  
        }  
      
        // Last end checkpoint record  
        LogRecord endRecord = new EndCheckpointLogRecord(chkptDPT, chkptTxnTable);  
        logManager.appendToLog(endRecord);  
        // Ensure checkpoint is fully flushed before updating the master record  
        flushToLSN(endRecord.getLSN());  
      
        // Update master record  
        MasterLogRecord masterRecord = new MasterLogRecord(beginLSN);  
        logManager.rewriteMasterRecord(masterRecord);  
    }

Restart Recovery

Task 5: Analysis

在恢复操作开始时,首先需要进行分析。来重建系统崩溃时的事务表和脏页表

  1. 获取日志的Master record,借此来获取begincheckpoint(在初始化时,这两个对象会被一并创建)
  2. 遍历之后的所有日志,接下来讲每种日志都怎么处理。
  3. 事务操作日志:如果对应的事务没有在事务表里,就将其添加进去,然后更新事务的lastLSN
  4. 数据页操作日志:UpdatePage/UndoUpdatePage会修改内存中的数据页,讲其添加进脏页表,但不需要刷入磁盘。FreePage/UndoAllocPage会将数据持久化到磁盘,将对应的数据页从脏页表删除。AllocPage/UndoFreePage不需要我们处理。
  5. 事务状态日志:将对应的事务状态调为COMMITTING, RECOVERY_ABORTING, or COMPLETE。如果是end日志,首先要将事务清理掉(调用cleanup()),然后从事务表中移除,接着将事务编号记录到endedTransactions集合中,最后修改事务状态。
  6. 检查点日志:对于BeginCheckpoint不需要任何操作。对于EndCheckpoint,将检查点记录的脏页表全部加入到脏页表中,即使脏页表中存在某条记录(考虑到检查点提供的信息的准确性是高于日志的)。对于事务表中的每一条事务,如果已经处于endedTransactions,则不用管,如果有未被记录的,就进行添加。并且,只有当检查点中的事务的lastLSN大于等于目前内存中事务的lastLSN时才修改内存中的日志编号。最后,如果检查点中事务状态优先于内存中事务状态,则进行修改。
  7. 最后遍历一遍当前的事务表,将处于COMMITING状态的事务结束,处于RUNNING状态的调整为RECOVERY_ABORTING。要注意操作的先后顺序。

定义的用于判断事务状态优先级的方法。遵循:

  • running -> committing -> complete
  • running -> aborting -> complete
    /** 判断状态1是否比状态2领先 */  
    private boolean judgeAdvance(Transaction.Status type1, Transaction.Status type2) {  
        if (type1.equals(Transaction.Status.RUNNING)) return false;  
        if (type1.equals(Transaction.Status.ABORTING) || type1.equals(Transaction.Status.COMMITTING)) {  
            if (type2.equals(Transaction.Status.RUNNING)) return true;  
            else return false;  
        }  
        if (type1.equals(Transaction.Status.COMPLETE)) return true;  
        return true;  
    }
void restartAnalysis() {  
    // Read master record  
    LogRecord record = logManager.fetchLogRecord(0L);  
    // Type checking  
    assert (record != null && record.getType() == LogType.MASTER);  
    MasterLogRecord masterRecord = (MasterLogRecord) record;  
    // Get start checkpoint LSN  
    long LSN = masterRecord.lastCheckpointLSN;  
    // Set of transactions that have completed  
    Set<Long> endedTransactions = new HashSet<>();  
    // TODO(proj5): implement  
    Iterator<LogRecord> iterator = logManager.scanFrom(LSN);  
    while (iterator.hasNext()) {  
        LogRecord logRecord = iterator.next();  
        // 如果这是一次事务操作  
        if (logRecord.getTransNum().isPresent()) {  
            Long transNum = logRecord.getTransNum().get();  
            // 如果是一个新事务,就将其添加到事务表里  
            if (transactionTable.get(transNum) == null) {  
                Transaction transaction = newTransaction.apply(transNum);  
                startTransaction(transaction);  
            }  
            // 更新事务的LSN  
            TransactionTableEntry tableEntry = transactionTable.get(transNum);  
            tableEntry.lastLSN = logRecord.getLSN();  
        }  
        // 如果是数据页操作  
        if (logRecord.getPageNum().isPresent()) {  
            LogType type = logRecord.getType();  
            // 对于更新涉及写入的更新操作,将该页加入脏页表中  
            if (type.equals(LogType.UPDATE_PAGE) || type.equals(LogType.UNDO_UPDATE_PAGE)) {  
                dirtyPage(logRecord.getPageNum().get(), logRecord.getLSN());  
            }  
            // 对于内存管理,先将更改写入磁盘,再从脏页表中移除  
            if (type.equals(LogType.FREE_PAGE) || type.equals(LogType.UNDO_ALLOC_PAGE)) {  
                pageFlushHook(logRecord.getLSN());  
                dirtyPageTable.remove(logRecord.getPageNum().get());  
            }  
            // 其余操作不需要管理  
        }  
  
        if (logRecord.getType().equals(LogType.COMMIT_TRANSACTION)) {  
            Long transNum = logRecord.getTransNum().get();  
            TransactionTableEntry tableEntry = transactionTable.get(transNum);  
            tableEntry.transaction.setStatus(Transaction.Status.COMMITTING);  
        }  
  
        if (logRecord.getType().equals(LogType.ABORT_TRANSACTION)) {  
            Long transNum = logRecord.getTransNum().get();  
            TransactionTableEntry tableEntry = transactionTable.get(transNum);  
            tableEntry.transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);  
        }  
  
        // 如果是结束事务的日志,则需要先清除事务,从事务表移除,添加到endedTransactions,最后修改状态  
        if (logRecord.getType().equals(LogType.END_TRANSACTION)) {  
            Long transNum = logRecord.getTransNum().get();  
            TransactionTableEntry tableEntry = transactionTable.get(transNum);  
            tableEntry.transaction.cleanup();  
            transactionTable.remove(transNum);  
            endedTransactions.add(transNum);  
            tableEntry.transaction.setStatus(Transaction.Status.COMPLETE);  
        }  
  
        if (logRecord.getType().equals(LogType.END_CHECKPOINT)) {  
            // 对于检查点脏页表的数据,将其全部加入  
            Map<Long, Long> dpt = logRecord.getDirtyPageTable();  
            dirtyPageTable.putAll(dpt);  
            Map<Long, Pair<Transaction.Status, Long>> tTable = logRecord.getTransactionTable();  
            // 如果检查点的事务表中有事务不在恢复事务表中,就将其添加  
            // 更新事务表的LSN  
            for (Long transNum : tTable.keySet()) {  
                if (endedTransactions.contains(transNum)) continue;  
                if (!transactionTable.containsKey(transNum)) {  
                    startTransaction(newTransaction.apply(transNum));  
                }  
                Pair<Transaction.Status, Long> pair = tTable.get(transNum);  
                TransactionTableEntry tableEntry = transactionTable.get(transNum);  
                Long lsn = pair.getSecond();  
                if (lsn >= tableEntry.lastLSN) {  
                    tableEntry.lastLSN = lsn;  
                }  
                // 如果存档点中的事务状态领先,则要进行修改  
                if (judgeAdvance(pair.getFirst(), tableEntry.transaction.getStatus())) {  
                    // 如果是aborting则需要修改为recovery_abort  
                    if (pair.getFirst().equals(Transaction.Status.ABORTING)) {  
                        tableEntry.transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);  
                    } else {  
                        tableEntry.transaction.setStatus(pair.getFirst());  
                    }  
                }  
            }  
        }  
    }  
    for (Long transNum : transactionTable.keySet()) {  
        TransactionTableEntry tableEntry = transactionTable.get(transNum);  
        Transaction transaction = tableEntry.transaction;  
        // 这里cleanup必须在end前调用,因为end会修改状态,导致cleanup报错  
        if (transaction.getStatus().equals(Transaction.Status.COMMITTING)) {  
            transaction.cleanup();  
            end(transNum);  
        }  
        // 这里要注意以下顺序,abort()会将事务修改为abort状态,但我们这时候需要的是RECOVERY_ABORTING  
        // 因此abort方法要在前面。不过日志也理应在事务操作之前添加  
        if (transaction.getStatus().equals(Transaction.Status.RUNNING)) {  
            abort(transNum);  
            transaction.setStatus(Transaction.Status.RECOVERY_ABORTING);  
        }  
  
    }  
    return;  
}

Task 6: Redo

这一步进行redo操作,使得事务原本做出的修改全部生效。而进行redo的操作要满足以下条件之一:

  1. 是与分区有关的日志记录(AllocPart, UndoAllocPart, FreePart, UndoFreePart)。
  2. 分配页面的记录(AllocPage, UndoFreePage)
  3. 修改数据页的记录(UpdatePage, UndoUpdatePage, UndoAllocPage, FreePage),并且满足:对应的数据页处于脏页表,记录的LSN大于等于脏页表中的recLSN,数据页记录的LSN小于记录的LSN。
    由于redo是从前往后进行的,并且只涉及写入操作(等于只操作脏页表),因此我们先要获取脏页表中最小的LSN。
    /** 从脏页表中获取recLSN最小的值 */  
    private long getLowestRecLSN() {  
        return dirtyPageTable.values().stream()  
                .sorted()  
                .limit(1)  
                .collect(Collectors.toList())  
                .get(0);  
    }
void restartRedo() {  
    // TODO(proj5): implement  
    long lowestRecLSN = getLowestRecLSN();  
    Iterator<LogRecord> iterator = logManager.scanFrom(lowestRecLSN);  
    while (iterator.hasNext()) {  
        LogRecord logRecord = iterator.next();  
        LogType type = logRecord.getType();  
        if (type.equals(LogType.ALLOC_PART)  
                || type.equals(LogType.FREE_PART)  
                || type.equals(LogType.UNDO_FREE_PART)  
                || type.equals(LogType.UNDO_ALLOC_PART)) {  
            logRecord.redo(this, diskSpaceManager, bufferManager);  
        }  
        if (type.equals(LogType.ALLOC_PAGE)  
                || type.equals(LogType.UNDO_FREE_PAGE)) {  
            logRecord.redo(this, diskSpaceManager, bufferManager);  
        }  
        if (type.equals(LogType.UPDATE_PAGE)  
                || type.equals(LogType.UNDO_UPDATE_PAGE)  
                || type.equals(LogType.FREE_PAGE)  
                || type.equals(LogType.UNDO_ALLOC_PAGE)) {  
            Long pageNum = logRecord.getPageNum().get();  
            // 指定的数据页在脏页表中,并且日志的LSN大于等于表中的recLSN  
            if (dirtyPageTable.containsKey(pageNum)  
                    && logRecord.getLSN() >= dirtyPageTable.get(pageNum)) {  
                Page page = bufferManager.fetchPage(new DummyLockContext(), pageNum);  
                try {  
                    if (page.getPageLSN() < logRecord.getLSN()) {  
                        logRecord.redo(this, diskSpaceManager, bufferManager);  
                    }  
                } finally {  
                    page.unpin();  
                }  
            }  
        }  
    }  
    return;  
}

Task 7: Undo

这一部分要将被抛弃的事务做出的修改撤销。为了避免一个一个撤销事务操作带来的随机细碎的IO消耗(没撤销一个事务都要重新遍历一遍日志)。我们用优先队列存储各事务接下来的日志编号,使用一次遍历撤销所有事务。
循环撤销事务步骤如下:

  1. 如果该操作可以撤销,就进行撤销并且添加CLR日志。
  2. 如果该日志有undoNextLSN就添加改编号到队列,否则添加当前日志的prevLSN。
  3. 如果新的LSN编号为0,就结束该事务。
    这里我们先要获取所有事务的最终日志编号,并将其与事务捆绑,放到大顶堆中。
    /** 获取abort状态的事务编号与lastLSN */  
    private List<Pair<Long, Long>> getAbortedTransactionLastLSN() {  
        List<Pair<Long, Long>> ret = new ArrayList<>();  
        for (Long transNum : transactionTable.keySet()) {  
            Transaction transaction = transactionTable.get(transNum).transaction;  
            if (transaction.getStatus().equals(Transaction.Status.RECOVERY_ABORTING)) {  
                ret.add(new Pair<>(transNum, transactionTable.get(transNum).lastLSN));  
            }  
        }  
        return ret;  
    }
void restartUndo() {  
    // TODO(proj5): implement  
    PriorityQueue<Pair<Long, Long>> pq = new PriorityQueue<>((a, b) -> (int)(b.getSecond() - a.getSecond()));  
    pq.addAll(getAbortedTransactionLastLSN());  
    while (!pq.isEmpty()) {  
        Pair<Long, Long> pair = pq.poll();  
        Long transNum = pair.getFirst();  
        Long lastLSN = pair.getSecond();  
        LogRecord logRecord = logManager.fetchLogRecord(lastLSN);  
        TransactionTableEntry tableEntry = transactionTable.get(transNum);  
        if (logRecord.isUndoable()) {  
            LogRecord CLR = logRecord.undo(tableEntry.lastLSN);  
            long lsn = logManager.appendToLog(CLR);  
            tableEntry.lastLSN = lsn;  
            CLR.redo(this, diskSpaceManager, bufferManager);  
        }  
        long newLSN;  
        if (logRecord.getUndoNextLSN().isPresent()) {  
            newLSN = logRecord.getUndoNextLSN().get();  
        } else {  
            newLSN = logRecord.getPrevLSN().orElse(0L);  
        }  
        if (newLSN == 0) {  
            tableEntry.transaction.cleanup();  
            end(transNum);  
            continue;  
        }  
        pq.add(new Pair<>(transNum, newLSN));  
    }  
  
    return;  
}
  • 标题: rookie db
  • 作者: Zephyr
  • 创建于 : 2022-07-05 19:02:18
  • 更新于 : 2023-01-26 12:32:42
  • 链接: https://faustpromaxpx.github.io/2022/07/05/rookie-db/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论