searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

【源码阅读】9. 元数据

2023-11-01 03:22:15
1
0

概览

高可用,Replay

场景

元数据写入

    /**
     * Creates the database described by {@code stmt} and records the creation in the edit log.
     *
     * <p>The new {@code Database} is first handed to {@code unprotectCreateDb} and only
     * afterwards written to the edit log via {@code logCreateDb}.
     *
     * @param stmt statement supplying the full database name
     * @throws DdlException declared by the signature; not thrown directly by the visible code
     */
    public void createDb(CreateDbStmt stmt) throws DdlException {
        final String dbName = stmt.getFullDbName();
        final long dbId = Env.getCurrentEnv().getNextId();
        final Database newDb = new Database(dbId, dbName);

        // Apply the change locally first ...
        unprotectCreateDb(newDb);
        // ... then persist it as an edit-log entry.
        Env.getCurrentEnv()
                .getEditLog()
                .logCreateDb(newDb);
    }

 

镜像生成 - Checkpoint

  • 创建Checkpoint Catalog;加载老镜像;从BDBJE中读取日志并回放
  • 生成新镜像
  • 推送新镜像到其他FE
  • 删除老的journal
  • 删除老的image

MetaWriter.write

    /**
     * Saves the metadata image of {@code env} into {@code imageFile}.
     *
     * <p>The file is produced in three steps: {@code MetaHeader.write} writes the file header,
     * then the image header and every module in {@code PersistMetaModules.MODULES_IN_ORDER} are
     * appended through a counting stream, and finally {@code MetaFooter.write} is called with
     * the collected meta indices and the running checksum.
     *
     * @param imageFile destination image file
     * @param env       environment whose metadata is saved
     * @throws IOException if writing the image fails
     */
    public static void write(File imageFile, Env env) throws IOException {
        // save image does not need any lock. because only checkpoint thread will call this method.
        LOG.info("start to save image to {}. is ckpt: {}",
                imageFile.getAbsolutePath(), Env.isCheckpointThread());
        final Reference<Long> checksum = new Reference<>(0L);
        long saveImageStartTime = System.currentTimeMillis();
        // MetaHeader should use output stream in the future.
        long startPosition = MetaHeader.write(imageFile);
        List<MetaIndex> metaIndices = Lists.newArrayList();
        // Both streams are managed by try-with-resources so the file descriptor is released even
        // if constructing the wrapping stream fails.
        try (FileOutputStream imageFileOut = new FileOutputStream(imageFile, true);
                CountingDataOutputStream dos = new CountingDataOutputStream(
                        new BufferedOutputStream(imageFileOut), startPosition)) {
            writer.setDelegate(dos, metaIndices);
            long replayedJournalId = env.getReplayedJournalId();
            // 1. write header first
            checksum.setRef(
                    writer.doWork("header", () -> env.saveHeader(dos, replayedJournalId, checksum.getRef())));
            // 2. write other modules
            for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) {
                checksum.setRef(writer.doWork(m.name, () -> {
                    try {
                        return (long) m.writeMethod.invoke(env, dos, checksum.getRef());
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        LOG.warn("failed to write meta module: {}", m.name, e);
                        throw new RuntimeException(e);
                    }
                }));
            }
            // 3. force sync to disk. Flush first: bytes still held by the BufferedOutputStream
            // have not reached the channel yet and would otherwise be missed by force().
            dos.flush();
            imageFileOut.getChannel().force(true);
        }
        MetaFooter.write(imageFile, metaIndices, checksum.getRef());

        long saveImageEndTime = System.currentTimeMillis();
        LOG.info("finished save image {} in {} ms. checksum is {}", imageFile.getAbsolutePath(),
                (saveImageEndTime - saveImageStartTime), checksum.getRef());
    }

 

总结

EditLog

代理了对BDBJEJournal的写和读操作

logEdit(写)
    /**
     * Records a create-database operation by passing {@code db} to {@code logEdit}
     * with the {@code OP_CREATE_DB} operation code. The id returned by {@code logEdit}
     * is discarded.
     *
     * @param db database to record
     */
    public void logCreateDb(Database db) {
        logEdit(OperationType.OP_CREATE_DB, db);
    }
    
    /**
     * Writes one operation to the journal and rolls the edit log once enough entries
     * have been written since the last roll.
     *
     * @param op       operation code of the entry
     * @param writable payload of the entry
     * @return the id returned by {@code journal.write}
     */
    private synchronized long logEdit(short op, Writable writable) {
        final long journalId = journal.write(op, writable);

        // One more entry counted towards the roll threshold.
        txId++;
        if (txId < Config.edit_log_roll_num) {
            return journalId;
        }

        // Threshold reached: roll and restart the counter.
        LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId,
                Config.edit_log_roll_num);
        rollEditLog();
        txId = 0;
        return journalId;
    }
    
    /** Rolls the edit log by delegating to {@code journal.rollJournal()}. */
    public void rollEditLog() {
        journal.rollJournal();
    }   
