概览
高可用,Replay
场景
元数据写入
/**
 * Creates a database described by {@code stmt} and records the creation in the edit log.
 *
 * <p>Order of operations: read the full database name from the statement, take the next id
 * from the current {@code Env}, build the {@code Database}, hand it to
 * {@code unprotectCreateDb}, and finally log the create-db operation.
 *
 * @param stmt the create-database statement supplying the full database name
 * @throws DdlException declared by the signature; NOTE(review): the call that raises it is
 *         not visible in this excerpt
 */
public void createDb(CreateDbStmt stmt) throws DdlException {
    final String dbName = stmt.getFullDbName();
    final long dbId = Env.getCurrentEnv().getNextId();
    final Database newDb = new Database(dbId, dbName);

    // Register the database first, then record the operation in the edit log.
    unprotectCreateDb(newDb);
    Env.getCurrentEnv().getEditLog().logCreateDb(newDb);
}
镜像生成 - Checkpoint
- 创建Checkpoint Catalog;加载老镜像;从BDBJE中读取日志并回放
- 生成新镜像
- 推送新镜像到其他FE
- 删除老的journal
- 删除老的image
MetaWriter.write
/**
 * Writes an image of {@code env} into {@code imageFile}.
 *
 * <p>Layout, in write order: the meta header ({@code MetaHeader.write}), the env header, every
 * module in {@code PersistMetaModules.MODULES_IN_ORDER}, and finally the footer
 * ({@code MetaFooter.write}) carrying the collected meta indices and the running checksum.
 * The checksum is threaded through every step: each step receives the current value and
 * returns the updated one.
 *
 * @param imageFile destination file; the body is appended after the meta header
 * @param env the environment whose state is saved
 * @throws IOException if writing, flushing or syncing the image file fails
 * @throws RuntimeException if a module's write method cannot be invoked reflectively
 */
public static void write(File imageFile, Env env) throws IOException {
    // save image does not need any lock. because only checkpoint thread will call this method.
    LOG.info("start to save image to {}. is ckpt: {}",
            imageFile.getAbsolutePath(), Env.isCheckpointThread());
    final Reference<Long> checksum = new Reference<>(0L);
    long saveImageStartTime = System.currentTimeMillis();
    // MetaHeader should use output stream in the future.
    long startPosition = MetaHeader.write(imageFile);
    List<MetaIndex> metaIndices = Lists.newArrayList();
    // Both streams are managed by try-with-resources so the file descriptor is released even
    // if constructing the wrapper stream throws.
    try (FileOutputStream imageFileOut = new FileOutputStream(imageFile, true);
            CountingDataOutputStream dos = new CountingDataOutputStream(
                    new BufferedOutputStream(imageFileOut), startPosition)) {
        writer.setDelegate(dos, metaIndices);
        long replayedJournalId = env.getReplayedJournalId();
        // 1. write header first
        checksum.setRef(
                writer.doWork("header", () -> env.saveHeader(dos, replayedJournalId, checksum.getRef())));
        // 2. write other modules
        for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) {
            checksum.setRef(writer.doWork(m.name, () -> {
                try {
                    return (long) m.writeMethod.invoke(env, dos, checksum.getRef());
                } catch (IllegalAccessException | InvocationTargetException e) {
                    LOG.warn("failed to write meta module: {}", m.name, e);
                    throw new RuntimeException(e);
                }
            }));
        }
        // 3. force sync to disk
        // Drain the BufferedOutputStream first: force() only syncs bytes that have already
        // reached the file channel, so buffered-but-unwritten data would otherwise be missed.
        dos.flush();
        imageFileOut.getChannel().force(true);
    }
    MetaFooter.write(imageFile, metaIndices, checksum.getRef());
    long saveImageEndTime = System.currentTimeMillis();
    LOG.info("finished save image {} in {} ms. checksum is {}", imageFile.getAbsolutePath(),
            (saveImageEndTime - saveImageStartTime), checksum.getRef());
}
总结
EditLog
代理了对BDBJEJournal的写和读操作
logEdit(写)
/**
 * Logs the creation of {@code db} as an {@code OP_CREATE_DB} edit.
 *
 * @param db the database that was created; passed to {@code logEdit} as the payload
 */
public void logCreateDb(Database db) {
    final short opCode = OperationType.OP_CREATE_DB;
    logEdit(opCode, db);
}
/**
 * Writes one operation to the journal and rolls the edit log once enough edits accumulated.
 *
 * <p>After every write {@code txId} is incremented; when it reaches
 * {@code Config.edit_log_roll_num} the edit log is rolled and the counter restarts at zero.
 *
 * @param op operation code of the edit
 * @param writable payload written together with the operation code
 * @return the id returned by {@code journal.write}
 */
