[plt-scheme] An alternative async-channel implementation

From: Dimitris Vyzovitis (vyzo at media.mit.edu)
Date: Sat Apr 26 17:21:14 EDT 2008

This is an alternative, simpler implementation using thread mailboxes
and a channel.
It keeps the same interface as scheme/async-channel (but does not
implement the queue size limit).

When there is no resource contention in the channel, it is
significantly faster (about 2x). It remains slightly faster (about 10-20%)
when there is contention.

-- vyzo
-------------- next part --------------
;; An async-channel implementation using thread mailboxes
;;
;; (C) 2008 Dimitris Vyzovitis [vyzo-at-media.mit.edu]
;;
;; This library is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Lesser General Public License (LGPL) as 
;; published by the Free Software Foundation, version 3 or 
;; (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;; GNU LGPL[1] for more details.
;;
;; [1] <http://www.gnu.org/licenses/>.
;;
;; Note: this implementation does not implement a limit to the async queue
;;  size -- a semaphore could be used for this purpose.
#lang scheme/base

(provide (rename-out (!ac? async-channel))
         make-async-channel
         async-channel-get
         async-channel-try-get
         async-channel-put
         async-channel-put-evt)

(define-syntax-rule (rec id expr)
  (letrec ((id expr)) id))

;; async-queue implementation
(define (async-queue ac)
  (define q null)
  (define qtl #f)
  (define (push! v)
    (let ((qtl* (mcons v null)))
      (if qtl (set-mcdr! qtl qtl*) (set! q qtl*))
      (set! qtl qtl*)))
  (define (pop!)
    (let ((q* (mcdr q)))
      (when (null? q*) (set! qtl #f))
      (set! q q*)))
  (let react ()
    (sync
      (handle-evt (thread-receive-evt)
        (lambda (e) (push! (thread-receive)) (react)))
      (if (null? q)
        never-evt
        (handle-evt (channel-put-evt (!ac-ch ac) (mcar q))
          (lambda (e) (pop!) (react)))))))

(define (async-evt ac)
  (thread-resume (!ac-thr ac) (current-thread))
  (!ac-ch ac))

(define-struct !ac (thr ch)
  #:property prop:evt async-evt)

;; async-channel interface
;; the limit argument is ignored - present for comatibility
(define (make-async-channel (limit #f))
  (let* ((thr (thread/suspend-to-kill 
               (lambda () (async-queue (thread-receive)))))
         (ac (make-!ac thr (make-channel))))
    (thread-send thr ac)
    ac))

(define (async-channel-get ac)
  (sync ac))

(define (async-channel-try-get ac)
  (sync/timeout 0 ac))

(define (async-channel-put ac v)
  (thread-resume (!ac-thr ac) (current-thread))
  (thread-send (!ac-thr ac) v))

(define (async-channel-put-evt ac v)
  (rec self
    (handle-evt always-evt
      (lambda (e) (async-channel-put ac v) self))))
-------------- next part --------------
#lang scheme/base

(require srfi/78
         "async-channel.ss" 
         (prefix-in mz: scheme/async-channel))
(provide (all-defined-out))

(define (wait-idle) (sync (system-idle-evt)))

(define (check-async-channel)
  (let ((x (make-async-channel)))
    (wait-idle)
    (check (async-channel-try-get x) => #f)
    (async-channel-put x 1)
    (wait-idle)
    (check (async-channel-try-get x) => 1)
    (async-channel-put x 2)
    (wait-idle)
    (check (async-channel-get x) => 2)
    (sync (async-channel-put-evt x 3))
    (wait-idle)
    (check (sync x) => 3)
    (for ((i '(1 2 3))) (async-channel-put x i))
    (wait-idle)
    (for ((i '(1 2 3))) (check (async-channel-get x) => i))))

(define (synchronizer ch sema put n)
  (handle-evt ch
    (lambda (v)
      (if (> v n) 
        (begin (semaphore-post sema) #f)
        (begin (put ch (add1 v)) v)))))

(define (synchronizer-thread ch sema put n)
  (define step (synchronizer ch sema put n))
  (define done (wrap-evt (semaphore-peek-evt sema) (lambda (x) #f)))
  (thread
   (lambda ()
     (let lp ()
       (when (sync step done) (lp))))))

;; M: # of threads that content. M=1 for no contention
;; N: # of ops
(define (run  make put M N)
  (let* ((sema (make-semaphore))
         (ch (make))
         (thrs (build-list M (lambda (x) (synchronizer-thread ch sema put N)))))
    (wait-idle)
    (put ch 0)
    (for ((x thrs)) (thread-wait x))))

(define (run/ac (M 1) (N 100000))
  (run make-async-channel async-channel-put M N))

(define (run/mzac (M 1) (N 100000))
  (run mz:make-async-channel mz:async-channel-put M N))


Posted on the users mailing list.