Blanche

[Share]GCD source code analysis

Posted: Oct 26, 2016 16:13
Background
Recently, while reading the React Native source code, I came across a mention of the difference between the main queue and the main thread. I had long wanted to read the GCD source, and the chance had finally come.
Conclusion first: the main thread and the main queue are two different things.
• The main queue IS bound to the main thread: code submitted to the main queue always runs on the main thread.
• The main thread IS NOT bound to the main queue: the main thread can also run code that was never submitted to the main queue.
Source code profiling
Key points
Relationship between the queue and the thread


(Figure: the relationship between queues and threads.) Just remember this diagram!
slowpath vs fastpath
You may be familiar with __builtin_expect, a compiler builtin that provides branch-prediction hints so the generated code favors the expected path. When writing an if condition, the programmer often knows which outcome is more likely, and can use fastpath or slowpath for compiler optimization.
• fastpath(x) indicates the condition is more likely to be true.
• slowpath(x) indicates the condition is less likely to be true.
Simply put, these macros do not change the value of the condition, so when reading the code we can look straight through them.
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))


Function vs block
GCD can execute both functions and blocks, and provides a pair of APIs for enqueuing each; a simple example follows. When we see two functions whose names differ only by an "_f" suffix, we can treat them as the same operation:
the block-based variant is ultimately executed by calling through the function-based one.
// Execute the block
void dispatch_async(dispatch_queue_t dq, void (^work)(void));
// Execute the function
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func);


Definition of frequently-used macros
libdispatch uses a large number of macros to keep the code readable without sacrificing performance.
Therefore, before digging into the structs, let's go over several macro definitions to make the code analysis that follows easier.
// The dispatch struct definition macro
#define DISPATCH_DECL(name) typedef struct name##_s *name##_t
// The os object header definition macro
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt)
  isa               // isa
  ref_cnt           // Internal reference counter, used inside GCD.
  xref_cnt          // External reference counter, used outside GCD. The object is disposed only when both reach 0.
// The dispatch struct header
#define DISPATCH_STRUCT_HEADER(x)
  _OS_OBJECT_HEADER
  do_next                             // The next do
  do_targetq                          // The target queue
  do_ctxt                             // The context of do
  do_finalizer                        // The function called at do destruction
  do_suspend_cnt                      // The suspend counter, used as the suspend flag
// The dispatch continuation header
#define DISPATCH_CONTINUATION_HEADER(x)
  _OS_OBJECT_HEADER
  do_next                                // The next do
  dc_func                                // The DC-encapsulated function
  dc_ctxt                                // The DC-encapsulated context
  dc_data                                // The DC-encapsulated data
  dc_other                               // Other DC-encapsulated information
// The dispatch queue header
#define DISPATCH_QUEUE_HEADER
  dq_running;                           // The number of tasks currently running in the queue
  dq_width;                             // The concurrency width of the queue (maximum number of concurrent tasks)
  dq_items_tail;                        // The ending node of the queue
  dq_items_head;                        // The starting node of the queue
  dq_serialnum                          // The queue serial number. Every queue has a unique serial number.
  dq_specific_q                         // The queue used to store queue-specific data
// The dispatch vtable header
#define DISPATCH_VTABLE_HEADER(x)
  do_type                               // The do type
  do_kind                               // The kind, e.g. semaphore/group/queue...
  do_debug                              // The debug callback
  do_invoke                             // The invoke callback, used to wake up the queue
  do_probe                              // The probe callback (important)
  do_dispose                            // The dispose callback, used to destroy the queue


GCD struct
This article focuses on the implementation of the queue, so other structs are skipped for the time being. If you are interested, you can download the source code for further study.
os_object_t
A system base class
typedef struct _os_object_s {
  _OS_OBJECT_HEADER(
  const _os_object_class_s *os_obj_isa,  
  os_obj_ref_cnt,                        
  os_obj_xref_cnt);                      
} _os_object_s;


dispatch_object_t
This struct can be regarded as a base class of GCD.
typedef union {
  struct _os_object_s *_os_obj;
  struct dispatch_object_s *_do;
  struct dispatch_continuation_s *_dc;
  struct dispatch_queue_s *_dq;
  struct dispatch_queue_attr_s *_dqa;
  struct dispatch_group_s *_dg;
  struct dispatch_source_s *_ds;
  struct dispatch_source_attr_s *_dsa;
  struct dispatch_semaphore_s *_dsema;
  struct dispatch_data_s *_ddata;
  struct dispatch_io_s *_dchannel;
  struct dispatch_operation_s *_doperation;
  struct dispatch_disk_s *_ddisk;
} dispatch_object_t __attribute__((__transparent_union__));


