[plt-scheme] Be kind, thread-rewind [PATCH]

From: Dimitris Vyzovitis (vyzo at media.mit.edu)
Date: Fri Mar 21 20:58:19 EDT 2008

This patch complements the new thread mailbox functionality with an
additional primitive, thread-rewind:

(thread-rewind <list>)
  atomically pushes the list at the front of the current thread's mailbox.

The behavior is best understood this way:
(thread-rewind (list a0 ... aN))
(thread-receive) => aN
(thread-receive)...
(thread-receive) => a0
without affecting the causal order of messages that are already in the
mailbox or arrive during the sequence of receives.

eg:
> (thread-send (current-thread) 1)
> (thread-send (current-thread) 2)
> (thread-rewind '(a b c))
> (thread-receive)
c
> (thread-receive)
b
> (thread-receive)
a
> (thread-receive)
1
> (thread-receive)
2

thread-rewind is a simple (and efficient) primitive for implementing
higher-level mailbox operations.
For instance, here is a straightforward implementation of some standard
ops for interleaved blocking transactions and filtering.
[See the attached rewind.ss for examples using these primitives]

;; receive: block until a message satisfying p? is available
(define (receive p?)
  (let lp ((r null))
    (let ((next (thread-receive)))
      (if (p? next)
        (begin (thread-rewind r) next)
        (lp (cons next r))))))

;; try-receive: scan the mailbox and extract a message satisfying p?
;;  #f if none is available
(define (try-receive p?)
  (let lp ((r null))
    (cond
     ((thread-try-receive) =>
      (lambda (next)
        (if (p? next)
          (begin (thread-rewind r) next)
          (lp (cons next r)))))
     (else (thread-rewind r) #f))))

;; wait: block until a message satisfying p? is available or the event evt
;;  is ready
(define (wait p? evt)
  (let lp ((r null))
    (sync
      (handle-evt (thread-receive-evt)
        (lambda (e)
          (let ((next (thread-receive)))
            (if (p? next)
              (begin (thread-rewind r) next)
              (lp (cons next r))))))
      (handle-evt evt
        (lambda (e) (thread-rewind r) e)))))

;; receive*: receives all messages from the mailbox that satisfy p?
(define (receive* p?)
  (let lp ((r null) (vs null))
    (cond
     ((thread-try-receive) =>
      (lambda (next)
        (if (p? next)
          (lp r (cons next vs))
          (lp (cons next r) vs))))
     (else
       (thread-rewind r)
       (reverse vs)))))


-- vyzo
-------------- next part --------------
Index: src/mzscheme/src/sema.c
===================================================================
--- src/mzscheme/src/sema.c	(revision 9047)
+++ src/mzscheme/src/sema.c	(working copy)
@@ -42,6 +42,7 @@
 static Scheme_Object *thread_receive(int n, Scheme_Object **p);
 static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
 static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
+static Scheme_Object *thread_rewind(int n, Scheme_Object **p);
 
 static Scheme_Object *make_alarm(int n, Scheme_Object **p);
 static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
@@ -163,6 +164,11 @@
 						      "thread-receive-evt", 
 						      0, 0), 
 			     env);
+  scheme_add_global_constant("thread-rewind", 
+			     scheme_make_prim_w_arity(thread_rewind,
+						      "thread-rewind", 
+						      1, 1), 
+			     env);
 
 
   scheme_add_global_constant("alarm-evt", 
@@ -1043,6 +1049,27 @@
      memory for the queue, first. */
 }
 
+
+static void mbox_push_front(Scheme_Thread *p, Scheme_Object *lst) 
+{
+  int x;
+  Scheme_Object *next;
+  for (x = -1, next = lst; !SCHEME_NULLP(next); ++x, next = SCHEME_CDR(next)) {
+    Scheme_Object *hd;
+    hd = scheme_make_raw_pair(SCHEME_CAR(next), p->mbox_first);
+    if (!p->mbox_first)
+      p->mbox_last = hd;
+    p->mbox_first = hd;
+  }
+  
+  if (x > -1) {
+    /* do a single post for all messages */
+    ((Scheme_Sema*)p->mbox_sema)->value += x;
+    scheme_post_sema(p->mbox_sema);
+  }
+  
+}
+
 static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
 {
   /* Assertion: mbox_first != NULL */
@@ -1138,6 +1165,16 @@
   return 0;
 }
 
+static Scheme_Object *thread_rewind(int argc, Scheme_Object **argv)
+{
+  if (scheme_is_list(argv[0])) {
+    mbox_push_front(scheme_current_thread, argv[0]);
+    return scheme_void;
+  } else
+    scheme_wrong_type("thread-rewind", "list", 0, argc, argv);
+  return NULL;
+}
+
 /**********************************************************************/
 /*                             alarms                                 */
 /**********************************************************************/
Index: src/mzscheme/src/schminc.h
===================================================================
--- src/mzscheme/src/schminc.h	(revision 9047)
+++ src/mzscheme/src/schminc.h	(working copy)
@@ -11,9 +11,9 @@
    EXPECTED_PRIM_COUNT to the new value, and then USE_COMPILED_STARTUP
    can be set to 1 again. */
 
-#define USE_COMPILED_STARTUP 1
+#define USE_COMPILED_STARTUP 0
 
-#define EXPECTED_PRIM_COUNT 903
+#define EXPECTED_PRIM_COUNT 904
 
 #ifdef MZSCHEME_SOMETHING_OMITTED
 # undef USE_COMPILED_STARTUP
-------------- next part --------------
;; thread-rewind examples:  higher level mailbox primitives
;; author: vyzo at media.mit.edu
#|
> (require "rewind.ss")
> (thread-wait (thread do-fruits))
an #<orange>
an #<apple>
more? #f
> (thread-wait (thread do-fruits/timeout))
an #<orange>
more orange? #f
an #<apple>
more? #f
> (thread-wait (thread do-fruits/filter))
some (#<orange> #<orange>)
some (#<apple> #<apple>)
more? 1
|#

#lang scheme
(provide (all-defined-out))

(define (receive p?)
  (let lp ((r null))
    (let ((next (thread-receive)))
      (if (p? next)
        (begin (thread-rewind r) next)
        (lp (cons next r))))))

(define (try-receive p?)
  (let lp ((r null))
    (cond
     ((thread-try-receive) =>
      (lambda (next)
        (if (p? next)
          (begin (thread-rewind r) next)
          (lp (cons next r)))))
     (else (thread-rewind r) #f))))

(define (wait p? evt)
  (let lp ((r null))
    (sync 
      (handle-evt (thread-receive-evt)
        (lambda (e) 
          (let ((next (thread-receive)))
            (if (p? next)
              (begin (thread-rewind r) next)
              (lp (cons next r))))))
      (handle-evt evt
        (lambda (e) (thread-rewind r) e)))))

(define (receive* p?)
  (let lp ((r null) (vs null))
    (cond
     ((thread-try-receive) =>
      (lambda (next)
        (if (p? next)
          (lp r (cons next vs))
          (lp (cons next r) vs))))
     (else
       (thread-rewind r)
       (reverse vs)))))

(define-struct apple ())
(define-struct orange ())
(define-struct get-fruit (who))

(define (i-want-fruit who)
  (thread-send who (make-get-fruit (current-thread))))

(define (fruit-basket make-fruit)
  (lambda ()
    (let lp ()
      (let ((o (thread-receive)))
        (if (get-fruit? o)
          (thread-send (get-fruit-who o) (make-fruit))
          (printf "~a: unexpected ~a~n" (object-name make-fruit) o))
        (lp)))))

(define apples (thread (fruit-basket make-apple)))
(define oranges (thread (fruit-basket make-orange)))

(define (do-fruits)
  (i-want-fruit apples)
  (i-want-fruit oranges)
  (printf "an ~a~n" (receive orange?)) ; #<orange>
  (printf "an ~a~n" (thread-receive)) ; #<apple>
  (printf "more? ~a~n" (thread-try-receive))) ; #f

(define (timeout secs (timeo 'timeout))
  (handle-evt (alarm-evt (+ (current-inexact-milliseconds) (* secs 1000)))
    (lambda (e) timeo)))

(define (do-fruits/timeout)
  (i-want-fruit apples)
  (i-want-fruit oranges)
  (printf "an ~a~n" (wait orange? (timeout 1))) ; #<orange>
  (printf "more orange? ~a~n" (wait orange? (timeout 1 #f))) ; #f
  (printf "an ~a~n" (wait apple? (timeout 1))); #<apple>
  (printf "more? ~a~n" (thread-try-receive))) ; #f

(define (do-fruits/filter)
  (i-want-fruit apples)
  (sleep)
  (thread-send (current-thread) 1)
  (i-want-fruit oranges)
  (i-want-fruit apples)
  (i-want-fruit oranges)
  (sleep 1)
  (printf "some ~a~n" (receive* orange?)) ; (#<orange> #<orange>) 
  (printf "some ~a~n" (receive* apple?))  ; (#<apple> #<apple>)
  (printf "more? ~a~n" (thread-receive))) ; 1

Posted on the users mailing list.