summaryrefslogtreecommitdiff
path: root/appl/lib
diff options
context:
space:
mode:
authorCharles.Forsyth <devnull@localhost>2007-12-27 12:25:32 +0000
committerCharles.Forsyth <devnull@localhost>2007-12-27 12:25:32 +0000
commitb43c1ca5eb5fc65b93ae935a568432712797b049 (patch)
tree512e88b4fcf815e911ff4eab8f850427b93f9afa /appl/lib
parent45a14cc6b60b02e58eb1b271508ccc2fa3c52b8f (diff)
20071227-1230
Diffstat (limited to 'appl/lib')
-rw-r--r--appl/lib/mkfile1
-rw-r--r--appl/lib/styxflush.b153
2 files changed, 154 insertions, 0 deletions
diff --git a/appl/lib/mkfile b/appl/lib/mkfile
index 6784a568..34062c42 100644
--- a/appl/lib/mkfile
+++ b/appl/lib/mkfile
@@ -236,3 +236,4 @@ json.dis: $ROOT/module/json.m
lists.dis: $ROOT/module/lists.m
vac.dis: $ROOT/module/vac.m $ROOT/module/venti.m
dial.dis: $ROOT/module/dial.m
+styxflush.dis: $ROOT/module/styxflush.m
diff --git a/appl/lib/styxflush.b b/appl/lib/styxflush.b
new file mode 100644
index 00000000..3388b1be
--- /dev/null
+++ b/appl/lib/styxflush.b
@@ -0,0 +1,153 @@
+implement Styxflush;
+include "sys.m";
+ sys: Sys;
+include "tables.m";
+ tables: Tables;
+ Table: import tables;
+include "styx.m";
+ styx: Styx;
+ Tmsg, Rmsg: import styx;
+include "styxflush.m";
+
+reqs: ref Table[ref Req];
+Req: adt {
+ m: ref Tmsg;
+ flushc: chan of (int, chan of int);
+ oldreq: cyclic ref Req;
+ flushes: cyclic ref Req; # flushes queued on this req.
+ nextflush: cyclic ref Req; # (flush only) next req in flush queue.
+ flushready: chan of int; # (flush only) wait for flush attempt.
+ flushing: int; # request is subject of a flush.
+ finished: chan of int; # [1]; signals finish to late flushers.
+ responded: int;
+};
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ tables = load Tables Tables->PATH;
+ styx = load Styx Styx->PATH;
+ styx->init();
+
+ reqs = Table[ref Req].new(11, nil);
+}
+
+tmsg(gm: ref Styx->Tmsg, flushc: chan of (int, chan of int), reply: chan of ref Styx->Rmsg): (int, ref Rmsg)
+{
+ req := ref Req(
+ gm,
+ flushc, # flushc
+ nil, # oldreq
+ nil, # flushes
+ nil, # nextflush
+ nil, # flushready
+ 0, # flushing
+ chan[1] of int, # finished
+ 0 # responded
+ );
+ if(reqs.add(gm.tag, req) == 0)
+ return (1, ref Rmsg.Error(gm.tag, "duplicate tag"));
+ pick m := gm {
+ Flush =>
+ req.oldreq = reqs.find(m.oldtag);
+ if(req.oldreq == nil)
+ return (1, ref Rmsg.Flush(m.tag));
+ addflush(req);
+ req.flushc = chan of (int, chan of int);
+ spawn flushreq(req, reply);
+ return (1, nil);
+ }
+ return (0, nil);
+}
+
+rmsg(rm: ref Styx->Rmsg): int
+{
+ req := reqs.find(rm.tag);
+ if(req == nil){
+ complain("req has disappeared, reply "+rm.text());
+ return 0;
+ }
+ reqs.del(rm.tag);
+ if(tagof rm == tagof Rmsg.Flush)
+ delflush(req);
+ if(req.flushing)
+ req.finished <-= 1;
+ req.responded = 1;
+ pick m := rm {
+ Error =>
+ if(m.ename == Einterrupted){
+ if(!req.flushing)
+ complain("interrupted reply but no flush "+req.m.text());
+ return 0;
+ }
+ }
+ return 1;
+}
+
+addflush(req: ref Req)
+{
+ o := req.oldreq;
+ for(r := o.flushes; r != nil; r = r.nextflush)
+ if(r.nextflush == nil)
+ break;
+ if(r == nil){
+ o.flushes = req;
+ req.flushready = nil;
+ }else{
+ r.nextflush = req;
+ req.flushready = chan of int;
+ }
+ o.flushing = 1;
+}
+
+# remove req (a flush request) from the list of flushes pending
+# for req.oldreq. if it was at the head of the list, then give
+# the next req a go.
+delflush(req: ref Req)
+{
+ oldreq := req.oldreq;
+ prev: ref Req;
+ for(r := oldreq.flushes; r != nil; r = r.nextflush){
+ if(r == req)
+ break;
+ prev = r;
+ }
+ if(prev == nil){
+ oldreq.flushes = r.nextflush;
+ if(oldreq.flushes != nil)
+ oldreq.flushes.flushready <-= 1;
+ }else
+ prev.nextflush = r.nextflush;
+ r.nextflush = nil;
+}
+
+flushreq(req: ref Req, reply: chan of ref Styx->Rmsg)
+{
+ o := req.oldreq;
+ # if we're queued up, wait our turn.
+ if(req.flushready != nil)
+ <-req.flushready;
+ rc := chan of int;
+ alt{
+ o.flushc <-= (req.m.tag, rc) =>
+ <-rc;
+ reply <-= ref Rmsg.Flush(req.m.tag);
+ # old request must have responded before sending on rc,
+ # but be defensive because it's easy to forget.
+ if(!o.responded){
+ complain("flushed request not responded to: "+o.m.text());
+ o.responded = 1; # race but better than nothing.
+ }
+ (nil, nrc) := <-req.flushc =>
+ reply <-= ref Rmsg.Error(req.m.tag, Einterrupted);
+ nrc <-= 1;
+ <-o.finished =>
+ o.finished <-= 1;
+ reply <-= ref Rmsg.Flush(req.m.tag);
+ }
+}
+
+complain(e: string)
+{
+ sys->fprint(sys->fildes(2), "styxflush: warning: %s\n", e);
+}