[plt-scheme] Efficient asynchronous communication primitives [PATCH]

From: Dimitris Vyzovitis (vyzo at media.mit.edu)
Date: Sun Mar 16 11:55:21 EDT 2008

Working with multi-threaded and distributed code relies heavily on
asynchronous communication.
mzscheme offers 3 main constructs: semaphores, channels, and
async-channels.
semaphores are the lowest-level primitive, while channels are the
basic synchronous communication primitive. async-channels are built on top of
channels using an additional synchronizer thread.

Unfortunately, they all suffer: semaphores are too low-level and require
quite a bit of work to be usable for communication, channels are
synchronous, and async-channels are intolerably slow.

It doesn't have to be this way. The attached patch implements asynchronous
thread mailboxes, with 2 main primitives (thread-send and thread-receive,
similar to gambit's equinamed primitives) and 2 auxiliary primitives
(thread-try-receive and thread-receive-evt). The patch applies to revision
8983 from last night.

(thread-send <thread> <any>)
   places a value into a thread's mailbox and (if necessary)
   notifies the target thread.
   If the target thread is the current-thread, it simply buffers a
   self-message.
(thread-receive)
   blocks the current thread until a message is available
(thread-try-receive)
   non-blocking version of thread-receive, returns #f if no message is
   available
(thread-receive-evt)
   a synchronization event that is ready when thread-receive would not block

Sending a message works about as fast as posting a semaphore
-- the primitives are implemented with semaphores internally after all.

Note : the primitive count in schminc.h has changed, as has the
mzmarksrc.c template

Some results from simple ubenchmarks (see the attached mbox-test.ss) that
nonetheless capture the primitives follow below.

The times are in ms as reported by time-apply (cpu, real and gc time for
each run) for 5 runs at 1M iterations each, followed by the median cpu
time in us/iteration.
The host is a 2.2GHz core duo running gnu.

'Asymmetric' is a scenario with a producer thread sending [posting] and
a consumer thread receiving [waiting].
'Symmetric' is a ping-pong initiated by one of the two threads.
The sema-baseline test is (call-with-semaphore sema void).
The context-switch baseline is two threads doing (sleep).

--------------------------------------------------------
Welcome to MzScheme v3.99.0.18 [3m], Copyright (c) 2004-2008 PLT Scheme
Inc.
> (require "mbox-test.ss")
> (run-set baseline)
baseline 1000000
--------------------
sema-baseline
   ((276 278 92) (188 187 0) (188 188 8) (188 189 0) (188 192 4))
   0.188 usec/iter
context-switch-baseline
   ((2084 2084 0) (2092 2089 0) (2088 2089 0) (2096 2098 0) (2108 2108 0))
   2.092 usec/iter
> (run-set asymmetric)
asymmetric 1000000
--------------------
sema-asymmetric
   ((68 65 0) (64 65 0) (64 65 0) (64 65 0) (68 67 0))
   0.064 usec/iter
mbox-asymmetric
   ((172 170 104) (92 93 32) (116 114 52) (140 139 80) (116 119 56))
   0.116 usec/iter
channel-asymmetric
   ((2180 2181 56) (2140 2138 20) (2144 2143 24) (2129 2131 20) (2224 2224 12))
   2.144 usec/iter
async-channel-asymmetric
   ((11168 11170 452) (10989 10987 308) (11249 11249 520) (11232 11234 464) (11285 11283 408))
   11.232 usec/iter
> (run-set symmetric)
symmetric 1000000
--------------------
sema-symmetric
   ((1988 1986 0) (2008 2008 8) (2012 2014 4) (1988 1989 4) (2157 2157 8))
   2.008 usec/iter
mbox-symmetric
   ((2036 2035 8) (2001 2001 20) (2020 2021 20) (2112 2113 104) (2120 2120 8))
   2.036 usec/iter
channel-symmetric
   ((4408 4408 108) (4384 4381 80) (4453 4455 92) (4472 4471 100) (4620 4619 80))
   4.453 usec/iter
async-channel-symmetric
   ((31250 31248 868) (31242 31241 824) (31102 31101 820) (31214 31211 824) (31250 31250 772))
   31.242 usec/iter

-- vyzo
-------------- next part --------------
Index: src/mzscheme/include/scheme.h
===================================================================
--- src/mzscheme/include/scheme.h	(revision 8983)
+++ src/mzscheme/include/scheme.h	(working copy)
@@ -1066,6 +1066,10 @@
 
   Scheme_Object *name;
 
+  Scheme_Object *mbox_first;
+  Scheme_Object *mbox_last;
+  Scheme_Object *mbox_sema;
+
 #ifdef MZ_PRECISE_GC
   int gc_owner_set;
 #endif
Index: src/mzscheme/src/thread.c
===================================================================
--- src/mzscheme/src/thread.c	(revision 8983)
+++ src/mzscheme/src/thread.c	(working copy)
@@ -2176,7 +2176,7 @@
   process->list_stack = NULL;
 
   scheme_gmp_tls_init(process->gmp_tls);
-
+  
   if (prefix) {
     process->next = scheme_first_thread;
     process->prev = NULL;
@@ -2244,6 +2244,21 @@
   
   process->nester = process->nestee = NULL;
 
+  process->mbox_first = NULL;
+  process->mbox_last = NULL;
+  process->mbox_sema = NULL;
+  /* this ensures the process has all members initialized before allocating
+     memory for the mbox semaphore (maybe unnecessary) */
+  process->mref = NULL;
+  process->extra_mrefs = NULL;
+  {
+    Scheme_Object *sema = NULL;
+    sema = scheme_make_sema(0); 
+    process->mbox_sema = sema;
+  }
+
+    
+
   /* A thread points to a lot of stuff, so it's bad to put a finalization
      on it, which is what registering with a custodian does. Instead, we
      register a weak indirection with the custodian. That way, the thread
@@ -2586,6 +2601,10 @@
   r->error_buf = NULL;
 
   r->spare_runstack = NULL;
+
+  r->mbox_first = NULL;
+  r->mbox_last = NULL;
+  r->mbox_sema = NULL;
 }
 
 static void remove_thread(Scheme_Thread *r)
Index: src/mzscheme/src/mzmarksrc.c
===================================================================
--- src/mzscheme/src/mzmarksrc.c	(revision 8983)
+++ src/mzscheme/src/mzmarksrc.c	(working copy)
@@ -692,6 +692,9 @@
   gcMARK(pr->dead_box);
   gcMARK(pr->running_box);
 
+  gcMARK(pr->mbox_first);
+  gcMARK(pr->mbox_last);
+  gcMARK(pr->mbox_sema);
  size:
   gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
 }
Index: src/mzscheme/src/sema.c
===================================================================
--- src/mzscheme/src/sema.c	(revision 8983)
+++ src/mzscheme/src/sema.c	(working copy)
@@ -38,6 +38,11 @@
 static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
 static Scheme_Object *channel_p(int n, Scheme_Object **p);
 
+static Scheme_Object *thread_send(int n, Scheme_Object **p);
+static Scheme_Object *thread_receive(int n, Scheme_Object **p);
+static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
+static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
+
 static Scheme_Object *make_alarm(int n, Scheme_Object **p);
 static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
 
@@ -136,6 +141,28 @@
 						      1, 1, 1), 
 			     env);  
 
+  scheme_add_global_constant("thread-send", 
+			     scheme_make_prim_w_arity(thread_send,
+						      "thread-send", 
+						      2, 2), 
+			     env);
+  scheme_add_global_constant("thread-receive", 
+			     scheme_make_prim_w_arity(thread_receive,
+						      "thread-receive", 
+						      0, 0), 
+			     env);
+  scheme_add_global_constant("thread-try-receive", 
+			     scheme_make_prim_w_arity(thread_try_receive,
+						      "thread-try-receive", 
+						      0, 0), 
+			     env);
+  scheme_add_global_constant("thread-receive-evt", 
+			     scheme_make_prim_w_arity(thread_receive_evt,
+						      "thread-receive-evt", 
+						      0, 0), 
+			     env);
+
+
   scheme_add_global_constant("alarm-evt", 
 			     scheme_make_prim_w_arity(make_alarm,
 						      "alarm-evt",
@@ -976,6 +1003,87 @@
 }
 
 /**********************************************************************/
+/*                           Thread mbox                              */
+/**********************************************************************/
+static void mbox_push( Scheme_Thread *p, Scheme_Object *o) {
+  /* check semaphore overflow before pushing */
+  if ((((Scheme_Sema *)p->mbox_sema)->value + 1) > ((Scheme_Sema *)p->mbox_sema)->value) {
+    Scheme_Object *next = NULL;
+    next = scheme_alloc_object();
+    next->type = scheme_raw_pair_type;
+    SCHEME_CAR(next) = o;
+    SCHEME_CDR(next) = NULL;
+
+    if (p->mbox_first) {
+      SCHEME_CDR(p->mbox_last) = next;
+      p->mbox_last = next;
+    } else {
+      p->mbox_first = next;
+      p->mbox_last = next;
+    }
+    
+    if (((Scheme_Sema*)p->mbox_sema)->value 
+        || (p == scheme_current_thread)) {
+      /* Avoid scheduler overhead when the semaphore is already up
+         or this is a self-send */
+      ++((Scheme_Sema*)p->mbox_sema)->value;
+    } else {
+      scheme_post_sema(p->mbox_sema);
+    }
+
+  } else scheme_raise_exn(MZEXN_FAIL, "thread-send: mailbox overflow");
+
+}
+
+static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec ) {
+  /* assertion: mbox_first != NULL */
+  Scheme_Object *r = NULL;
+  r = SCHEME_CAR(p->mbox_first);
+  p->mbox_first = SCHEME_CDR(p->mbox_first);
+  if (!p->mbox_first)
+    p->mbox_last = NULL;
+  if (dec) (--((Scheme_Sema*)p->mbox_sema)->value);
+  return r;
+}
+
+static Scheme_Object *thread_send(int argc, Scheme_Object **argv) {
+  if (SCHEME_THREADP(argv[0])) {
+    int running;
+    running = ((Scheme_Thread*)argv[0])->running;
+    if (MZTHREAD_STILL_RUNNING(running)) {
+      mbox_push((Scheme_Thread*)argv[0], argv[1]);
+      return scheme_void;
+    } else scheme_raise_exn(MZEXN_FAIL, "thread-send: thread is not running");
+  } else scheme_wrong_type("thread-send", "thread", 0, argc, argv);
+  /* unreachable, but compiler complains */
+  return scheme_void;
+}
+
+/* The mbox semaphore can only be downed by the current thread, so
+   receive/try-receive can directly dec+pop without syncing 
+   (by calling mbox_pop with dec=1)
+*/
+static Scheme_Object *thread_receive(int argc, Scheme_Object **argv) {
+  if (scheme_current_thread->mbox_first) {
+    return mbox_pop(scheme_current_thread, 1);
+  } else {
+    scheme_wait_sema(scheme_current_thread->mbox_sema, 0);
+    return mbox_pop(scheme_current_thread, 0);
+  }
+}
+
+static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv) {
+  if (scheme_current_thread->mbox_first) {
+    return mbox_pop(scheme_current_thread, 1);
+  } else return scheme_false;
+}
+
+static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv) {
+  return scheme_make_sema_repost( scheme_current_thread->mbox_sema );
+}
+
+
+/**********************************************************************/
 /*                             alarms                                 */
 /**********************************************************************/
 