loadJournal(读)
    /**
     * Replays one journal entry against {@code env}, dispatching on the entry's operation code.
     *
     * <p>NOTE(review): this excerpt is abridged. Only the {@code OP_CREATE_DB} case is shown,
     * the {@code try} has no visible {@code catch}/{@code finally}, and the braces do not
     * balance, so the remainder of the method is presumably elided. Confirm against the
     * full source.
     *
     * @param env     environment the operation is replayed on
     * @param logId   id of the journal entry (not used in the visible part)
     * @param journal entry carrying the operation code and its data
     */
    public static void loadJournal(Env env, Long logId, JournalEntity journal) {
        short opCode = journal.getOpCode();
        try {
            switch (opCode) {
                case OperationType.OP_CREATE_DB: {
                    // The entry's data is the Database that was logged; replay its creation.
                    Database db = (Database) journal.getData();
                    env.replayCreateDb(db);
                    break;
                }
                default: {
                    // Unknown operation code: log it with a stack trace and raise the exception.
                    IOException e = new IOException();
                    LOG.error("UNKNOWN Operation Type {}", opCode, e);
                    throw e;
                }
            }
    } 

 

BDBJEJournal

包含两个重要属性currentJournalDB,nextJournalId

write(写)
    /**
     * Serializes one operation as a journal entry and stores it in the current journal database.
     *
     * <p>The key is the next journal id; the value is the serialized {@code JournalEntity}.
     * The put is attempted up to {@code RETRY_TIME} times.
     *
     * @param op       operation code of the entry
     * @param writable payload of the entry
     * @return the journal id under which the entry was stored
     * @throws IOException if serialization fails or no put attempt reports success
     */
    public synchronized long write(short op, Writable writable) throws IOException {
        JournalEntity entity = new JournalEntity();
        entity.setOpCode(op);
        entity.setData(writable);

        // id is the key
        long id = nextJournalId.getAndIncrement();
        Long idLong = id;
        DatabaseEntry theKey = new DatabaseEntry();
        TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
        idBinding.objectToEntry(idLong, theKey);

        // entity is the value
        DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
        entity.write(buffer);

        // NOTE(review): if getData() exposes the whole backing array rather than only the
        // bytes written, the stored value carries trailing padding — confirm.
        DatabaseEntry theData = new DatabaseEntry(buffer.getData());
        if (MetricRepo.isInit) {
            MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
            MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
        }
        LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());

        // Write the key value pair to bdb, retrying a bounded number of times.
        for (int i = 0; i < RETRY_TIME; i++) {
            if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
                return id;
            }
        }

        // Every attempt failed. Report it instead of returning an id for an entry that was
        // never stored. The id itself has already been consumed by getAndIncrement().
        throw new IOException("failed to write journal " + id + " (opCode " + op + ") after "
                + RETRY_TIME + " attempts");
    }

 

rollJournal

    public synchronized void rollJournal() {
        // Doesn't need to roll if current database contains no journals
        if (currentJournalDB.count() == 0) {
            return;
        }

        long newName = nextJournalId.get();
        String currentDbName = currentJournalDB.getDatabaseName();
        long currentName = Long.parseLong(currentDbName);
        long newNameVerify = currentName + currentJournalDB.count();
        if (newName == newNameVerify) {
            LOG.info("roll edit log. new db name is {}", newName);
            currentJournalDB = bdbEnvironment.openDatabase(Long.(newName));
        } else {
            ...
        }
    }    

 

open
    public synchronized void open() {
        if (bdbEnvironment == null) {
            File dbEnv = new File(environmentPath);
            bdbEnvironment = new BDBEnvironment();

            HostInfo helperNode = Env.getServingEnv().getHelperNode();
            String helperHostPort = helperNode.getHost() + ":" + helperNode.getPort();
            bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
                    Env.getServingEnv().isElectable());
        }

        // Open a new journal database or get last existing one as current journal
        // database
        List<Long> dbNames = null;
        dbNames = getDatabaseNames();

        if (dbNames.size() == 0) {
            String dbName = Long.(Env.getServingEnv().getReplayedJournalId() + 1);
            LOG.info("the very first time to open bdb, dbname is {}", dbName);
            currentJournalDB = bdbEnvironment.openDatabase(dbName);
        } else {
            // get last database as current journal database
            currentJournalDB = bdbEnvironment.openDatabase(dbNames.get(dbNames.size() - 1).());
        }

        // set next journal id
        nextJournalId.set(getMaxJournalId() + 1);
    }

 

 

0条评论
0 / 1000
x****m
5文章数
0粉丝数
x****m
5 文章 | 0 粉丝
原创

