Learning Redis source code - BIO - Alibaba Cloud Developer Forums: Cloud Discussion Forums

Blanche
Engineer
Engineer
  • UID619
  • Fans3
  • Follows2
  • Posts59
Reads:2185Replies:0

[Others]Learning Redis source code - BIO

Created#
More Posted time:Sep 5, 2016 13:48 PM
BIO, as its name suggests, means background IO. It runs in the background of Redis.  There are uniform sayings on the internet that Redis is single-thread and single-process.  But in fact, Redis is not strictly single-thread and single-process during its operation process.
Multi-process in Redis:
When writing backup (RDB, AOF) files, Redis will fork child processes for writing backup files.
Multithreading in Redis:
1. In AOF backup mode, if we set AOF_FSYNC_EVERYSEC (backup once every second, this setting can be understood as weak synchronization backup), Redis will create a background thread in which AOF backup file writing is executed.
2. When the newly generated AOF file overwrites the old AOF file:  If AOF backup has been enabled, before you close the fd, the Redis process and the old AOF file have reference existing, and the old AOF file won’t be deleted in a real sense.  So when you execute close (oldfd), if the number of processes accessing the old AOF file is displayed as 0, that is, no process has accessed this file ever, this file will be deleted in the real sense when you close the fd.  However, deleting old AOF files may cause service congestion, so we need to put and call it in another thread.
3. Execute DEL. If this key happens to have many objects, the DEL operation will cause congestion to the server for a few seconds. So you can execute the DEL operation in another thread.
BIO
Redis encapsulates all the multi-thread operations in BIO, which can be viewed in bio.c,bio.h.  We focus on the Redis encapsulation of BIO instead of specific operations in this article. The code is concise and well maintained.  It is worth learning.
BIO offers the following APIs:
void bioInit(void); //Initialize BIO
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);   //Create a new BIO job
unsigned long long bioPendingJobsOfType(int type);  //Obtain the current BIO job type, the number of pending jobs in the queue
unsigned long long bioWaitStepOfType(int type); //Congestion, wait for the execution of a BIO job type, return the number of pending jobs
void bioKillThreads(void); //Kill all BIO processes

BIO operation types:

/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Close file*/
#define BIO_AOF_FSYNC     1 /* AOF writing */
#define BIO_LAZY_FREE     2 /* Release object */
#define BIO_NUM_OPS       3 /*BIO count*/

BIO objects:

static pthread_t bio_threads[BIO_NUM_OPS];  //BIO thread
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; //Mutex lock variable of every BIO thread
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; //Conditional variable of BIO thread lock, listening to the current conditional variable to waken the current thread
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; //BIO thread congestion lock, bioWaitStepOfType listens to this conditional variable noticed for execution of this operation.
static list *bio_jobs[BIO_NUM_OPS];
static unsigned long long bio_pending[BIO_NUM_OPS]; // Pending BIO operations

Let's first take a look at the executed operations at initialization:
bioInit() {
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { //Initialize thread
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}


The main functions are divided into two parts:
1. bioCreateBackgroundJob:  Create a BIO job, insert bio_jobs, call pthread_cond_signal to notice the process to unlock.
2. bioProcessBackgroundJobs:  Execute BIO jobs.  Pthread is used to manage process locks in the thread. When bioCreateBackgroundJob executes pthread_cond_signal to notice the thread for the job, the previous job is read from bio_jobs and executed.

bioCreateBackgroundJob
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]); // Lock to protect bio_jobs and bio_pending consistency
    listAddNodeTail(bio_jobs[type],job);  //Insert to the job queue
    bio_pending[type]++;  
    pthread_cond_signal(&bio_newjob_cond[type]);  //Notice the process thread to execute the job
    pthread_mutex_unlock(&bio_mutex[type]);  //Unlock
}

bioProcessBackgroundJobs

void *bioProcessBackgroundJobs(void *arg) {
    // Enable the process to be manually killed
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    // Lock to ensure that two processes won’t use pthread_cond_wait to listen to the same lock
    pthread_mutex_lock(&bio_mutex[type]);
    while(1) {
        listNode *ln;
        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); // Wait for bioCreateBackgroundJob to notice unlocking
            continue;
        }
        //Get the first job in the queue
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]); //Unlock
        // Execute jobs according to the type
        // do somethings...
        pthread_cond_broadcast(&bio_step_cond[type]); // Broadcast unlocking, used to unlock bioWaitStepOfType and lift the congestion.
        pthread_mutex_lock(&bio_mutex[type]); // Lock the following operations, and used for pthread_cond_wait congestion of the next loop.
        listDelNode(bio_jobs[type],ln);  // The bio_jobs and bio_pending  operations indicate the job has been completed.
        bio_pending[type]--;
    }
}


Pthread
The entire BIO is the congestion background IO through locks.  Let's go over the lock process:
1. bioInit, create a new thread, execute bioProcessBackgroundJobs.
2. In bioProcessBackgroundJobs, use pthread_mutex_lock(&bio_mutex[type]) to lock the lock variable of the job.
3. Enter while loop, call pthread_cond_wait and wait for unlocking.  Since mutex lock is “sleep-lock”, the thread will sleep and wait to be wakened.
4. Called by the main thread to create BIO job, by calling bioCreateBackgroundJob.
5. In bioCreateBackgroundJob, use pthread_mutex_lock(&bio_mutex[type]); to lock bio_mutex[type] again
6. In bioCreateBackgroundJob, pthread_cond_signal(&bio_newjob_cond[type]) //Send the signal to notice BIO thread to continue the execution.
7. In bioCreateBackgroundJob, pthread_mutex_unlock(&bio_mutex[type]); //Unlock
8. bioProcessBackgroundJobs is wakened and continues the execution.
9. After the job execution is complete, use pthread_mutex_unlock to unlock and pthread_cond_broadcast to broadcast unlocking.
10. Use pthread_mutex_lock again to lock.  Used for next while loop.
During the process, I found a strange thing: in Step 2 we lock BIO thread, and in Step 5, we call bioCreateBackgroundJob to lock mutex again in the main thread.  But there is no pthread_mutex_unlock execution between them.  Why wasn't bioCreateBackgroundJob congested by the mutex lock?
The key lies in the pthread_cond_wait function.  From my previous understanding, pthread_cond_wait might only perform a signal wait, and when the signal arrives, it unlocks mutex[type].  That is why pthread_mutex_lock didn't congest the bioCreateBackgroundJob in the main thread before the signal was sent.  So I guess pthread_cond_wait is not just a signal wait, but an unlock+wait.
Here is pthread_cond_wait in glibc:
// line 93
int __pthread_cond_wait (cond, mutex)
// line 110
  err = __pthread_mutex_unlock_usercnt (mutex, 0);  //Unlock mutex
do {
// line 155    
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);  // wait signal
} while (val == seq || cond->__data.__woken_seq == val);
// line 193
return __pthread_mutex_cond_lock (mutex);


We can see that pthread_cond_wait is actually a process of Unlock -> Wait -> Lock.
[Blanche edited the post at Sep 5, 2016 16:31 PM]
Guest