Index: src/mzscheme/src/schminc.h
===================================================================
--- src/mzscheme/src/schminc.h	(revision 8983)
+++ src/mzscheme/src/schminc.h	(working copy)
@@ -13,7 +13,7 @@
 
 #define USE_COMPILED_STARTUP 1
 
-#define EXPECTED_PRIM_COUNT 899
+#define EXPECTED_PRIM_COUNT 903
 
 #ifdef MZSCHEME_SOMETHING_OMITTED
 # undef USE_COMPILED_STARTUP
-------------- next part --------------
;; communication primitives micro-benchmarks for mzscheme
;; author: vyzo at media.mit.edu
#lang scheme
(require scheme/async-channel srfi/26)
(provide (all-defined-out))

;; (dotimes k body ...+)
;; Evaluates the body forms k times, purely for their side effects.
;; The count expression k is evaluated exactly once, before the first
;; iteration, so a count expression with side effects is safe to use.
;; A zero or negative count runs the body zero times instead of looping
;; forever.
(define-syntax dotimes
  (syntax-rules ()
    ((_ k e es ...)
     (let ((limit k))
       (do ((n 0 (add1 n)))
           ((>= n limit))
         e es ...)))))

;; (thread-wait* thr ...)
;; Waits for each given thread in turn, left to right, using thread-wait.
(define-syntax thread-wait*
  (syntax-rules ()
    ((_) (begin))
    ((_ thr) (begin (thread-wait thr)))
    ((_ thr more ...)
     (begin (thread-wait thr)
            (thread-wait* more ...)))))