From the definition above we can see that a dispatch_object_t can be any one of the pointer types in the union; thanks to the transparent_union attribute, any of those pointer types can be passed directly wherever a dispatch_object_t parameter is expected.
dispatch_continuation_t
This struct is mainly used to encapsulate the block and the function.
struct dispatch_continuation_s {
  DISPATCH_CONTINUATION_HEADER(continuation);
};


dispatch_queue_t
The queue struct may be the most important struct in GCD.
struct dispatch_queue_s {
  DISPATCH_STRUCT_HEADER(queue);
  DISPATCH_QUEUE_HEADER;
  char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // must be last
  char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD];   // for static queues only
};


dispatch_queue_attr_t
Queue attribute struct.
struct dispatch_queue_attr_s {
  DISPATCH_STRUCT_HEADER(queue_attr);
};


Frequently-used APIs
GCD offers many features to simplify writing code for multi-core devices. Here we will analyze the source code of frequently-used APIs.
dispatch_get_global_queue
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
  if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
    return NULL;
  }
  return _dispatch_get_root_queue(priority,
      flags & DISPATCH_QUEUE_OVERCOMMIT);
}


When we obtain a global queue this way, we are actually fetching one of the pre-created root queues through _dispatch_get_root_queue; unless the DISPATCH_QUEUE_OVERCOMMIT flag is passed, it is the non-overcommitting variant.
_dispatch_get_root_queue picks the queue of the given priority out of the _dispatch_root_queues array, which defines queues of four priorities, each in an overcommitting and a non-overcommitting variant (an index whose lowest bit is 1 denotes the overcommit variant; see the list below). Overcommit controls whether the number of threads may exceed the number of physical cores, so the queues handed out by this interface will not create too many threads for the system.


DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY = 0,
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY,
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY,


Finally, the thread implementation backing _dispatch_root_queues lives in _dispatch_root_queue_contexts: each context owns a thread pool, and each pool holds at most 255 threads.
dispatch_get_current_queue
dispatch_queue_t
dispatch_get_current_queue(void)
{
  return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

static inline dispatch_queue_t
_dispatch_queue_get_current(void)
{
  return (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
}


We can see that dispatch_get_current_queue determines the current queue through TSD (Thread-Specific Data): every time execution switches to a queue, the current queue is recorded with _dispatch_thread_setspecific.
The TSD used here, also known as thread-local storage, is an interesting technique in its own right.
dispatch_queue_create
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
  dispatch_queue_t dq;
  size_t label_len;

  if (!label) {
    label = "";
  }

  label_len = strlen(label);
  if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
    label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
  }

  // XXX switch to malloc()
  dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
      sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -
      DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);

  _dispatch_queue_init(dq);
  strcpy(dq->dq_label, label);

  if (fastpath(!attr)) {
    return dq;
  }
  if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
    dq->dq_width = UINT32_MAX;
    dq->do_targetq = _dispatch_get_root_queue(0, false);
  } else {
    dispatch_debug_assert(!attr, "Invalid attribute");
  }
  return dq;
}

static inline void
_dispatch_queue_init(dispatch_queue_t dq)
{
  dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
  // Default target queue is overcommit!
  dq->do_targetq = _dispatch_get_root_queue(0, true);
  dq->dq_running = 0;
  dq->dq_width = 1;
  dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
}


When dispatch_queue_create is called, it first invokes _dispatch_queue_init to initialize the queue: the default target queue is of Default priority (refer to the figure above), dq_width is 1 — that is, a serial queue — and a fresh serial number is taken from a global counter that is incremented each time.
If the queue is concurrent, dq_width is changed to UINT32_MAX and the target queue is set to the non-overcommitting root queue. Since overcommit == true means more threads than physical cores may be created, the number of threads serving custom concurrent queues will not exceed the number of physical cores, while serial queues are not subject to this limit.
We mentioned that the serial number is incremented each time. So what is the serial number for? The source code contains this annotation.
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned


