[plt-scheme] Efficient asynchronous communication primitives [PATCH]
Working with multi-threaded and distributed code relies heavily on
asynchronous communication.
mzscheme offers 3 main constructs: semaphores, channels, and
async-channels.
semaphores are the lowest level of primitives, while channels are the
basic synchronous communication primitive. async-channels are built on top of
channels using an additional synchronizer thread.
Unfortunately, they all suffer: semaphores are too low-level and require
quite a bit of work to be usable for communication, channels are
synchronous, and async-channels are intolerably slow.
It doesn't have to be this way. The attached patch implements asynchronous
thread mailboxes, with 2 main primitives (thread-send and thread-receive,
similar to gambit's equinamed primitives) and 2 auxiliary primitives
(thread-try-receive and thread-receive-evt). The patch applies to revision
8983 from last night.
(thread-send <thread> <any>)
places a value into a thread's mailbox and (if necessary)
notifies the target thread.
If the target thread is the current-thread, it simply buffers a
self-message.
(thread-receive)
blocks the current thread until a message is available
(thread-try-receive)
non-blocking version of thread-receive, returns #f if no message is
available
(thread-receive-evt)
a synchronization event that is ready when thread-receive would not block
Sending a message works about as fast as posting a semaphore
-- the primitives are implemented with semaphores internally after all.
Note : the primitive count in schminc.h has changed, as has the
mzmarksrc.c template
Some results from simple micro-benchmarks (see the attached mbox-test.ss) that
nonetheless exercise the primitives follow below.
Each triple is the (cpu real gc) time in ms as reported by time-apply, for 5 runs
at 1M iterations each, followed by the median cpu time in usec/iteration.
The host is a 2.2GHz core duo running gnu.
'Asymmetric' is a scenario with a producer thread sending [posting] and
a consumer thread receiving [waiting].
'Symmetric' is a ping-pong initiated by one of the two threads.
the sema-baseline test is (call-with-semaphore sema void)
the context switch baseline is two threads doing (sleep)
--------------------------------------------------------
Welcome to MzScheme v3.99.0.18 [3m], Copyright (c) 2004-2008 PLT Scheme
Inc.
> (require "mbox-test.ss")
> (run-set baseline)
baseline 1000000
--------------------
sema-baseline
((276 278 92) (188 187 0) (188 188 8) (188 189 0) (188 192 4))
0.188 usec/iter
context-switch-baseline
((2084 2084 0) (2092 2089 0) (2088 2089 0) (2096 2098 0) (2108 2108 0))
2.092 usec/iter
> (run-set asymmetric)
asymmetric 1000000
--------------------
sema-asymmetric
((68 65 0) (64 65 0) (64 65 0) (64 65 0) (68 67 0))
0.064 usec/iter
mbox-asymmetric
((172 170 104) (92 93 32) (116 114 52) (140 139 80) (116 119 56))
0.116 usec/iter
channel-asymmetric
((2180 2181 56) (2140 2138 20) (2144 2143 24) (2129 2131 20) (2224 2224
12))
2.144 usec/iter
async-channel-asymmetric
((11168 11170 452) (10989 10987 308) (11249 11249 520) (11232 11234
464) (11285 11283 408))
11.232 usec/iter
> (run-set symmetric)
symmetric 1000000
--------------------
sema-symmetric
((1988 1986 0) (2008 2008 8) (2012 2014 4) (1988 1989 4) (2157 2157 8))
2.008 usec/iter
mbox-symmetric
((2036 2035 8) (2001 2001 20) (2020 2021 20) (2112 2113 104) (2120 2120 8))
2.036 usec/iter
channel-symmetric
((4408 4408 108) (4384 4381 80) (4453 4455 92) (4472 4471 100) (4620 4619 80))
4.453 usec/iter
async-channel-symmetric
((31250 31248 868) (31242 31241 824) (31102 31101 820) (31214 31211 824) (31250 31250 772))
31.242 usec/iter
-- vyzo
-------------- next part --------------
Index: src/mzscheme/include/scheme.h
===================================================================
--- src/mzscheme/include/scheme.h (revision 8983)
+++ src/mzscheme/include/scheme.h (working copy)
@@ -1066,6 +1066,10 @@
Scheme_Object *name;
+ Scheme_Object *mbox_first;
+ Scheme_Object *mbox_last;
+ Scheme_Object *mbox_sema;
+
#ifdef MZ_PRECISE_GC
int gc_owner_set;
#endif
Index: src/mzscheme/src/thread.c
===================================================================
--- src/mzscheme/src/thread.c (revision 8983)
+++ src/mzscheme/src/thread.c (working copy)
@@ -2176,7 +2176,7 @@
process->list_stack = NULL;
scheme_gmp_tls_init(process->gmp_tls);
-
+
if (prefix) {
process->next = scheme_first_thread;
process->prev = NULL;
@@ -2244,6 +2244,21 @@
process->nester = process->nestee = NULL;
+ process->mbox_first = NULL;
+ process->mbox_last = NULL;
+ process->mbox_sema = NULL;
+ /* this ensures the process has all members initialized before allocating
+ memory for the mbox semaphore (maybe unnecessary) */
+ process->mref = NULL;
+ process->extra_mrefs = NULL;
+ {
+ Scheme_Object *sema = NULL;
+ sema = scheme_make_sema(0);
+ process->mbox_sema = sema;
+ }
+
+
+
/* A thread points to a lot of stuff, so it's bad to put a finalization
on it, which is what registering with a custodian does. Instead, we
register a weak indirection with the custodian. That way, the thread
@@ -2586,6 +2601,10 @@
r->error_buf = NULL;
r->spare_runstack = NULL;
+
+ r->mbox_first = NULL;
+ r->mbox_last = NULL;
+ r->mbox_sema = NULL;
}
static void remove_thread(Scheme_Thread *r)
Index: src/mzscheme/src/mzmarksrc.c
===================================================================
--- src/mzscheme/src/mzmarksrc.c (revision 8983)
+++ src/mzscheme/src/mzmarksrc.c (working copy)
@@ -692,6 +692,9 @@
gcMARK(pr->dead_box);
gcMARK(pr->running_box);
+ gcMARK(pr->mbox_first);
+ gcMARK(pr->mbox_last);
+ gcMARK(pr->mbox_sema);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Thread));
}
Index: src/mzscheme/src/sema.c
===================================================================
--- src/mzscheme/src/sema.c (revision 8983)
+++ src/mzscheme/src/sema.c (working copy)
@@ -38,6 +38,11 @@
static Scheme_Object *make_channel_put(int n, Scheme_Object **p);
static Scheme_Object *channel_p(int n, Scheme_Object **p);
+static Scheme_Object *thread_send(int n, Scheme_Object **p);
+static Scheme_Object *thread_receive(int n, Scheme_Object **p);
+static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
+static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
+
static Scheme_Object *make_alarm(int n, Scheme_Object **p);
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
@@ -136,6 +141,28 @@
1, 1, 1),
env);
+ scheme_add_global_constant("thread-send",
+ scheme_make_prim_w_arity(thread_send,
+ "thread-send",
+ 2, 2),
+ env);
+ scheme_add_global_constant("thread-receive",
+ scheme_make_prim_w_arity(thread_receive,
+ "thread-receive",
+ 0, 0),
+ env);
+ scheme_add_global_constant("thread-try-receive",
+ scheme_make_prim_w_arity(thread_try_receive,
+ "thread-try-receive",
+ 0, 0),
+ env);
+ scheme_add_global_constant("thread-receive-evt",
+ scheme_make_prim_w_arity(thread_receive_evt,
+ "thread-receive-evt",
+ 0, 0),
+ env);
+
+
scheme_add_global_constant("alarm-evt",
scheme_make_prim_w_arity(make_alarm,
"alarm-evt",
@@ -976,6 +1003,87 @@
}
/**********************************************************************/
+/* Thread mbox */
+/**********************************************************************/
+static void mbox_push( Scheme_Thread *p, Scheme_Object *o) {
+ /* check semaphore overflow before pushing */
+ if ((((Scheme_Sema *)p->mbox_sema)->value + 1) > ((Scheme_Sema *)p->mbox_sema)->value) {
+ Scheme_Object *next = NULL;
+ next = scheme_alloc_object();
+ next->type = scheme_raw_pair_type;
+ SCHEME_CAR(next) = o;
+ SCHEME_CDR(next) = NULL;
+
+ if (p->mbox_first) {
+ SCHEME_CDR(p->mbox_last) = next;
+ p->mbox_last = next;
+ } else {
+ p->mbox_first = next;
+ p->mbox_last = next;
+ }
+
+ if (((Scheme_Sema*)p->mbox_sema)->value
+ || (p == scheme_current_thread)) {
+ /* Avoid scheduler overhead when the semaphore is already up
+ or this is a self-send */
+ ++((Scheme_Sema*)p->mbox_sema)->value;
+ } else {
+ scheme_post_sema(p->mbox_sema);
+ }
+
+ } else scheme_raise_exn(MZEXN_FAIL, "thread-send: mailbox overflow");
+
+}
+
+static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec ) {
+ /* assertion: mbox_first != NULL */
+ Scheme_Object *r = NULL;
+ r = SCHEME_CAR(p->mbox_first);
+ p->mbox_first = SCHEME_CDR(p->mbox_first);
+ if (!p->mbox_first)
+ p->mbox_last = NULL;
+ if (dec) (--((Scheme_Sema*)p->mbox_sema)->value);
+ return r;
+}
+
+static Scheme_Object *thread_send(int argc, Scheme_Object **argv) {
+ if (SCHEME_THREADP(argv[0])) {
+ int running;
+ running = ((Scheme_Thread*)argv[0])->running;
+ if (MZTHREAD_STILL_RUNNING(running)) {
+ mbox_push((Scheme_Thread*)argv[0], argv[1]);
+ return scheme_void;
+ } else scheme_raise_exn(MZEXN_FAIL, "thread-send: thread is not running");
+ } else scheme_wrong_type("thread-send", "thread", 0, argc, argv);
+ /* unreachable, but compiler complains */
+ return scheme_void;
+}
+
+/* The mbox semaphore can only be downed by the current thread, so
+ receive/try-receive can directly dec+pop without syncing
+ (by calling mbox_pop with dec=1)
+*/
+static Scheme_Object *thread_receive(int argc, Scheme_Object **argv) {
+ if (scheme_current_thread->mbox_first) {
+ return mbox_pop(scheme_current_thread, 1);
+ } else {
+ scheme_wait_sema(scheme_current_thread->mbox_sema, 0);
+ return mbox_pop(scheme_current_thread, 0);
+ }
+}
+
+static Scheme_Object *thread_try_receive(int argc, Scheme_Object **argv) {
+ if (scheme_current_thread->mbox_first) {
+ return mbox_pop(scheme_current_thread, 1);
+ } else return scheme_false;
+}
+
+static Scheme_Object *thread_receive_evt(int argc, Scheme_Object **argv) {
+ return scheme_make_sema_repost( scheme_current_thread->mbox_sema );
+}
+
+
+/**********************************************************************/
/* alarms */
/**********************************************************************/
Index: src/mzscheme/src/schminc.h
===================================================================
--- src/mzscheme/src/schminc.h (revision 8983)
+++ src/mzscheme/src/schminc.h (working copy)
@@ -13,7 +13,7 @@
#define USE_COMPILED_STARTUP 1
-#define EXPECTED_PRIM_COUNT 899
+#define EXPECTED_PRIM_COUNT 903
#ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP
-------------- next part --------------
;; communication primitives micro-benchmarks for mzscheme
;; author: vyzo at media.mit.edu
#lang scheme
(require scheme/async-channel srfi/26)
(provide (all-defined-out))
;; (dotimes k body ...+)
;; Evaluate the body forms, for their side effects, K times.
;; K is evaluated exactly once, before the first iteration, so a bound
;; expression with side effects (or an expensive one) is not re-run on
;; every pass.  The loop exits as soon as the counter reaches or passes
;; the bound, so a zero, negative or fractional bound terminates instead
;; of looping forever.
(define-syntax dotimes
  (syntax-rules ()
    ((_ k e es ...)
     (let ((limit k))
       (do ((i 0 (add1 i)))
           ((>= i limit))
         e es ...)))))
;; (thread-wait* thr ...)
;; Expands into one thread-wait call per argument, in left-to-right order,
;; so the form returns once every listed thread has finished.
(define-syntax-rule (thread-wait* thr ...)
  (begin (thread-wait thr) ...))
;; Call the thunk F through time-apply N times (default 3) and collect the
;; timing figures of each run.
;; Returns a list with one entry per run; each entry is the list of values
;; time-apply returned after F's own results.  Entries are consed onto the
;; accumulator, so the most recent run comes first.
(define (time-it f [n 3])
  (let loop ((remaining n) (timings null))
    (cond
      ((zero? remaining) timings)
      (else
       (let-values (((results . figures) (time-apply f '())))
         (loop (sub1 remaining) (cons figures timings)))))))
;; Asymmetric mailbox benchmark.
;; A consumer thread calls thread-receive K times while a producer thread
;; sends it 'hello K times with thread-send.  The consumer is started
;; first; the call returns once both threads have finished.
(define (mbox-asymmetric k)
  (let* ((consumer
          (thread (lambda ()
                    (dotimes k (thread-receive)))))
         (producer
          (thread (lambda ()
                    (dotimes k (thread-send consumer 'hello))))))
    (thread-wait* producer consumer)))
;; Symmetric (ping-pong) mailbox benchmark.
;; The initiator first receives its partner's thread from its own mailbox,
;; then K times sends 'hello and waits for a reply.  The responder K times
;; waits for a message and answers with 'hello.  The calling thread hands
;; the responder to the initiator and then waits for both to finish.
(define (mbox-symmetric k)
  (let* ((initiator
          (thread (lambda ()
                    (let ((partner (thread-receive)))
                      (dotimes k
                        (thread-send partner 'hello)
                        (thread-receive))))))
         (responder
          (thread (lambda ()
                    (dotimes k
                      (thread-receive)
                      (thread-send initiator 'hello))))))
    (thread-send initiator responder)
    (thread-wait* initiator responder)))
;; Asymmetric semaphore benchmark.
;; One thread waits on a fresh semaphore K times while a second thread
;; posts to it K times.  The waiter is started first; the call returns
;; once both threads have finished.
(define (sema-asymmetric k)
  (define sema (make-semaphore))
  (define waiter
    (thread (lambda () (dotimes k (semaphore-wait sema)))))
  (define poster
    (thread (lambda () (dotimes k (semaphore-post sema)))))
  (thread-wait* poster waiter))
;; Symmetric (ping-pong) semaphore benchmark using two semaphores.
;; The responder K times waits on `ping` and then posts `pong`; the
;; initiator K times posts `ping` and then waits on `pong`.  The responder
;; is started first; the call returns once both threads have finished.
(define (sema-symmetric k)
  (define ping (make-semaphore))
  (define pong (make-semaphore))
  (define responder
    (thread (lambda ()
              (dotimes k
                (semaphore-wait ping)
                (semaphore-post pong)))))
  (define initiator
    (thread (lambda ()
              (dotimes k
                (semaphore-post ping)
                (semaphore-wait pong)))))
  (thread-wait* initiator responder))
;; Single-threaded baseline: K times run `void` under call-with-semaphore
;; on a semaphore created with an initial count of 1.
(define (sema-baseline k)
  (define sema (make-semaphore 1))
  (dotimes k
    (call-with-semaphore sema void)))
;; Asymmetric channel benchmark.
;; One thread performs K channel-get calls on a fresh channel while a
;; second thread performs K channel-put calls of 'hello on it.  The getter
;; is started first; the call returns once both threads have finished.
(define (channel-asymmetric k)
  (define ch (make-channel))
  (define getter
    (thread (lambda () (dotimes k (channel-get ch)))))
  (define putter
    (thread (lambda () (dotimes k (channel-put ch 'hello)))))
  (thread-wait* putter getter))
;; Symmetric (ping-pong) channel benchmark using two channels.
;; The responder K times gets from `request` and then puts 'hello on
;; `reply`; the initiator K times puts 'hello on `request` and then gets
;; from `reply`.  The responder is started first; the call returns once
;; both threads have finished.
(define (channel-symmetric k)
  (define request (make-channel))
  (define reply (make-channel))
  (define responder
    (thread (lambda ()
              (dotimes k
                (channel-get request)
                (channel-put reply 'hello)))))
  (define initiator
    (thread (lambda ()
              (dotimes k
                (channel-put request 'hello)
                (channel-get reply)))))
  (thread-wait* initiator responder))
;; Asymmetric async-channel benchmark.
;; One thread performs K async-channel-get calls on a fresh async channel
;; while a second thread performs K async-channel-put calls of 'hello on
;; it.  The getter is started first; the call returns once both threads
;; have finished.
(define (async-channel-asymmetric k)
  (define ch (make-async-channel))
  (define getter
    (thread (lambda () (dotimes k (async-channel-get ch)))))
  (define putter
    (thread (lambda () (dotimes k (async-channel-put ch 'hello)))))
  (thread-wait* putter getter))
;; Symmetric (ping-pong) async-channel benchmark using two async channels.
;; The responder K times gets from `request` and then puts 'hello on
;; `reply`; the initiator K times puts 'hello on `request` and then gets
;; from `reply`.  The responder is started first; the call returns once
;; both threads have finished.
(define (async-channel-symmetric k)
  (define request (make-async-channel))
  (define reply (make-async-channel))
  (define responder
    (thread (lambda ()
              (dotimes k
                (async-channel-get request)
                (async-channel-put reply 'hello)))))
  (define initiator
    (thread (lambda ()
              (dotimes k
                (async-channel-put request 'hello)
                (async-channel-get reply)))))
  (thread-wait* initiator responder))
;; Context-switch baseline: two threads each call (sleep) with no argument
;; K times; the call returns once both threads have finished.
(define (context-switch-baseline k)
  (define (yield-loop)
    (dotimes k (sleep)))
  (let ((yielder-a (thread yield-loop))
        (yielder-b (thread yield-loop)))
    (thread-wait* yielder-a yielder-b)))
;; Benchmark set for the ping-pong scenario: a label symbol followed by
;; the benchmark procedures, in the order they are run by run-set.
(define symmetric
  `(symmetric
    ,sema-symmetric
    ,mbox-symmetric
    ,channel-symmetric
    ,async-channel-symmetric))
;; Benchmark set for the producer/consumer scenario: a label symbol
;; followed by the benchmark procedures, in the order they are run by
;; run-set.
(define asymmetric
  `(asymmetric
    ,sema-asymmetric
    ,mbox-asymmetric
    ,channel-asymmetric
    ,async-channel-asymmetric))
;; Benchmark set with the two baseline measurements: a label symbol
;; followed by the benchmark procedures.
(define baseline
  `(baseline ,sema-baseline ,context-switch-baseline))
;; Pick the middle element of LST (sorted ascending) and scale it to a
;; per-iteration figure: (1000 * element) / K, as an inexact number.
;;   lst - list of numbers (the callers pass millisecond timings)
;;   k   - number of iterations each timing covers
;;   n   - number of entries in LST, used to locate the middle index
;; For an even N the element at index N/2 is used (no averaging).
(define (median lst k n)
  (let* ((ordered (sort lst <))
         (middle (if (odd? n)
                     (/ (sub1 n) 2)
                     (/ n 2)))
         (picked (exact->inexact (list-ref ordered middle))))
    (/ (* 1000 picked) k)))
;; Run one benchmark and print its report.
;;   test - benchmark procedure taking the iteration count
;;   k    - iteration count passed to TEST
;;   n    - number of timed runs (default 3)
;; Prints the procedure's name, the list of timings collected by time-it,
;; and the median of the first figure of each timing, scaled by `median`.
(define (go test k [n 3])
  (printf "~a~n" (object-name test))
  (let* ((timings (time-it (lambda () (test k)) n))
         (first-figures (map car timings)))
    (printf " ~a~n" timings)
    (printf " ~a usec/iter~n" (median first-figures k n))))
;; Run every benchmark in a benchmark set and print a report for each.
;;   set - list whose first element is a label and whose remaining
;;         elements are benchmark procedures
;;   k   - iteration count per run (default 1000000)
;;   n   - number of timed runs per benchmark (default 5)
(define (run-set set [k 1000000] [n 5])
  (printf "~a ~a~n" (car set) k)
  (printf "--------------------~n")
  (for-each (lambda (test) (go test k n))
            (cdr set)))