【源码阅读】9. 元数据

2023-11-01 03:22:15
1
0

概览

高可用,Replay

场景

元数据写入

    /**
     * Creates the database described by {@code stmt} and records the creation in the edit log.
     *
     * <p>The new {@code Database} is first handed to {@code unprotectCreateDb} and only
     * afterwards written to the edit log via {@code logCreateDb}.
     *
     * @param stmt statement supplying the full database name
     * @throws DdlException declared by the signature; not thrown directly by the visible code
     */
    public void createDb(CreateDbStmt stmt) throws DdlException {
        final String dbName = stmt.getFullDbName();
        final long dbId = Env.getCurrentEnv().getNextId();
        final Database newDb = new Database(dbId, dbName);

        // Apply the change locally first ...
        unprotectCreateDb(newDb);
        // ... then persist it as an edit-log entry.
        Env.getCurrentEnv()
                .getEditLog()
                .logCreateDb(newDb);
    }

 

镜像生成 - Checkpoint

  • 创建Checkpoint Catalog;加载老镜像;从BDBJE中读取日志并回放
  • 生成新镜像
  • 推送新镜像到其他FE
  • 删除老的journal
  • 删除老的image

MetaWriter.write

    /**
     * Saves the metadata image of {@code env} into {@code imageFile}.
     *
     * <p>The file is produced in three steps: {@code MetaHeader.write} writes the file header,
     * then the image header and every module in {@code PersistMetaModules.MODULES_IN_ORDER} are
     * appended through a counting stream, and finally {@code MetaFooter.write} is called with
     * the collected meta indices and the running checksum.
     *
     * @param imageFile destination image file
     * @param env       environment whose metadata is saved
     * @throws IOException if writing the image fails
     */
    public static void write(File imageFile, Env env) throws IOException {
        // save image does not need any lock. because only checkpoint thread will call this method.
        LOG.info("start to save image to {}. is ckpt: {}",
                imageFile.getAbsolutePath(), Env.isCheckpointThread());
        final Reference<Long> checksum = new Reference<>(0L);
        long saveImageStartTime = System.currentTimeMillis();
        // MetaHeader should use output stream in the future.
        long startPosition = MetaHeader.write(imageFile);
        List<MetaIndex> metaIndices = Lists.newArrayList();
        // Both streams are managed by try-with-resources so the file descriptor is released even
        // if constructing the wrapping stream fails.
        try (FileOutputStream imageFileOut = new FileOutputStream(imageFile, true);
                CountingDataOutputStream dos = new CountingDataOutputStream(
                        new BufferedOutputStream(imageFileOut), startPosition)) {
            writer.setDelegate(dos, metaIndices);
            long replayedJournalId = env.getReplayedJournalId();
            // 1. write header first
            checksum.setRef(
                    writer.doWork("header", () -> env.saveHeader(dos, replayedJournalId, checksum.getRef())));
            // 2. write other modules
            for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) {
                checksum.setRef(writer.doWork(m.name, () -> {
                    try {
                        return (long) m.writeMethod.invoke(env, dos, checksum.getRef());
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        LOG.warn("failed to write meta module: {}", m.name, e);
                        throw new RuntimeException(e);
                    }
                }));
            }
            // 3. force sync to disk. Flush first: bytes still held by the BufferedOutputStream
            // have not reached the channel yet and would otherwise be missed by force().
            dos.flush();
            imageFileOut.getChannel().force(true);
        }
        MetaFooter.write(imageFile, metaIndices, checksum.getRef());

        long saveImageEndTime = System.currentTimeMillis();
        LOG.info("finished save image {} in {} ms. checksum is {}", imageFile.getAbsolutePath(),
                (saveImageEndTime - saveImageStartTime), checksum.getRef());
    }

 

总结

EditLog

代理了对BDBJEJournal的写和读操作

logEdit(写)
    /**
     * Records a create-database operation by passing {@code db} to {@code logEdit}
     * with the {@code OP_CREATE_DB} operation code. The id returned by {@code logEdit}
     * is discarded.
     *
     * @param db database to record
     */
    public void logCreateDb(Database db) {
        logEdit(OperationType.OP_CREATE_DB, db);
    }
    
    /**
     * Writes one operation to the journal and rolls the edit log once enough entries
     * have been written since the last roll.
     *
     * @param op       operation code of the entry
     * @param writable payload of the entry
     * @return the id returned by {@code journal.write}
     */
    private synchronized long logEdit(short op, Writable writable) {
        final long journalId = journal.write(op, writable);

        // One more entry counted towards the roll threshold.
        txId++;
        if (txId < Config.edit_log_roll_num) {
            return journalId;
        }

        // Threshold reached: roll and restart the counter.
        LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.", txId,
                Config.edit_log_roll_num);
        rollEditLog();
        txId = 0;
        return journalId;
    }
    
    /** Rolls the edit log by delegating to {@code journal.rollJournal()}. */
    public void rollEditLog() {
        journal.rollJournal();
    }   
