[plt-scheme] An alternative async-channel implementation

From: Dimitris Vyzovitis (vyzo at media.mit.edu)
Date: Sat Apr 26 17:27:37 EDT 2008

On Sat, 26 Apr 2008, Dimitris Vyzovitis wrote:

> This is an alternative, simpler implementation using thread mailboxes
> and a channel.

Oops - slight bug: async-channel? is provided as async-channel.
Fixed in this attachment.

-- vyzo
-------------- next part --------------
;; An async-channel implementation using thread mailboxes
;;
;; (C) 2008 Dimitris Vyzovitis [vyzo-at-media.mit.edu]
;;
;; This library is free software: you can redistribute it and/or modify
;; it under the terms of the GNU Lesser General Public License (LGPL) as 
;; published by the Free Software Foundation, version 3 or 
;; (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;; GNU LGPL[1] for more details.
;;
;; [1] <http://www.gnu.org/licenses/>.
;;
;; Note: this implementation does not implement a limit to the async queue
;;  size -- a semaphore could be used for this purpose.
#lang scheme/base

(provide (rename-out (!ac? async-channel?))
         make-async-channel
         async-channel-get
         async-channel-try-get
         async-channel-put
         async-channel-put-evt)

;; (rec name value) evaluates value in a scope where name already refers
;; to the result, and returns that result.  It is a shorthand for building
;; a self-referential value (e.g. a procedure or event whose body mentions
;; itself) without a separate definition.
(define-syntax rec
  (syntax-rules ()
    ((_ name value)
     (letrec ((name value)) name))))

;; async-queue implementation
;; Body of the manager thread for the async channel ac.  Never returns.
;;
;; Keeps a FIFO of pending values as a singly linked list of mutable pairs:
;; head is the first cell (or null when empty) and tail is the last cell
;; (or #f when empty).  Each iteration waits for whichever happens first:
;;   - a message arrives in this thread's mailbox -> append it to the FIFO
;;   - a receiver takes the oldest value from ac's channel -> drop it
(define (async-queue ac)
  (define head null)
  (define tail #f)
  ;; Append v at the end of the FIFO.
  (define (enqueue! v)
    (let ((cell (mcons v null)))
      (if tail
          (set-mcdr! tail cell)
          (set! head cell))
      (set! tail cell)))
  ;; Remove the first cell; forget the tail pointer once the FIFO is empty.
  (define (dequeue!)
    (let ((remaining (mcdr head)))
      (when (null? remaining) (set! tail #f))
      (set! head remaining)))
  (let loop ()
    (let ((incoming
           (handle-evt (thread-receive-evt)
             (lambda (ignored) (enqueue! (thread-receive)) (loop))))
          ;; Only offer a value when there is one; never-evt keeps the
          ;; sync below a two-way choice with one side permanently blocked.
          (outgoing
           (if (null? head)
               never-evt
               (handle-evt (channel-put-evt (!ac-ch ac) (mcar head))
                 (lambda (ignored) (dequeue!) (loop))))))
      (sync incoming outgoing))))

;; prop:evt procedure for !ac structures: called with the structure when
;; it is used as an event.  Resumes the manager thread with the current
;; thread as benefactor, then hands back the channel to wait on.
(define (async-evt ac)
  (let ((manager (!ac-thr ac)))
    (thread-resume manager (current-thread))
    (!ac-ch ac)))

;; The async-channel representation.
;;   thr - the thread stored by make-async-channel; it runs async-queue
;;   ch  - the channel on which async-queue offers queued values
;; prop:evt is set to async-evt, so syncing on an instance resumes thr and
;; then waits on ch.
(define-struct !ac (thr ch)
  #:property prop:evt async-evt)

;; async-channel interface
;; the limit argument is ignored - present for compatibility
;; make-async-channel : [limit] -> async-channel
;; Creates a new async channel.  limit is accepted but not used.
;;
;; The manager thread is created with thread/suspend-to-kill and must know
;; the structure it serves, but the structure in turn holds the thread.
;; The cycle is closed by sending the finished structure to the thread as
;; its first mailbox message, which the thread reads before entering
;; async-queue.
(define (make-async-channel (limit #f))
  (define manager
    (thread/suspend-to-kill
     (lambda () (async-queue (thread-receive)))))
  (define ac (make-!ac manager (make-channel)))
  (thread-send manager ac)
  ac)

;; async-channel-get : async-channel -> any
;; Blocks until a value can be received from ac and returns it.
;; ac is used directly as an event; for an !ac structure that goes through
;; its prop:evt procedure (async-evt).
(define (async-channel-get ac)
  (sync ac))

;; async-channel-try-get : async-channel -> any or #f
;; Polls ac with a zero timeout: returns a value if one can be received
;; immediately, #f otherwise.  A queued #f therefore cannot be told apart
;; from "nothing available".
;; NOTE(review): a value seems to count as available only once the manager
;; thread is already offering it on the channel, so a try-get issued right
;; after a put may still return #f -- confirm callers tolerate this.
(define (async-channel-try-get ac)
  (sync/timeout 0 ac))

;; async-channel-put : async-channel any -> void
;; Enqueues v on ac without waiting for a receiver.  The manager thread is
;; first resumed with the current thread as benefactor, then v is placed
;; in its mailbox.
(define (async-channel-put ac v)
  (let ((manager (!ac-thr ac)))
    (thread-resume manager (current-thread))
    (thread-send manager v)))

;; async-channel-put-evt : async-channel any -> evt
;; Returns an event that is always ready; choosing it in a sync enqueues v
;; on ac (via async-channel-put) and yields the event itself as the result.
;; ac is checked when the event is built, so a bad argument is reported at
;; the call site and under this function's name rather than later, inside
;; whichever sync happens to select the event.
(define (async-channel-put-evt ac v)
  (unless (!ac? ac)
    (raise-type-error 'async-channel-put-evt "async-channel" 0 ac v))
  ;; rec binds self so the handler can return the event being constructed.
  (rec self
    (handle-evt always-evt
      (lambda (e) (async-channel-put ac v) self))))

Posted on the users mailing list.