[plt-scheme] Be kind, thread-rewind [PATCH]
This patch complements the new thread mailbox functionality with an
additional primitive, thread-rewind:
(thread-rewind <list>)
atomically pushes the elements of the list onto the front of the current thread's mailbox.
The behavior is best understood this way:
(thread-rewind (list a0 ... aN))
(thread-receive) => aN
(thread-receive)...
(thread-receive) => a0
without affecting the causal order of messages that are already in the
mailbox or arrive during the sequence of receives.
For example:
> (thread-send (current-thread) 1)
> (thread-send (current-thread) 2)
> (thread-rewind '(a b c))
> (thread-receive)
c
> (thread-receive)
b
> (thread-receive)
a
> (thread-receive)
1
> (thread-receive)
2
thread-rewind is a simple (and efficient) primitive for implementing
higher-level mailbox operations.
For instance, here is a straightforward implementation of some standard
ops for interleaved blocking transactions and filtering.
[See the attached rewind.ss for examples using these primitives]
;; receive: block until a message satisfying p? arrives and return it.
;; Messages that do not match are set aside (newest first) and handed to
;; thread-rewind once the match is found, so they are received again later.
(define (receive p?)
  (let loop ((skipped null))
    (let ((msg (thread-receive)))
      (cond
        ((p? msg)
         (thread-rewind skipped)
         msg)
        (else
         (loop (cons msg skipped)))))))
;; try-receive: scan the mailbox without blocking and extract the first
;; message satisfying p?; returns #f if no queued message matches.
;; Non-matching messages are pushed back with thread-rewind.
;;
;; Availability is polled with thread-receive-evt instead of testing the
;; result of thread-try-receive: the latter returns #f both for an empty
;; mailbox and for a queued #f message, which would dequeue and drop that
;; message and end the scan early.
;; Note: a #f result is still ambiguous if p? itself accepts #f.
(define (try-receive p?)
  (let lp ((r null))
    (if (sync/timeout 0 (thread-receive-evt))
        ;; a message is queued, so this thread-receive does not block
        (let ((next (thread-receive)))
          (if (p? next)
              (begin (thread-rewind r) next)
              (lp (cons next r))))
        (begin (thread-rewind r) #f))))
;; wait: block until a message satisfying p? is available or the event
;; evt is ready.  Returns the matching message in the first case and the
;; synchronization result of evt in the second.  Either way, messages
;; that were skipped while scanning are pushed back with thread-rewind.
(define (wait p? evt)
  (let loop ((skipped null))
    ;; the mailbox has a message: take it, then either finish or keep scanning
    (define (on-message ignored)
      (let ((msg (thread-receive)))
        (cond
          ((p? msg)
           (thread-rewind skipped)
           msg)
          (else
           (loop (cons msg skipped))))))
    ;; evt became ready first: restore the skipped messages and return its result
    (define (on-event result)
      (thread-rewind skipped)
      result)
    (sync (handle-evt (thread-receive-evt) on-message)
          (handle-evt evt on-event))))
;; receive*: remove and return (in arrival order) all messages currently
;; in the mailbox that satisfy p?.  Messages that do not match are pushed
;; back with thread-rewind.  Never blocks.
;;
;; Availability is polled with thread-receive-evt instead of testing the
;; result of thread-try-receive: the latter returns #f both for an empty
;; mailbox and for a queued #f message, which would dequeue and drop that
;; message and stop the scan before the mailbox is exhausted.
(define (receive* p?)
  (let lp ((r null) (vs null))
    (if (sync/timeout 0 (thread-receive-evt))
        ;; a message is queued, so this thread-receive does not block
        (let ((next (thread-receive)))
          (if (p? next)
              (lp r (cons next vs))
              (lp (cons next r) vs)))
        (begin
          (thread-rewind r)
          ;; vs was accumulated newest-first
          (reverse vs)))))
-- vyzo
-------------- next part --------------
Index: src/mzscheme/src/sema.c
===================================================================
--- src/mzscheme/src/sema.c (revision 9047)
+++ src/mzscheme/src/sema.c (working copy)
@@ -42,6 +42,7 @@
static Scheme_Object *thread_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_try_receive(int n, Scheme_Object **p);
static Scheme_Object *thread_receive_evt(int n, Scheme_Object **p);
+static Scheme_Object *thread_rewind(int n, Scheme_Object **p);
static Scheme_Object *make_alarm(int n, Scheme_Object **p);
static Scheme_Object *make_sys_idle(int n, Scheme_Object **p);
@@ -163,6 +164,11 @@
"thread-receive-evt",
0, 0),
env);
+ scheme_add_global_constant("thread-rewind",
+ scheme_make_prim_w_arity(thread_rewind,
+ "thread-rewind",
+ 1, 1),
+ env);
scheme_add_global_constant("alarm-evt",
@@ -1043,6 +1049,27 @@
memory for the queue, first. */
}
+/* Push every element of lst onto the front of p's mailbox, one at a time in list order, so the last element of lst ends up as the new head. */
+static void mbox_push_front(Scheme_Thread *p, Scheme_Object *lst)
+{
+ int x; /* (number of elements pushed) - 1; stays -1 when lst is empty */
+ Scheme_Object *next; /* cursor over the not-yet-pushed tail of lst */
+ for (x = -1, next = lst; !SCHEME_NULLP(next); ++x, next = SCHEME_CDR(next)) {
+ Scheme_Object *hd;
+ hd = scheme_make_raw_pair(SCHEME_CAR(next), p->mbox_first); /* new head cell, linked to the previous head */
+ if (!p->mbox_first)
+ p->mbox_last = hd; /* mailbox was empty: the first cell pushed is also the tail */
+ p->mbox_first = hd;
+ }
+/* NOTE(review): cells are allocated while the queue is being relinked and before the semaphore is updated; confirm a failed allocation partway through cannot leave the two out of step. */
+ if (x > -1) {
+ /* do a single post for all messages: x is added to the count directly, then scheme_post_sema is called once (presumably adding the final one and waking a waiter -- confirm) */
+ ((Scheme_Sema*)p->mbox_sema)->value += x;
+ scheme_post_sema(p->mbox_sema);
+ }
+
+}
+
static Scheme_Object *mbox_pop( Scheme_Thread *p, int dec)
{
/* Assertion: mbox_first != NULL */
@@ -1138,6 +1165,16 @@
return 0;
}
+static Scheme_Object *thread_rewind(int argc, Scheme_Object **argv)
+{ /* Implements the "thread-rewind" primitive (registered with arity 1): pushes the list in argv[0] onto the front of the current thread's mailbox. */
+ if (scheme_is_list(argv[0])) {
+ mbox_push_front(scheme_current_thread, argv[0]);
+ return scheme_void; /* success: the primitive returns scheme_void */
+ } else
+ scheme_wrong_type("thread-rewind", "list", 0, argc, argv); /* argument 0 failed the list check */
+ return NULL; /* presumably not reached: scheme_wrong_type is expected to raise rather than return -- confirm */
+}
+
/**********************************************************************/
/* alarms */
/**********************************************************************/
Index: src/mzscheme/src/schminc.h
===================================================================
--- src/mzscheme/src/schminc.h (revision 9047)
+++ src/mzscheme/src/schminc.h (working copy)
@@ -11,9 +11,9 @@
EXPECTED_PRIM_COUNT to the new value, and then USE_COMPILED_STARTUP
can be set to 1 again. */
-#define USE_COMPILED_STARTUP 1
+#define USE_COMPILED_STARTUP 0
-#define EXPECTED_PRIM_COUNT 903
+#define EXPECTED_PRIM_COUNT 904
#ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP
-------------- next part --------------
;; thread-rewind examples: higher level mailbox primitives
;; author: vyzo at media.mit.edu
#|
> (require "rewind.ss")
> (thread-wait (thread do-fruits))
an #<orange>
an #<apple>
more? #f
> (thread-wait (thread do-fruits/timeout))
an #<orange>
more orange? #f
an #<apple>
more? #f
> (thread-wait (thread do-fruits/filter))
some (#<orange> #<orange>)
some (#<apple> #<apple>)
more? 1
|#
#lang scheme
(provide (all-defined-out))
;; receive: block until a message satisfying p? arrives and return it.
;; Messages that do not match are set aside (newest first) and handed to
;; thread-rewind once the match is found, so they are received again later.
(define (receive p?)
  (let loop ((skipped null))
    (let ((msg (thread-receive)))
      (cond
        ((p? msg)
         (thread-rewind skipped)
         msg)
        (else
         (loop (cons msg skipped)))))))
;; try-receive: scan the mailbox without blocking and extract the first
;; message satisfying p?; returns #f if no queued message matches.
;; Non-matching messages are pushed back with thread-rewind.
;;
;; Availability is polled with thread-receive-evt instead of testing the
;; result of thread-try-receive: the latter returns #f both for an empty
;; mailbox and for a queued #f message, which would dequeue and drop that
;; message and end the scan early.
;; Note: a #f result is still ambiguous if p? itself accepts #f.
(define (try-receive p?)
  (let lp ((r null))
    (if (sync/timeout 0 (thread-receive-evt))
        ;; a message is queued, so this thread-receive does not block
        (let ((next (thread-receive)))
          (if (p? next)
              (begin (thread-rewind r) next)
              (lp (cons next r))))
        (begin (thread-rewind r) #f))))
;; wait: block until a message satisfying p? is available or the event
;; evt is ready.  Returns the matching message in the first case and the
;; synchronization result of evt in the second.  Either way, messages
;; that were skipped while scanning are pushed back with thread-rewind.
(define (wait p? evt)
  (let loop ((skipped null))
    ;; the mailbox has a message: take it, then either finish or keep scanning
    (define (on-message ignored)
      (let ((msg (thread-receive)))
        (cond
          ((p? msg)
           (thread-rewind skipped)
           msg)
          (else
           (loop (cons msg skipped))))))
    ;; evt became ready first: restore the skipped messages and return its result
    (define (on-event result)
      (thread-rewind skipped)
      result)
    (sync (handle-evt (thread-receive-evt) on-message)
          (handle-evt evt on-event))))
;; receive*: remove and return (in arrival order) all messages currently
;; in the mailbox that satisfy p?.  Messages that do not match are pushed
;; back with thread-rewind.  Never blocks.
;;
;; Availability is polled with thread-receive-evt instead of testing the
;; result of thread-try-receive: the latter returns #f both for an empty
;; mailbox and for a queued #f message, which would dequeue and drop that
;; message and stop the scan before the mailbox is exhausted.
(define (receive* p?)
  (let lp ((r null) (vs null))
    (if (sync/timeout 0 (thread-receive-evt))
        ;; a message is queued, so this thread-receive does not block
        (let ((next (thread-receive)))
          (if (p? next)
              (lp r (cons next vs))
              (lp (cons next r) vs)))
        (begin
          (thread-rewind r)
          ;; vs was accumulated newest-first
          (reverse vs)))))
;; Message types used by the examples below.
;; apple and orange are field-less structs that serve as the fruit replies.
(define-struct apple ())
(define-struct orange ())
;; get-fruit is the request message; who is the thread the fruit is sent back to.
(define-struct get-fruit (who))
;; i-want-fruit: ask the thread `who` for a fruit by sending it a
;; get-fruit request that names the current thread as the recipient.
(define (i-want-fruit who)
  (let ((request (make-get-fruit (current-thread))))
    (thread-send who request)))
;; fruit-basket: build the body of a server thread.  The returned thunk
;; loops forever: each get-fruit request is answered by sending a fresh
;; (make-fruit) to the requesting thread; any other message is reported
;; with printf.
(define (fruit-basket make-fruit)
  ;; handle a single incoming message
  (define (serve request)
    (cond
      ((get-fruit? request)
       (thread-send (get-fruit-who request) (make-fruit)))
      (else
       (printf "~a: unexpected ~a~n" (object-name make-fruit) request))))
  (lambda ()
    (let loop ()
      (serve (thread-receive))
      (loop))))
;; Server threads started when the module is instantiated: each runs a
;; fruit-basket loop that answers get-fruit requests with its own fruit.
(define apples (thread (fruit-basket make-apple)))
(define oranges (thread (fruit-basket make-orange)))
;; do-fruits: request an apple and an orange, then use receive to pick
;; the orange out of the mailbox first, whatever order the replies
;; arrived in.  Expected output is noted next to each printf.
(define (do-fruits)
  (for-each i-want-fruit (list apples oranges))
  (let ((an-orange (receive orange?)))
    (printf "an ~a~n" an-orange))               ; #<orange>
  (let ((leftover (thread-receive)))
    (printf "an ~a~n" leftover))                ; #<apple>
  (printf "more? ~a~n" (thread-try-receive)))   ; #f
;; timeout: an event that becomes ready secs seconds from now and whose
;; synchronization result is timeo (the symbol 'timeout by default).
(define (timeout secs (timeo 'timeout))
  (let* ((deadline (+ (current-inexact-milliseconds) (* secs 1000)))
         (alarm (alarm-evt deadline)))
    (handle-evt alarm (lambda (ignored) timeo))))
;; do-fruits/timeout: like do-fruits, but each selective receive goes
;; through wait with a one-second timeout.  The second wait for an orange
;; is expected to time out and yield #f.
(define (do-fruits/timeout)
  (for-each i-want-fruit (list apples oranges))
  (let ((first-orange (wait orange? (timeout 1))))
    (printf "an ~a~n" first-orange))                ; #<orange>
  (let ((second-orange (wait orange? (timeout 1 #f))))
    (printf "more orange? ~a~n" second-orange))     ; #f
  (let ((an-apple (wait apple? (timeout 1))))
    (printf "an ~a~n" an-apple))                    ; #<apple>
  (printf "more? ~a~n" (thread-try-receive)))       ; #f
;; do-fruits/filter: queue up a mix of apples, oranges and a plain
;; message (1), then use receive* to drain the mailbox by kind.  The
;; unrelated message is expected to still be there at the end.
(define (do-fruits/filter)
  (i-want-fruit apples)
  (sleep)
  ;; a message to ourselves that matches neither filter
  (thread-send (current-thread) 1)
  (for-each i-want-fruit (list oranges apples oranges))
  ;; wait a second before draining the mailbox
  (sleep 1)
  (let ((the-oranges (receive* orange?)))
    (printf "some ~a~n" the-oranges))         ; (#<orange> #<orange>)
  (let ((the-apples (receive* apple?)))
    (printf "some ~a~n" the-apples))          ; (#<apple> #<apple>)
  (printf "more? ~a~n" (thread-receive)))     ; 1