#lang scheme

;; A small reader/writer coordination server.
;;
;; Client threads talk to a single server thread (`serve-thread') over
;; channels. Any number of threads may be registered as readers at once;
;; a reader may ask to be promoted to writer. At most one writer request
;; is pending at a time; any other reader that asks to write while one is
;; pending is rejected and retries from scratch via `call-as-retryable'.

;; To send messages to server:
(define read-req-ch (make-channel))
(define write-req-ch (make-channel))

;; A message to the server:
;;  accept-ch : channel on which the server answers the request
;;  done-sema : semaphore the client posts when it is finished reading
;;  th        : the requesting thread
(define-struct req (accept-ch done-sema th))

;; Internal to server:
(define-struct reader (done-sema th))
(define-struct writer (accept-ch done-sema th))

;; A semaphore that is never posted; it is only used as an extra argument
;; to `choice-evt' in `serve'.
(define temporary-workaround-evt (make-semaphore)) ;; avoids a bug in version prior to v4.1.0.2-svn11367

;; serve : (listof reader) (or/c writer #f) -> does not return normally
;; The server loop. `readers' is the list of currently registered
;; readers; `pending-writer' is the writer waiting for the readers to
;; drain, or #f. The loop is expressed as a call to `serve' with the new
;; state from each event handler.
(define (serve readers pending-writer)
  (sync
   ;; Handle requests to become writer:
   (handle-evt
    write-req-ch
    (lambda (r)
      (cond
       [pending-writer
        ;; Reject request, because someone else has asked
        ;; to become a writer
        (reply (req-accept-ch r) #f)
        (serve readers pending-writer)]
       [else
        ;; Accept the writer and remove it from the readers,
        ;; but don't let the writer continue until we're
        ;; out of readers. The reader entry is matched by thread.
        (serve (remove r readers (lambda (a b)
                                   (eq? (req-th a) (reader-th b))))
               (make-writer (req-accept-ch r)
                            (req-done-sema r)
                            (req-th r)))])))
   ;; If a pending writer and no readers, let writer continue:
   (if (and pending-writer
            (null? readers))
       (choice-evt
        (let ([completed-sema (make-semaphore)])
          (handle-evt
           ;; Sending `completed-sema' is the "go ahead" signal:
           (channel-put-evt (writer-accept-ch pending-writer) completed-sema)
           (lambda (v)
             ;; Writer has accepted and should now provide the (trusted!)
             ;; writing thunk
             (sync
              ;; handle provided thunk (it runs here, in the server thread,
              ;; and its result is discarded):
              (handle-evt
               (writer-accept-ch pending-writer)
               (lambda (write-thunk)
                 (write-thunk)))
              ;; No problem if the writer dies before supplying the thunk:
              (thread-dead-evt (writer-th pending-writer)))
             ;; Tell writing thread that the thunk completed:
             (semaphore-post completed-sema)
             ;; Demote the writer back to a reader:
             (serve (list (make-reader (writer-done-sema pending-writer)
                                       (writer-th pending-writer)))
                    #f))))
        (handle-evt
         (thread-dead-evt (writer-th pending-writer))
         (lambda (v)
           ;; Writer died before continuing
           (serve null #f))))
       never-evt)
   ;; Handle requests to become a reader
   (if pending-writer
       never-evt ; don't allow if there's a pending writer
       (handle-evt
        read-req-ch
        (lambda (r)
          ;; Accept new reader
          (reply (req-accept-ch r) #t)
          (serve (cons (make-reader (req-done-sema r)
                                    (req-th r))
                       readers)
                 #f))))
   ;; Handle reader completion/termination by removing it from our list
   (apply
    choice-evt
    temporary-workaround-evt
    (map (lambda (r)
           (handle-evt
            (choice-evt (reader-done-sema r)
                        (thread-dead-evt (reader-th r)))
            (lambda (v)
              (serve (remq r readers) pending-writer))))
         readers))))

;; reply : channel any -> any
;; Sends `v' on `ch' without blocking the caller: first a non-blocking
;; attempt, and if the receiver is not ready yet, a helper thread that
;; performs the blocking `channel-put'. The helper is registered with the
;; calling thread as its benefactor via `thread-resume'.
(define (reply ch v)
  (or
   ;; Reply immediately, if possible; this is an optimization
   (sync/timeout 0 (channel-put-evt ch v))
   ;; Reply eventually
   (thread-resume (thread (lambda () (channel-put ch v)))
                  (current-thread))))

;; Start the server:
(define serve-thread (thread (lambda () (serve null #f))))

;; ----------------------------------------

;; Prompt tag used to abort a read attempt whose write request was rejected.
(define retry (make-continuation-prompt-tag 'retry))

;; call-as-retryable : (((-> any) -> void) -> alpha) -> alpha
;; Given function acts as reader. When the reader is called, it's given
;; a function that takes a thunk to apply in write mode (but write mode
;; also continues after the thunk returns). The thunk is run by the
;; server thread; its result is not passed back to the caller.
;; If the write request is rejected, the whole read attempt is aborted
;; and `read-proc' is started over, so `read-proc' must be safe to re-run.
(define (call-as-retryable read-proc)
  ;; Ensure that server is running:
  (thread-resume serve-thread (current-thread))
  ;; Set up a prompt for abort and retry:
  (call-with-continuation-prompt
   (lambda ()
     (let ([accept-ch (make-channel)]
           [done-sema (make-semaphore)])
       (channel-put read-req-ch (make-req accept-ch done-sema (current-thread)))
       (channel-get accept-ch) ;; wait until accepted as a reader
       (call-with-continuation-barrier
        (lambda ()
          (dynamic-wind
              void
              (lambda ()
                ;; read...
                (read-proc
                 (lambda (write-thunk)
                   ;; shift into write mode; a fresh channel is used for
                   ;; this request, but `done-sema' is shared with the
                   ;; reader registration
                   (let ([accept-ch (make-channel)])
                     (channel-put write-req-ch (make-req accept-ch done-sema (current-thread)))
                     (let ([completed-sema (channel-get accept-ch)])
                       (if completed-sema
                           ;; allowed to write:
                           (begin
                             (channel-put accept-ch write-thunk)
                             (semaphore-wait completed-sema))
                           ;; can't write, so abort (and abort handler will retry)
                           (abort-current-continuation retry)))))))
              ;; On any exit (normal or abort), tell the server we're done:
              (lambda ()
                (semaphore-post done-sema)))))))
   retry
   ;; On abort, retry:
   (lambda ()
     (call-as-retryable read-proc))))

;; ----------------------------------------
;; Example use

(define x 0)

(for ([i (in-range 100)])
  (thread (lambda ()
            (printf "~s\n"
                    (call-as-retryable
                     (lambda (call-write)
                       (when (zero? (random 3)) ; randomly become writer
                         (call-write
                          (lambda ()
                            (let ([v x])
                              (set! x #f) ; simulate inconsistency
                              (sleep)
                              ;; if the server thread is killed here, it will get resumed
                              ;; by any future read attempt, and so the write action will
                              ;; finish
                              (set! x (add1 v))))))
                       (when (zero? (random (add1 i))) ; randomly die
                         (kill-thread (current-thread)))
                       ;; x should never be #f (i.e., inconsistent):
                       x)))
            ;; pretend to continue with other work...
            (sleep 1000))))