loadJournal(读)
    /**
     * Replays one journal entry against {@code env}, dispatching on the entry's operation code.
     *
     * <p>NOTE(review): this excerpt is abridged. Only the {@code OP_CREATE_DB} case is shown,
     * the {@code try} has no visible {@code catch}/{@code finally}, and the braces do not
     * balance, so the remainder of the method is presumably elided. Confirm against the
     * full source.
     *
     * @param env     environment the operation is replayed on
     * @param logId   id of the journal entry (not used in the visible part)
     * @param journal entry carrying the operation code and its data
     */
    public static void loadJournal(Env env, Long logId, JournalEntity journal) {
        short opCode = journal.getOpCode();
        try {
            switch (opCode) {
                case OperationType.OP_CREATE_DB: {
                    // The entry's data is the Database that was logged; replay its creation.
                    Database db = (Database) journal.getData();
                    env.replayCreateDb(db);
                    break;
                }
                default: {
                    // Unknown operation code: log it with a stack trace and raise the exception.
                    IOException e = new IOException();
                    LOG.error("UNKNOWN Operation Type {}", opCode, e);
                    throw e;
                }
            }
    } 

 

BDBJEJournal

包含两个重要属性currentJournalDB,nextJournalId

write(写)
    /**
     * Serializes one operation as a journal entry and stores it in the current journal database.
     *
     * <p>The key is the next journal id; the value is the serialized {@code JournalEntity}.
     * The put is attempted up to {@code RETRY_TIME} times.
     *
     * @param op       operation code of the entry
     * @param writable payload of the entry
     * @return the journal id under which the entry was stored
     * @throws IOException if serialization fails or no put attempt reports success
     */
    public synchronized long write(short op, Writable writable) throws IOException {
        JournalEntity entity = new JournalEntity();
        entity.setOpCode(op);
        entity.setData(writable);

        // id is the key
        long id = nextJournalId.getAndIncrement();
        Long idLong = id;
        DatabaseEntry theKey = new DatabaseEntry();
        TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
        idBinding.objectToEntry(idLong, theKey);

        // entity is the value
        DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
        entity.write(buffer);

        // NOTE(review): if getData() exposes the whole backing array rather than only the
        // bytes written, the stored value carries trailing padding — confirm.
        DatabaseEntry theData = new DatabaseEntry(buffer.getData());
        if (MetricRepo.isInit) {
            MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
            MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
        }
        LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());

        // Write the key value pair to bdb, retrying a bounded number of times.
        for (int i = 0; i < RETRY_TIME; i++) {
            if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
                return id;
            }
        }

        // Every attempt failed. Report it instead of returning an id for an entry that was
        // never stored. The id itself has already been consumed by getAndIncrement().
        throw new IOException("failed to write journal " + id + " (opCode " + op + ") after "
                + RETRY_TIME + " attempts");
    }

 

rollJournal

    public synchronized void rollJournal() {
        // Doesn't need to roll if current database contains no journals
        if (currentJournalDB.count() == 0) {
            return;
        }

        long newName = nextJournalId.get();
        String currentDbName = currentJournalDB.getDatabaseName();
        long currentName = Long.parseLong(currentDbName);
        long newNameVerify = currentName + currentJournalDB.count();
        if (newName == newNameVerify) {
            LOG.info("roll edit log. new db name is {}", newName);
            currentJournalDB = bdbEnvironment.openDatabase(Long.(newName));
        } else {
            ...
        }
    }    

 

open
    public synchronized void open() {
        if (bdbEnvironment == null) {
            File dbEnv = new File(environmentPath);
            bdbEnvironment = new BDBEnvironment();

            HostInfo helperNode = Env.getServingEnv().getHelperNode();
            String helperHostPort = helperNode.getHost() + ":" + helperNode.getPort();
            bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
                    Env.getServingEnv().isElectable());
        }

        // Open a new journal database or get last existing one as current journal
        // database
        List<Long> dbNames = null;
        dbNames = getDatabaseNames();

        if (dbNames.size() == 0) {
            String dbName = Long.(Env.getServingEnv().getReplayedJournalId() + 1);
            LOG.info("the very first time to open bdb, dbname is {}", dbName);
            currentJournalDB = bdbEnvironment.openDatabase(dbName);
        } else {
            // get last database as current journal database
            currentJournalDB = bdbEnvironment.openDatabase(dbNames.get(dbNames.size() - 1).());
        }

        // set next journal id
        nextJournalId.set(getMaxJournalId() + 1);
    }

 

 

文章来自个人专栏
Doris
5 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0