We can see that serial number 1 is the main queue, 2 is the manager queue, 3 is unused, and 4 through 11 are the global queues; queues created afterwards are user-defined queues.
Next, let's take a look at how queues of serial number 1 and serial number 2 are defined.
struct dispatch_queue_s _dispatch_main_q = {
  .do_vtable = DISPATCH_VTABLE(queue),
#if !DISPATCH_USE_RESOLVERS
  .do_targetq = &_dispatch_root_queues[
      DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
  .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
  .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
  .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
  .dq_label = "com.apple.main-thread",
  .dq_running = 1,
  .dq_width = 1,
  .dq_serialnum = 1,
};


It is worth noting that the main queue's target queue is an overcommitting root queue of default priority — it lives in the general thread pool, with nothing special about it.
struct dispatch_queue_s _dispatch_mgr_q = {
  .do_vtable = DISPATCH_VTABLE(queue_mgr),
  .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
  .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
  .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
  .do_targetq = &_dispatch_root_queues[
      DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
  .dq_label = "com.apple.libdispatch-manager",
  .dq_width = 1,
  .dq_serialnum = 2,
};


This queue is used to manage the tasks in the GCD, such as various sources.
dispatch_sync
void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
  if (slowpath(dq == &_dispatch_main_q)) {
    return _dispatch_sync_slow(dq, work);
  }
#endif
  struct Block_basic *bb = (void *)work;
  dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}


Whichever branch is taken, further digging shows that execution eventually funnels into dispatch_sync_f, with the block converted to a plain function via _dispatch_Block_copy or Block_basic.
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
  if (fastpath(dq->dq_width == 1)) {
    return dispatch_barrier_sync_f(dq, ctxt, func);
  }
  if (slowpath(!dq->do_targetq)) {
    // the global root queues do not need strict ordering
    (void)dispatch_atomic_add2o(dq, dq_running, 2);
    return _dispatch_sync_f_invoke(dq, ctxt, func);
  }
  _dispatch_sync_f2(dq, ctxt, func);
}


If dq_width is 1 — that is, dq is a serial queue — the task must wait for the preceding task to finish before it can run, so dispatch_barrier_sync_f is called. The barrier itself is implemented on top of a semaphore mechanism.
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
    dispatch_function_t func)
{
  // 1) ensure that this thread hasn't enqueued anything ahead of this call
  // 2) the queue is not suspended
  if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
    return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
  }
  if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
    // global queues and main queue bound to main thread always falls into
    // the slow case
    return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
  }
  if (slowpath(dq->do_targetq->do_targetq)) {
    return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
  }
  _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}


If items are already enqueued on the queue, or the queue is suspended, or the queue is already running a task (the cmpxchg of dq_running from 0 to 1 fails), the slow path _dispatch_barrier_sync_f_slow is taken; otherwise _dispatch_barrier_sync_f_invoke is called directly. Note that dispatch_sync generally executes the task on the **current thread**, with no thread switch — this deserves special attention. Only on the slow path will a thread switch occur. Next, let's look at what the slow-path dispatching looks like.
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
    dispatch_function_t func)
{
  // It's preferred to execute synchronous blocks on the current thread
  // due to thread-local side effects, garbage collection, etc. However,
  // blocks submitted to the main thread MUST be run on the main thread

  struct dispatch_barrier_sync_slow2_s dbss2 = {
    .dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT
    .dbss2_func = func,
    .dbss2_ctxt = ctxt,
#endif
    .dbss2_sema = _dispatch_get_thread_semaphore(),
  };
  struct dispatch_barrier_sync_slow_s dbss = {
    .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
        DISPATCH_OBJ_SYNC_SLOW_BIT),
    .dc_func = _dispatch_barrier_sync_f_slow_invoke,
    .dc_ctxt = &dbss2,
  };
  _dispatch_queue_push(dq, (void *)&dbss);

  _dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
  _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
  ......


The code is a bit long. From the opening section we can see that a semaphore is used to synchronize task execution, and a thread switch is required.
Let's look at _dispatch_function_invoke. Note that queues and threads are not in one-to-one correspondence: queues are built on top of threads, so when working with queues we should not manipulate the underlying threads directly, to avoid unexpected behavior.
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
    dispatch_function_t func)
{
  dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
  _dispatch_thread_setspecific(dispatch_queue_key, dq);
  _dispatch_client_callout(ctxt, func);
  _dispatch_workitem_inc();
  _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}


Before invoking the function, the current queue recorded in TSD is swapped to dq; the block is then executed, and afterwards the previous queue setting is restored.
dispatch_async
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
  dispatch_async_f(dq, _dispatch_Block_copy(work),
      _dispatch_call_block_and_release);
}


This one is straightforward: the block is converted to a function, and then the _f variant is called.
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
  dispatch_continuation_t dc;

  // No fastpath/slowpath hint because we simply don't know
  if (dq->dq_width == 1) {
    return dispatch_barrier_async_f(dq, ctxt, func);
  }

  dc = fastpath(_dispatch_continuation_alloc_cacheonly());
  if (!dc) {
    return _dispatch_async_f_slow(dq, ctxt, func);
  }

  dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
  dc->dc_func = func;
  dc->dc_ctxt = ctxt;

  // No fastpath/slowpath hint because we simply don't know
  if (dq->do_targetq) {
    return _dispatch_async_f2(dq, dc);
  }

  _dispatch_queue_push(dq, dc);
}


