前言 網絡上也有許多介紹redis的AOF機制的文章,但是從宏觀上介紹aof的流程,沒有具體分析在AOF過程中涉及到的數據結構和控制機制。昨晚特別看了2.8源碼,感覺源碼中的許多細節是值得細細深究的。特別是list *aof_rewrite_buf_blocks結構。仔細看源碼,會發
網絡上也有許多介紹redis的AOF機制的文章,但是從宏觀上介紹aof的流程,沒有具體分析在AOF過程中涉及到的數據結構和控制機制。昨晚特別看了2.8源碼,感覺源碼中的許多細節是值得細細深究的。特別是list *aof_rewrite_buf_blocks結構。仔細看源碼,會發現原來看網絡文章多的到的領會是片面的,最好的學習還是得自己動手...
原文鏈接: http://blog.csdn.net/ordeder/article/details/39271543
作者提及的AOF簡化的流程為:依據源碼,AOF總體有一下操作:
主要函數:
//函數1:將command寫入aof_buff
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
//函數2:啟動子進程,子進程用于刷一遍redis中的數據
int rewriteAppendOnlyFileBackground(void);
//函數3:刷一遍server.db[16],依次將對象寫入磁盤臨時文件tmpfile
int rewriteAppendOnlyFile(char *filename);
//函數4:將aof_buff內容持久化
void flushAppendOnlyFile(int force);
//函數5:將server.aof_rewrite_buf_blocks中的內容寫入tmpfile,并替換aof文件
void backgroundRewriteDoneHandler(exitcode,bysignal);
1 AOF日常命令append:
1.1. Redis執行文件事件:執行用戶命令,并將該命令緩存于Server.aof_buf中{函數1}
1.2. Redis執行時間時間的ServerCron:依據參數server.aof_flush_postponed_start,{函數4}
1.2.1. 將redisServer.aof_buf寫入文件Server.aof_fd。
1.2.2. 該文件何時fsync到磁盤有三種機制:
AOF_FSYNC_EVERYSEC 每秒調用fsync
AOF_FSYNC_ALWAYS 寫文件后立即調用fsync
其他 聽系統的
2 AOF日志簡化操作:
2.1. Redis執行時間時間的ServerCron:{函數2-3}
2.1.1. 開啟后臺AOF進程,依據redis內存數據(redis.db[16]),生成可重建數據庫的命令集,并寫入tmpfile臨時文件
2.2. Redis執行文件事件:
執行用戶命令時,{函數1}
2.2.1. 將該命令緩存于redisServer.aof_buf;
2.2.2. 同時將該命令緩存于server.aof_rewrite_buf_blocks
2.3. Redis執行時間時間的ServerCron:
2.3.1 {函數4}在aof子進程還未結束期間,步驟 1.2 照常執行,將aof_buf寫入aof_fd(該干嘛干嘛)
2.3.2 wait3發現aof子進程結束,那么:{函數5}
2.3.2.1 將server.aof_rewrite_buf_blocks中的內容寫入tmpfile中
2.3.2.2 用tmpfile替換原有aof文件,并重置Server.aof_fd
函數和數據間關系如下圖所示:
struct redisServer{ ... /* AOF persistence */ int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy (每個操作|每秒|緩沖區滿)*/ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. 是否需要開啟后臺aof子進程*/ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. 在aof bgsave期間redis執行的命令將存儲到aof_rewrite_buf_blocks,當然aof_buf還是要照常使用的,二者不沖突*/ sds aof_buf; /* AOF buffer, written before entering the event loop */ int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ ... } ///////////////////////////////////////////////////////////////////////////////// /* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start = ustime(), duration; int client_old_flags = c->flags; ... /* 執行用戶命令 */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); redisOpArrayInit(&server.also_propagate); dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; duration = ustime()-start; ... /* 將用戶命令進行AOF備份 */ if (flags & REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE; if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } } void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & REDIS_PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); robj *tmpargv[3]; /* 如果當前操作的dict和前一次操作的dict不同, 那么redis要在aof中添加一條:select命令,選擇當前dict */ if (dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); server.aof_selected_db = dictid; } //依據不同的命令,進行字符畫處理,并將結果寫入臨時的buff中 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { /* Translate SETEX/PSETEX to SET and PEXPIREAT */ tmpargv[0] = createStringObject("SET",3); tmpargv[1] = argv[1]; tmpargv[2] = argv[3]; buf = catAppendOnlyGenericCommand(buf,3,tmpargv); decrRefCount(tmpargv[0]); buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector * for the replication itself. */ buf = catAppendOnlyGenericCommand(buf,argc,argv); } /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ //如果用戶開啟的AOF,那么將當前命令的buff Append到server.aof_buf緩沖的尾部 if (server.aof_state == REDIS_AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ //如果當前有子進程正在進行AOF日志的重構(即掃描redis數據庫,依據數據構建日志) //那么將當前命令的buff添加到server.aof_rewrite_buf_blocks內存中(該部分內存 //專門記錄在重構AOF期間redis處理的操作) if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); } //////////////////////////////////////////////////////////////////////////////////////// int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* We take a cached value of the unix time in the global state because * with virtual memory and aging there is to store the current time * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ //緩存系統時間... server.unixtime = time(NULL); server.mstime = mstime(); ... /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ //開啟AOF日志重建的子進程(簡化日志) //后臺AOF子進程通過掃描redis.db[16]數據,生成可重建當前數據庫的命令, //并寫入臨時文件tmpfile if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled) { //AOF rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ //后臺AOF進程結束:將在后臺AOF子進程構建AOF日志期間redis執行的新命令 //(記錄于server.aof_rewrite_buf_blocks)append 到后臺子進程構建的tmpfile中 //最后將tmpfile重名為server.aof_filename 替換原有AOF if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); } else { redisLog(REDIS_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } updateDictResizePolicy(); } } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now */ //沒有后臺子進程在跑,那么檢查是否要開啟一個AOF或者RDB的子進程。。 ... } /* If we postponed an AOF buffer flush, let's try to do it every time the * cron function is called. */ //將server.aof_buf(緩存redis最近執行過的命名)flush到磁盤AOF文件中 //flush的策略有如下: //每個操作,調用fync將命令持久化 //間隔1秒,調用fync將aof_buf持久化 //從不調用fync,由系統自行安排時機 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); ... server.cronloops++; return 1000/server.hz; } /* This is how rewriting of the append only file in background works: * * 1) The user calls BGREWRITEAOF * 2) Redis calls this function, that forks(): * 2a) the child rewrite the append only file in a temp file. * 2b) the parent accumulates differences in server.aof_rewrite_buf. * 3) When the child finished '2a' exists. * 4) The parent will trap the exit code, if it's OK, will append the * data accumulated into server.aof_rewrite_buf into the temp file, and * finally will rename(2) the temp file in the actual file name. * The the new file is reopened as the new append only file. Profit! */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; long long start; if (server.aof_child_pid != -1) return REDIS_ERR; start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; /* Child */ closeListeningSockets(0); redisSetProcTitle("redis-aof-rewrite"); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { size_t private_dirty = zmalloc_get_private_dirty(); if (private_dirty) { redisLog(REDIS_NOTICE, "AOF rewrite: %zu MB of memory used by copy-on-write", private_dirty/(1024*1024)); } exitFromChild(0); } else { exitFromChild(1); } } else { /* Parent */ server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING, "Can't rewrite append only file in background: fork: %s", strerror(errno)); return REDIS_ERR; } redisLog(REDIS_NOTICE, "Background append only file rewriting started by pid %d",childpid); server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); server.aof_child_pid = childpid; updateDictResizePolicy(); /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start * with a SELECT statement and it will be safe to merge. */ server.aof_selected_db = -1; replicationScriptCacheFlush(); return REDIS_OK; } return REDIS_OK; /* unreached */ } /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. * * In order to minimize the number of commands needed in the rewritten * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; rio aof; FILE *fp; char tmpfile[256]; int j; long long now = mstime(); /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return REDIS_ERR; } rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { //添加一條定位dict的命令 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; long long expiretime; keystr = dictGetKey(de); o = dictGetVal(de); initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* If this key is already expired skip it */ if (expiretime != -1 && expiretime < now) continue; /* Save the key and associated value */ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } dictReleaseIterator(di); } /* Make sure data will not remain on the OS's output buffers */ fflush(fp); aof_fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK; werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; } /* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering * the event loop again. * * About the 'force' argument: * * When the fsync policy is set to 'everysec' we may delay the flush if there * is still an fsync() going on in the background thread, since for instance * on Linux write(2) will be blocked by the background fsync anyway. * When this happens we remember that there is some aof buffer to be * flushed ASAP, and will try to do that in the serverCron() function. * * However if force is set to 1 we'll write regardless of the background * fsync. */ void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; if (sdslen(server.aof_buf) == 0) return; if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; //判定是否該開始將server.aof_buff中緩存的命令flush到server.aof_fd文件的寫緩沖中 if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* If you are following this code path, then we are going to write so * set reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ //將redis最近執行的一些命令(存于server.aof_buf)寫入文件(server.aof_fd) //注意,寫入文件并不能保證馬上寫入磁盤,因為這是帶緩沖的寫。關于何時將 //文件寫緩沖中的命令fync到磁盤,這就要看用戶的設置:(見下文) nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); if (nwritten != (signed)sdslen(server.aof_buf)) { /* Ooops, we are in troubles. The best thing to do for now is * aborting instead of giving the illusion that everything is * working as expected. */ ... exit(1); } server.aof_current_size += nwritten; /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } //aof_no_fsync_on_rewrite : 該標志位表示當有aof或rdb子進程時,不進行fsync操作 if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; //fsync... //每個操作,調用fync將命令持久化 [1] //間隔1秒,調用fync將aof_buf持久化 [2] //從不調用fync,由系統自行安排時機(fd的寫緩沖區滿了)[3] //【1】 //每個操作都需要將文件緩沖區的寫 buff sync到磁盤。從而保證每個redis操作在 //被redis執行后,都能馬上持久化,安全性很高,就是磁盤寫的系統開銷有點大大 if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ server.aof_last_fsync = server.unixtime; } //【2】 //每隔1s將文件緩沖區的寫緩沖區sync到磁盤 else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; } //【3】 //else fd的寫緩沖滿后會由系統安排執行(聽天由命) }
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com