[plt-scheme] Savers: Superceding

From: Synx (plt at synx.us.to)
Date: Wed Feb 17 23:18:56 EST 2010

I've been playing with a neat concept I call "savers". Sort of an
over-simplification of ports. But a saver is just a procedure that takes
a byte vector and consumes it somehow. What's neat about it is I can
make sort of "filters" which are procedures that take a saver, and
return a new saver. The new saver does something with the data, before
invoking the old saver, so you could wrap encoding or data structure
serialization around a plain old port saver.

(define-struct slice (bytes start end))

;; A saver receives a slice, or #f to signal the end of the stream.
(define saver/c (-> (or/c slice? #f) any))
(define filter/c (-> saver/c saver/c))
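
As a rough illustration of the saver/filter idea, here is a minimal sketch. The body of port-saver is a guess (only its name appears in this post), and slice->bytes, upcase-byte, and upcase-filter are hypothetical names made up for the example. End-of-stream handling (#f) is left out to keep it short.

```racket
#lang racket/base

(define-struct slice (bytes start end))

;; Hypothetical helper: copy out the bytes that a slice covers.
(define (slice->bytes s)
  (subbytes (slice-bytes s) (slice-start s) (slice-end s)))

;; A saver that writes each slice to an output port.
;; Assumption: the real port-saver may look quite different.
(define (port-saver out)
  (lambda (s)
    (write-bytes (slice-bytes s) out (slice-start s) (slice-end s))))

;; Upcase a single byte if it is an ASCII lowercase letter.
(define (upcase-byte b)
  (if (<= 97 b 122) (- b 32) b))

;; A filter: takes a saver and returns a new saver that transforms
;; the data before invoking the old one.
(define (upcase-filter next)
  (lambda (s)
    (define bs (apply bytes (map upcase-byte (bytes->list (slice->bytes s)))))
    (next (make-slice bs 0 (bytes-length bs)))))

;; Usage: wrap a plain port saver in the filter and feed it two slices.
(define out (open-output-bytes))
(define save (upcase-filter (port-saver out)))
(void (save (make-slice #"xxhelloxx" 2 7)))
(void (save (make-slice #" world" 0 6)))
```

Filters compose by plain procedure application, so a longer chain would just be (filter-a (filter-b (port-saver out))).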

The problem with superceding came up when I was trying to make what I
was calling an "accumulator." Often I need the first N bytes of a byte
stream, but the saver can be invoked with a sequence of byte arrays, any
one of which might straddle that N-byte boundary.

Here's my accumulate procedure so far:

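(define (accumulate amount completed #:failed (failed void))
  (define left amount)  ; bytes still needed, or #f once the prefix is done
  (define buffer null)  ; prefix slices gathered so far, newest first
  (define save #f)      ; downstream saver, set once the prefix is complete

  ;; Hand the gathered prefix to completed and switch to pass-through.
  (define (finish!)
    (set! save (completed (slice:append (reverse buffer))))
    (set! buffer null)
    (set! left #f))

  (define (process slice)
    (cond
      ((eq? left #f) (save slice))
      ((eq? slice #f)
       ;; End of stream: acceptable only if the whole prefix has arrived.
       ;; save may still be unset here, so finish first.
       (if (= left 0)
           (begin
             (finish!)
             (save slice))
           (failed left)))
      ((= left 0)
       (finish!)
       (save slice))
      ((< left (slice:length slice))
       ;; The boundary falls inside this slice: split it there.
       (let ((mid (+ left (slice:start slice))))
         (set! buffer
               (cons (slice:sub slice (slice:start slice) mid) buffer))
         (set! left 0)
         (process (slice:sub slice mid (slice:end slice)))))
      ((>= left (slice:length slice))
       (set! buffer (cons slice buffer))
       (set! left (- left (slice:length slice))))))
  process)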
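
The slice: helpers used above are not shown in this post. A minimal sketch of what they might look like, assuming they behave as their names suggest, that slice:sub takes absolute offsets into the underlying byte string (which is how mid is computed above), and that slice:append returns a plain byte string:

```racket
#lang racket/base

(define-struct slice (bytes start end))

;; Accessors under the names the accumulate code uses (assumed aliases).
(define slice:start slice-start)
(define slice:end slice-end)

;; Number of bytes the slice covers.
(define (slice:length s)
  (- (slice-end s) (slice-start s)))

;; A narrower view onto the same underlying bytes. start and end are
;; absolute offsets into that byte string, not relative to the slice.
(define (slice:sub s start end)
  (make-slice (slice-bytes s) start end))

;; Concatenate a list of slices into one fresh byte string.
;; Assumption: the real helper might return a slice instead.
(define (slice:append slices)
  (apply bytes-append
         (for/list ([s (in-list slices)])
           (subbytes (slice-bytes s) (slice-start s) (slice-end s)))))
```

Because slice:sub only narrows the view, splitting a slice at the N byte boundary copies nothing; the only copy happens once, in slice:append.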

Yes I know the mutation is horrible. But you'll notice that the
accumulate procedure takes a procedure that produces a saver, and
returns a saver. That way it can accumulate, say 200 bytes, then call
its completed procedure, handling that 200 byte prefix and returning the
proper saver. So I could for instance read in a 32 byte hash, and then
use it as a destination (or source) file name.


(accumulate 32
   (λ (result)
     (port-saver (open-output-file (format "data/~a" (hex result))))))

So that way, once I've gotten the first 32 bytes, it then just straight
out saves bytes to the resulting file.

"Superceding" is the term I've given for what happens with this
accumulate process. After the result has been accumulated, there is no
more need for accumulate. It doesn't have to check anything. It doesn't
have to buffer anything, or hold onto any bindings. Theoretically it is
exactly the same as calling (port-saver) by itself, except for the first
32 bytes. So in that way the port saver has "superceded" the accumulate
saver.

The problem with my implementation is that accumulate doesn't go away,
and I don't know how to make it go away. I end up calling
(accumulate-save bytes) which then checks (eq? left #f) and then calls
(port-save bytes). I would rather just have it call (port-save bytes)
for the rest of the stream, instead of doing what effectively amounts to:

(define (wrapper-save slice) (when (eq? #f #f) (save slice)))

That seems both unscalable and unnecessary. I don't really know the
solution though. I'd love to do (set! wrapper-save save) and just go
with that, but when I try to do it, the lexical context of wrapper-save
always seems to be inaccessible.
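
One possible way to picture what superceding would require, sketched under assumptions that are not part of the code above: each saver returns the saver that should receive the next chunk, and whoever drives the stream keeps only that returned value. The names feed, seen-prefix, and final are hypothetical, savers take plain byte strings here rather than slices to keep the sketch short, and end-of-stream handling is omitted.

```racket
#lang racket/base

;; In this sketch a saver takes a byte string and RETURNS the saver
;; that should be used for the next chunk.

;; A terminal saver: writes to a port and always returns itself.
(define (port-saver out)
  (define (save bs)
    (write-bytes bs out)
    save)
  save)

;; Gather the first `amount` bytes, pass them to `completed` (which
;; returns the downstream saver), forward any leftover bytes, and
;; return whatever the downstream saver returns. After that, nothing
;; refers to the accumulator any more.
(define (accumulate amount completed)
  (let loop ([left amount] [buffer '()])
    (lambda (bs)
      (define n (bytes-length bs))
      (if (< n left)
          ;; Still short: return a fresh accumulator with updated state.
          (loop (- left n) (cons bs buffer))
          (let* ([prefix (apply bytes-append
                                (reverse (cons (subbytes bs 0 left) buffer)))]
                 [next (completed prefix)])
            (next (subbytes bs left)))))))

;; The driver holds the only reference to the current saver.
(define (feed saver chunks)
  (for/fold ([current saver]) ([bs (in-list chunks)])
    (current bs)))

(define out (open-output-bytes))
(define seen-prefix #f)
(define final
  (feed (accumulate 4 (lambda (prefix)
                        (set! seen-prefix prefix)
                        (port-saver out)))
        (list #"ab" #"cdEF" #"GH")))
```

The cost of this design is that every filter holding a downstream saver has to store the returned successor too, so the convention spreads through the whole chain.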

Has anyone done anything like this before? I'm not using a combination
of threads and pipes because I want to scale up to thousands of these
streams at once, and I can barely open 200 thread/pipe/pipe things
without my computer locking up. And when I tried using a combination of
threads and pipes, it turned out I needed about a half dozen threads and
pipes per stream, because of the several levels of recursive encoding.

I don't think I need the flexibility of threads and pipes, since it's
effectively just me taking a prefix off the stream and saving it. But
the source of these bytes is not often something as predictable as a
port. Often the source is a procedure that reads a sequence of files,
decodes each, then concatenates their contents. So I can't just go
(read-bytes 32) as I wish I could.

I tried making custom ports, but that was a mess too. I don't want or
care about special values, lines, source code locations or peeking. All
I want is something that consumes bytes, through a complex chain of
encodings. But the chain itself has to have some way for me to
"supercede" elements in it, for instance when I no longer have to decode
a prefix and all that particular saver is doing now is passing the bytes
on to the next step forever.


Posted on the users mailing list.