The examples above make the logic familiar. If dq is a serial queue, the dispatch_barrier_async_f path is taken; otherwise a dispatch_continuation object is created to hold the function and is pushed onto the tail of the queue. The two paths actually achieve the same result — creating a DC object and appending it to the queue tail. The only difference lies in
// barrier
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);

// not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;


By now we know that GCD's barrier waiting is implemented via the DISPATCH_OBJ_BARRIER_BIT flag. The following flag bits exist:
#define DISPATCH_OBJ_ASYNC_BIT    0x1       // Asynchronous
#define DISPATCH_OBJ_BARRIER_BIT  0x2       // Barrier
#define DISPATCH_OBJ_GROUP_BIT    0x4       // Group
#define DISPATCH_OBJ_SYNC_SLOW_BIT  0x8     // Slow synchronization


Understanding these flags helps a lot when reading the source.
Next comes a very important part: we have pushed the DC onto the queue, but when will it be executed? Normally a task is triggered once the preceding task completes. Let's look at a few functions.
// Synchronous jobs execution
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
    dispatch_function_t func)
{
  _dispatch_function_invoke(dq, ctxt, func);
  if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
    _dispatch_wakeup(dq);
  }
}

// Asynchronous jobs execution
static void
_dispatch_async_f_redirect_invoke(void *_ctxt)
{
  struct dispatch_continuation_s *dc = _ctxt;
  struct dispatch_continuation_s *other_dc = dc->dc_other;
  dispatch_queue_t old_dq, dq = dc->dc_data, rq;

  old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
  _dispatch_thread_setspecific(dispatch_queue_key, dq);
  _dispatch_continuation_pop(other_dc);
  _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

  rq = dq->do_targetq;
  while (slowpath(rq->do_targetq) && rq != old_dq) {
    if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
      _dispatch_wakeup(rq);
    }
    rq = rq->do_targetq;
  }

  if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
    _dispatch_wakeup(dq);
  }
  _dispatch_release(dq);
}


We can see that whether the job was executed synchronously or asynchronously, _dispatch_wakeup is eventually called; its role is to trigger the next task.
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
  dispatch_queue_t tq;

  if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
    return NULL;
  }
  if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
    return NULL;
  }

  // _dispatch_source_invoke() relies on this testing the whole suspend count
  // word, not just the lock bit. In other words, no point taking the lock
  // if the source is suspended or canceled.
  if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
      DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT || DISPATCH_LINUX_COMPAT
    if (dou._dq == &_dispatch_main_q) {
      return _dispatch_queue_wakeup_main();
    }
#endif
    return NULL;
  }
  dispatch_atomic_acquire_barrier();
  _dispatch_retain(dou._do);
  tq = dou._do->do_targetq;
  _dispatch_queue_push(tq, dou._do);
  return tq;  // libdispatch does not need this, but the Instrument DTrace
        // probe does
}


At first I didn't understand how this works. The key is an inconspicuous macro, dx_probe, defined as (x)->do_vtable->do_probe(x). Searching for do_probe shows that its implementations live in init.c. Since our discussion has centered on the root queues, we will only look at the root queue's version.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_root, queue,
  .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
  .do_kind = "global-queue",
  .do_debug = dispatch_queue_debug,
  .do_probe = _dispatch_queue_probe_root,
);

bool
_dispatch_queue_probe_root(dispatch_queue_t dq)
{
  _dispatch_queue_wakeup_global2(dq, 1);
  return false;
}

static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
  struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

  if (!dq->dq_items_tail) {
    return;
  }
#if HAVE_PTHREAD_WORKQUEUES
  if (
#if DISPATCH_USE_PTHREAD_POOL
      (qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
      !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n)) {
    _dispatch_debug("work thread request still pending on global queue: "
        "%p", dq);
    return;
  }
#endif // HAVE_PTHREAD_WORKQUEUES
  return  _dispatch_queue_wakeup_global_slow(dq, n);
}


I won't paste the code of _dispatch_queue_wakeup_global_slow here. This function signals the worker side to trigger execution of the next job, creating a new thread when necessary. The code is not entirely explicit, but we can make a close guess at the principle.
Conclusion
After several days of reading the code, I finally have an inkling of how it works. My thanks to the bloggers whose notes on key points saved me a lot of headache. I've recorded everything here for future reference.