diff options
Diffstat (limited to 'appl/collab/servers/mpx.b')
| -rw-r--r-- | appl/collab/servers/mpx.b | 301 |
1 files changed, 301 insertions, 0 deletions
diff --git a/appl/collab/servers/mpx.b b/appl/collab/servers/mpx.b new file mode 100644 index 00000000..ea0c2c1c --- /dev/null +++ b/appl/collab/servers/mpx.b @@ -0,0 +1,301 @@ +implement Service; + +# +# 1 to many and many to 1 multiplexor +# + +include "sys.m"; + sys: Sys; + Qid: import Sys; + +include "styx.m"; + styx: Styx; + Tmsg, Rmsg: import styx; + +include "styxservers.m"; + styxservers: Styxservers; + Styxserver, Navigator: import styxservers; + nametree: Nametree; + Tree: import nametree; + +include "service.m"; + +include "messages.m"; + messages: Messages; + Msg, Msglist, Readreq, User: import messages; + +Qdir, Qroot, Qusers, Qleaf: con iota; + +srv: ref Styxserver; +clientidgen := 0; + +Einactive: con "not currently active"; + +toleaf: ref Msglist; +toroot: ref Msglist; +userlist: list of ref User; + +user := "inferno"; + +dir(name: string, perm: int, path: int): Sys->Dir +{ + d := sys->zerodir; + d.name = name; + d.uid = user; + d.gid = user; + d.qid.path = big path; + if(perm & Sys->DMDIR) + d.qid.qtype = Sys->QTDIR; + else + d.qid.qtype = Sys->QTFILE; + d.mode = perm; + return d; +} + +init(nil: list of string): (string, string, ref Sys->FD) +{ + sys = load Sys Sys->PATH; + styx = load Styx Styx->PATH; + if(styx == nil) + return (sys->sprint("can't load %s: %r", Styx->PATH), nil, nil); + styxservers = load Styxservers Styxservers->PATH; + if(styxservers == nil) + return (sys->sprint("can't load %s: %r", Styxservers->PATH), nil, nil); + nametree = load Nametree Nametree->PATH; + if(nametree == nil) + return (sys->sprint("can't load %s: %r", Nametree->PATH), nil, nil); + styx->init(); + styxservers->init(styx); +styxservers->traceset(1); + nametree->init(); + messages = load Messages Messages->PATH; + if(messages == nil) + return (sys->sprint("can't load %s: %r", Messages->PATH), nil, nil); + + (tree, treeop) := nametree->start(); + tree.create(big Qdir, dir(".", Sys->DMDIR|8r555, Qdir)); + tree.create(big Qdir, dir("leaf", 8r666, Qleaf)); + tree.create(big Qdir, dir("root", 8r666, Qroot)); + tree.create(big Qdir, dir("users", 8r444, Qusers)); + + p := array [2] of ref Sys->FD; + if (sys->pipe(p) < 0){ + tree.quit(); + return (sys->sprint("can't create pipe: %r"), nil, nil); + } + + toleaf = Msglist.new(); + toroot = Msglist.new(); + + tc: chan of ref Tmsg; + (tc, srv) = Styxserver.new(p[1], Navigator.new(treeop), big Qdir); + spawn mpx(tc, tree); + + return (nil, "/", p[0]); +} + +mpx(tc: chan of ref Tmsg, tree: ref Tree) +{ + root: ref User; + while((tmsg := <-tc) != nil){ + pick tm := tmsg { + Readerror => + break; + Open => + c := srv.getfid(tm.fid); + if(c == nil || c.isopen){ + srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid)); + continue; + } + case int c.path { + Qroot => + if(root != nil){ + srv.reply(ref Rmsg.Error(tm.tag, sys->sprint("interaction already directed by %s", root.name))); + continue; + } + c = srv.open(tm); + if (c == nil) + continue; + root = ref User(0, tm.fid, c.uname, nil); + root.initqueue(toroot); + Qleaf => + if(root == nil){ + srv.reply(ref Rmsg.Error(tm.tag, Einactive)); + continue; + } + c = srv.open(tm); + if (c == nil) + continue; + userarrives(tm.fid, c.uname); + # mpxdir[1].qid.vers++; # TO DO + * => + srv.open(tm); + } + Read => + c := srv.getfid(tm.fid); + if (c == nil || !c.isopen) { + srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid)); + continue; + } + case int c.path { + Qdir => + srv.read(tm); + Qroot => + tm.offset = big 0; + m := qread(toroot, root, tm, 1); + if(m != nil) + srv.reply(ref Rmsg.Read(tm.tag, m.data)); + Qleaf => + u := fid2user(tm.fid); + if (u == nil) { + srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user")); + continue; + } + tm.offset = big 0; + m := qread(toleaf, u, tm, 0); + if(m == nil){ + if(root == nil) + srv.reply(ref Rmsg.Read(tm.tag, nil)); + else + qread(toleaf, u, tm, 1); # put us on the wait queue + }else + srv.reply(ref Rmsg.Read(tm.tag, m.data)); + Qusers => + srv.reply(styxservers->readstr(tm, usernames())); + * => + srv.reply(ref Rmsg.Error(tm.tag, "phase error -- bad path")); + } + Write => + c := srv.getfid(tm.fid); + if (c == nil || !c.isopen) { + srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid)); + continue; + } + case int c.path { + Qroot => + qwrite(toleaf, msg(root, 'M', tm.data)); + srv.reply(ref Rmsg.Write(tm.tag, len tm.data)); + Qleaf => + u := fid2user(tm.fid); + if(u == nil) { + srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user")); + continue; + } + if(root == nil){ + srv.reply(ref Rmsg.Error(tm.tag, Einactive)); + continue; + } + qwrite(toroot, msg(u, 'm', tm.data)); + srv.reply(ref Rmsg.Write(tm.tag, len tm.data)); + * => + srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Eperm)); + } + Flush => + cancelpending(tm.tag); + srv.reply(ref Rmsg.Flush(tm.tag)); + Clunk => + c := srv.getfid(tm.fid); + if(c.isopen){ + case int c.path { + Qroot => + # shut down? + qwrite(toleaf, msg(root, 'L', nil)); + root = nil; + Qleaf => + userleaves(tm.fid); + # mpxdir[1].qid.vers++; # TO DO + } + } + * => + srv.default(tmsg); + } + } + tree.quit(); + sys->print("mpx exit\n"); +} + +mpxseqgen := 0; + +time(): int +{ + return ++mpxseqgen; # server time; assumes 2^31-1 is large enough +} + +userarrives(fid: int, name: string) +{ + u := User.new(fid, name); + qwrite(toroot, msg(u, 'a', nil)); + u.initqueue(toleaf); # sees leaf messages from now on + userlist = u :: userlist; +} + +fid2user(fid: int): ref User +{ + for(ul := userlist; ul != nil; ul = tl ul) + if((u := hd ul).fid == fid) + return u; + return nil; +} + +userleaves(fid: int) +{ + ul := userlist; + userlist = nil; + u: ref User; + for(; ul != nil; ul = tl ul) + if((hd ul).fid != fid) + userlist = hd ul :: userlist; + else + u = hd ul; + if(u != nil) + qwrite(toroot, msg(u, 'l', nil)); +} + +usernames(): string +{ + s := ""; + for(ul := userlist; ul != nil; ul = tl ul){ + u := hd ul; + s += string u.id+" "+u.name+"\n"; + } + return s; +} + +qwrite(msgs: ref Msglist, m: ref Msg) +{ + pending := msgs.write(m); + for(; pending != nil; pending = tl pending){ + (u, req) := hd pending; + m = u.read(); # must succeed, or the code is wrong + data := m.data; + if(req.count < len data) + data = data[0:req.count]; + srv.reply(ref Rmsg.Read(req.tag, data)); + } +} + +qread(msgs: ref Msglist, u: ref User, tm: ref Tmsg.Read, wait: int): ref Msg +{ + m := u.read(); + if(m != nil){ + if(tm.count < len m.data) + m.data = m.data[0:tm.count]; + }else if(wait) + msgs.wait(u, ref Readreq(tm.tag, tm.fid, tm.count, tm.offset)); + return m; +} + +cancelpending(tag: int) +{ + toroot.flushtag(tag); + toleaf.flushtag(tag); +} + +msg(u: ref User, op: int, data: array of byte): ref Msg +{ + a := sys->aprint("%ud %d %c %s ", time(), u.id, op, u.name); + m := ref Msg(u, array[len a + len data] of byte, nil); + m.data[0:] = a; + m.data[len a:] = data; + return m; +} |
