[plt-scheme] An alternative async-channel implementation
This is an alternative, simpler implementation using thread mailboxes
and a channel.
It keeps the same interface as scheme/async-channel (but does not
implement the queue size limit).
When there is no resource contention in the channel, it is
significantly faster (about 2x). It remains slightly faster (about 10-20%)
when there is contention.
-- vyzo
-------------- next part --------------
;; An async-channel implementation using thread mailboxes
;;
;; (C) 2008 Dimitris Vyzovitis [vyzo-at-media.mit.edu]
;;
;; This library is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Lesser General Public License (LGPL) as
;; published by the Free Software Foundation, version 3 or
;; (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU LGPL[1] for more details.
;;
;; [1] <http://www.gnu.org/licenses/>.
;;
;; Note: this implementation does not implement a limit to the async queue
;; size -- a semaphore could be used for this purpose.
#lang scheme/base
(provide (rename-out (!ac? async-channel))
make-async-channel
async-channel-get
async-channel-try-get
async-channel-put
async-channel-put-evt)
(define-syntax-rule (rec id expr)
(letrec ((id expr)) id))
;; async-queue implementation
(define (async-queue ac)
(define q null)
(define qtl #f)
(define (push! v)
(let ((qtl* (mcons v null)))
(if qtl (set-mcdr! qtl qtl*) (set! q qtl*))
(set! qtl qtl*)))
(define (pop!)
(let ((q* (mcdr q)))
(when (null? q*) (set! qtl #f))
(set! q q*)))
(let react ()
(sync
(handle-evt (thread-receive-evt)
(lambda (e) (push! (thread-receive)) (react)))
(if (null? q)
never-evt
(handle-evt (channel-put-evt (!ac-ch ac) (mcar q))
(lambda (e) (pop!) (react)))))))
(define (async-evt ac)
(thread-resume (!ac-thr ac) (current-thread))
(!ac-ch ac))
(define-struct !ac (thr ch)
#:property prop:evt async-evt)
;; async-channel interface
;; the limit argument is ignored - present for comatibility
(define (make-async-channel (limit #f))
(let* ((thr (thread/suspend-to-kill
(lambda () (async-queue (thread-receive)))))
(ac (make-!ac thr (make-channel))))
(thread-send thr ac)
ac))
(define (async-channel-get ac)
(sync ac))
(define (async-channel-try-get ac)
(sync/timeout 0 ac))
(define (async-channel-put ac v)
(thread-resume (!ac-thr ac) (current-thread))
(thread-send (!ac-thr ac) v))
(define (async-channel-put-evt ac v)
(rec self
(handle-evt always-evt
(lambda (e) (async-channel-put ac v) self))))
-------------- next part --------------
#lang scheme/base
(require srfi/78
"async-channel.ss"
(prefix-in mz: scheme/async-channel))
(provide (all-defined-out))
(define (wait-idle) (sync (system-idle-evt)))
(define (check-async-channel)
(let ((x (make-async-channel)))
(wait-idle)
(check (async-channel-try-get x) => #f)
(async-channel-put x 1)
(wait-idle)
(check (async-channel-try-get x) => 1)
(async-channel-put x 2)
(wait-idle)
(check (async-channel-get x) => 2)
(sync (async-channel-put-evt x 3))
(wait-idle)
(check (sync x) => 3)
(for ((i '(1 2 3))) (async-channel-put x i))
(wait-idle)
(for ((i '(1 2 3))) (check (async-channel-get x) => i))))
(define (synchronizer ch sema put n)
(handle-evt ch
(lambda (v)
(if (> v n)
(begin (semaphore-post sema) #f)
(begin (put ch (add1 v)) v)))))
(define (synchronizer-thread ch sema put n)
(define step (synchronizer ch sema put n))
(define done (wrap-evt (semaphore-peek-evt sema) (lambda (x) #f)))
(thread
(lambda ()
(let lp ()
(when (sync step done) (lp))))))
;; M: # of threads that content. M=1 for no contention
;; N: # of ops
(define (run make put M N)
(let* ((sema (make-semaphore))
(ch (make))
(thrs (build-list M (lambda (x) (synchronizer-thread ch sema put N)))))
(wait-idle)
(put ch 0)
(for ((x thrs)) (thread-wait x))))
(define (run/ac (M 1) (N 100000))
(run make-async-channel async-channel-put M N))
(define (run/mzac (M 1) (N 100000))
(run mz:make-async-channel mz:async-channel-put M N))