private synchronized long logEdit(short op, Writable writable) {
    final long journalId = journal.write(op, writable);

    // get a new transactionId
    txId++;
    if (txId < Config.edit_log_roll_num) {
        // Threshold not reached yet: nothing to roll.
        return journalId;
    }

    LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId,
            Config.edit_log_roll_num);
    rollEditLog();
    txId = 0;
    return journalId;
}
/**
 * Rolls the edit log by delegating to {@code journal.rollJournal()}.
 */
public void rollEditLog() {
journal.rollJournal();
}
loadJournal(读)
/**
 * Replays a single journal entry against {@code env}, dispatching on the entry's operation code.
 *
 * <p>NOTE(review): this excerpt is abridged. Only the {@code OP_CREATE_DB} case is shown, and the
 * text ends before the {@code try} statement's catch/finally clause and the method's closing
 * brace, so how the thrown {@code IOException} is handled cannot be told from here.
 *
 * @param env the environment the operation is replayed on
 * @param logId id of the journal entry; not used in the visible part of the method
 * @param journal the journal entry supplying the operation code and its payload
 */
public static void loadJournal(Env env, Long logId, JournalEntity journal) {
short opCode = journal.getOpCode();
try {
switch (opCode) {
case OperationType.OP_CREATE_DB: {
// The payload of a create-db entry is cast to Database and replayed.
Database db = (Database) journal.getData();
env.replayCreateDb(db);
break;
}
default: {
// Unrecognised operation code: log it with a fresh IOException and throw that exception.
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
throw e;
}
}
}
BDBJEJournal
包含两个重要属性:currentJournalDB 和 nextJournalId
write(写)
/**
 * Serializes one operation into a journal entry and stores it in the current journal database.
 *
 * <p>The entry's key is the next journal id (taken from {@code nextJournalId}, which is
 * incremented as a side effect); the value is the serialized {@code JournalEntity}. The put is
 * attempted up to {@code RETRY_TIME} times.
 *
 * @param op operation code stored in the entry
 * @param writable payload stored in the entry
 * @return the journal id used as the key of the written entry
 * @throws IOException if serializing the entry fails, or if no put attempt reported
 *         {@code OperationStatus.SUCCESS}
 */
public synchronized long write(short op, Writable writable) throws IOException {
    JournalEntity entity = new JournalEntity();
    entity.setOpCode(op);
    entity.setData(writable);

    // id is the key
    long id = nextJournalId.getAndIncrement();
    Long idLong = id;
    DatabaseEntry theKey = new DatabaseEntry();
    TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
    idBinding.objectToEntry(idLong, theKey);

    // entity is the value
    DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
    entity.write(buffer);
    DatabaseEntry theData = new DatabaseEntry(buffer.getData());
    if (MetricRepo.isInit) {
        MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
        MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
    }
    LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());

    // Write the key value pair to bdb.
    boolean writeSucceed = false;
    for (int i = 0; i < RETRY_TIME; i++) {
        if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
            writeSucceed = true;
            break;
        }
    }
    if (!writeSucceed) {
        // Do not hand back an id for an entry that was never stored. The id has already been
        // taken from nextJournalId at this point, so the caller must treat this as fatal.
        throw new IOException("failed to write journal " + id + " to bdb after "
                + RETRY_TIME + " attempts");
    }
    return id;
}
rollJournal
public synchronized void rollJournal() {
// Doesn't need to roll if current database contains no journals
if (currentJournalDB.count() == 0) {
return;
}
long newName = nextJournalId.get();
String currentDbName = currentJournalDB.getDatabaseName();
long currentName = Long.parseLong(currentDbName);
long newNameVerify = currentName + currentJournalDB.count();
if (newName == newNameVerify) {
LOG.info("roll edit log. new db name is {}", newName);
currentJournalDB = bdbEnvironment.openDatabase(Long.(newName));
} else {
...
}
}
open
public synchronized void open() {
if (bdbEnvironment == null) {
File dbEnv = new File(environmentPath);
bdbEnvironment = new BDBEnvironment();
HostInfo helperNode = Env.getServingEnv().getHelperNode();
String helperHostPort = helperNode.getHost() + ":" + helperNode.getPort();
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
Env.getServingEnv().isElectable());
}
// Open a new journal database or get last existing one as current journal
// database
List<Long> dbNames = null;
dbNames = getDatabaseNames();
if (dbNames.size() == 0) {
String dbName = Long.(Env.getServingEnv().getReplayedJournalId() + 1);
LOG.info("the very first time to open bdb, dbname is {}", dbName);
currentJournalDB = bdbEnvironment.openDatabase(dbName);
} else {
// get last database as current journal database
currentJournalDB = bdbEnvironment.openDatabase(dbNames.get(dbNames.size() - 1).());
}
// set next journal id
nextJournalId.set(getMaxJournalId() + 1);
}