;; Runs the thunk f under time-apply n times (3 by default).
;; Returns a list with one (cpu real gc) entry per run, as reported by
;; time-apply; the most recent run comes first.
(define (time-it f (n 3))
  (let loop ((remaining n) (timings null))
    (cond
      ((zero? remaining) timings)
      (else
       (let-values (((results cpu real gc) (time-apply f '())))
         (loop (sub1 remaining)
               (cons (list cpu real gc) timings)))))))

;; Mailbox benchmark, one direction only: a producer thread sends k
;; 'hello messages with thread-send to a consumer thread that calls
;; thread-receive k times. Returns once both threads have finished.
(define (mbox-asymmetric k)
  (let* ((consumer (thread (lambda ()
                             (dotimes k (thread-receive)))))
         (producer (thread (lambda ()
                             (dotimes k (thread-send consumer 'hello))))))
    (thread-wait* producer consumer)))

;; Mailbox ping-pong benchmark: two threads exchange 'hello messages for
;; k round trips. The initiator first receives its partner's thread
;; object through its own mailbox, then sends and waits for the reply.
(define (mbox-symmetric k)
  (define (ping)
    (let ((partner (thread-receive)))
      (dotimes k
        (thread-send partner 'hello)
        (thread-receive))))
  (define (pong partner)
    (dotimes k
      (thread-receive)
      (thread-send partner 'hello)))
  (let* ((initiator (thread ping))
         (responder (thread (lambda () (pong initiator)))))
    ;; tell the initiator who its partner is; this starts the exchange
    (thread-send initiator responder)
    (thread-wait* initiator responder)))

;; Semaphore benchmark, one direction only: one thread waits on a shared
;; semaphore k times while another posts to it k times.
(define (sema-asymmetric k)
  (let* ((sema (make-semaphore))
         (waiter (thread (lambda ()
                           (dotimes k (semaphore-wait sema)))))
         (poster (thread (lambda ()
                           (dotimes k (semaphore-post sema))))))
    (thread-wait* poster waiter)))

;; Semaphore ping-pong benchmark: two threads alternate over a pair of
;; semaphores for k round trips. The initiator posts `ping` and waits on
;; `pong`; the responder waits on `ping` and posts `pong`.
(define (sema-symmetric k)
  (let* ((ping (make-semaphore))
         (pong (make-semaphore))
         (responder (thread (lambda ()
                              (dotimes k
                                (semaphore-wait ping)
                                (semaphore-post pong)))))
         (initiator (thread (lambda ()
                              (dotimes k
                                (semaphore-post ping)
                                (semaphore-wait pong))))))
    (thread-wait* initiator responder)))

;; Single-threaded baseline: k calls of call-with-semaphore with void as
;; the procedure, on a semaphore created with an initial count of 1.
(define (sema-baseline k)
  (define sema (make-semaphore 1))
  (dotimes k
    (call-with-semaphore sema void)))

;; Channel benchmark, one direction only: one thread does k channel-get
;; calls while another does k channel-put calls of 'hello on the same
;; channel.
(define (channel-asymmetric k)
  (let* ((ch (make-channel))
         (getter (thread (lambda ()
                           (dotimes k (channel-get ch)))))
         (putter (thread (lambda ()
                           (dotimes k (channel-put ch 'hello))))))
    (thread-wait* putter getter)))

;; Channel ping-pong benchmark: two threads exchange 'hello over a pair
;; of channels for k round trips. The initiator puts on `ping` and gets
;; from `pong`; the responder gets from `ping` and puts on `pong`.
(define (channel-symmetric k)
  (let* ((ping (make-channel))
         (pong (make-channel))
         (responder (thread (lambda ()
                              (dotimes k
                                (channel-get ping)
                                (channel-put pong 'hello)))))
         (initiator (thread (lambda ()
                              (dotimes k
                                (channel-put ping 'hello)
                                (channel-get pong))))))
    (thread-wait* initiator responder)))

;; Async-channel benchmark, one direction only: one thread does k
;; async-channel-get calls while another does k async-channel-put calls
;; of 'hello on the same async channel.
(define (async-channel-asymmetric k)
  (let* ((ch (make-async-channel))
         (getter (thread (lambda ()
                           (dotimes k (async-channel-get ch)))))
         (putter (thread (lambda ()
                           (dotimes k (async-channel-put ch 'hello))))))
    (thread-wait* putter getter)))

;; Async-channel ping-pong benchmark: two threads exchange 'hello over a
;; pair of async channels for k round trips. The initiator puts on
;; `ping` and gets from `pong`; the responder does the reverse.
(define (async-channel-symmetric k)
  (let* ((ping (make-async-channel))
         (pong (make-async-channel))
         (responder (thread (lambda ()
                              (dotimes k
                                (async-channel-get ping)
                                (async-channel-put pong 'hello)))))
         (initiator (thread (lambda ()
                              (dotimes k
                                (async-channel-put ping 'hello)
                                (async-channel-get pong))))))
    (thread-wait* initiator responder)))

;; Context-switch baseline: two threads that each call (sleep) with no
;; argument k times. Returns once both threads have finished.
(define (context-switch-baseline k)
  (let ((yielder (lambda () (dotimes k (sleep)))))
    (let ((a (thread yielder))
          (b (thread yielder)))
      (thread-wait* a b))))

;; Benchmark sets in the shape run-set consumes: the car is a symbol
;; naming the set (printed as the header) and the cdr lists the benchmark
;; procedures, each of which takes the iteration count k.

;; Ping-pong benchmarks, one per communication primitive.
(define symmetric 
  (list 
   'symmetric 
   sema-symmetric mbox-symmetric channel-symmetric async-channel-symmetric))
;; Producer/consumer benchmarks, one per communication primitive.
(define asymmetric
  (list 
   'asymmetric 
   sema-asymmetric mbox-asymmetric channel-asymmetric async-channel-asymmetric))

;; Reference measurements the other sets are compared against.
(define baseline
  (list 'baseline sema-baseline context-switch-baseline))

;; Median of the timings in lst, converted to microseconds per iteration.
;;   lst - list of n real numbers (times in milliseconds, in any order)
;;   k   - number of iterations each timing covers
;;   n   - number of entries in lst
;; Returns an inexact number: 1000 * median / k.
;; For an odd n the median is the middle element of the sorted list; for
;; an even n it is the mean of the two middle elements.
(define (median lst k n)
  (let* ((sorted (sort lst <))
         (mid (quotient n 2))
         (middle (if (even? n)
                     (/ (+ (list-ref sorted (sub1 mid))
                           (list-ref sorted mid))
                        2)
                     (list-ref sorted mid))))
    (/ (* 1000 (exact->inexact middle)) k)))

;; Runs one benchmark and prints its report.
;;   test - benchmark procedure taking the iteration count
;;   k    - iteration count passed to test
;;   n    - number of timed runs (3 by default)
;; Prints the procedure's name, the raw timings from time-it, and the
;; median of the first value of each timing, in usec per iteration.
(define (go test k (n 3))
  (printf "~a~n" (object-name test))
  (let* ((timings (time-it (lambda () (test k)) n))
         (first-values (map car timings)))
    (printf "   ~a~n" timings)
    (printf "   ~a usec/iter~n" (median first-values k n))))

;; Runs every benchmark in a set and prints a report for each.
;;   set - list whose car is the set's name and whose cdr holds the
;;         benchmark procedures
;;   k   - iteration count for every benchmark (1000000 by default)
;;   n   - number of timed runs per benchmark (5 by default)
(define (run-set set (k 1000000) (n 5))
  (let ((label (car set))
        (tests (cdr set)))
    (printf "~a ~a~n" label k)
    (printf "--------------------~n")
    (for-each (lambda (test) (go test k n))
              tests)))

Posted on the users mailing list.