summaryrefslogtreecommitdiff
path: root/appl/collab/servers/mpx.b
diff options
context:
space:
mode:
Diffstat (limited to 'appl/collab/servers/mpx.b')
-rw-r--r--appl/collab/servers/mpx.b301
1 files changed, 301 insertions, 0 deletions
diff --git a/appl/collab/servers/mpx.b b/appl/collab/servers/mpx.b
new file mode 100644
index 00000000..ea0c2c1c
--- /dev/null
+++ b/appl/collab/servers/mpx.b
@@ -0,0 +1,301 @@
+implement Service;
+
+#
+# 1 to many and many to 1 multiplexor
+#
+
+include "sys.m";
+ sys: Sys;
+ Qid: import Sys;
+
+include "styx.m";
+ styx: Styx;
+ Tmsg, Rmsg: import styx;
+
+include "styxservers.m";
+ styxservers: Styxservers;
+ Styxserver, Navigator: import styxservers;
+ nametree: Nametree;
+ Tree: import nametree;
+
+include "service.m";
+
+include "messages.m";
+ messages: Messages;
+ Msg, Msglist, Readreq, User: import messages;
+
+Qdir, Qroot, Qusers, Qleaf: con iota;
+
+srv: ref Styxserver;
+clientidgen := 0;
+
+Einactive: con "not currently active";
+
+toleaf: ref Msglist;
+toroot: ref Msglist;
+userlist: list of ref User;
+
+user := "inferno";
+
+dir(name: string, perm: int, path: int): Sys->Dir
+{
+ d := sys->zerodir;
+ d.name = name;
+ d.uid = user;
+ d.gid = user;
+ d.qid.path = big path;
+ if(perm & Sys->DMDIR)
+ d.qid.qtype = Sys->QTDIR;
+ else
+ d.qid.qtype = Sys->QTFILE;
+ d.mode = perm;
+ return d;
+}
+
+init(nil: list of string): (string, string, ref Sys->FD)
+{
+ sys = load Sys Sys->PATH;
+ styx = load Styx Styx->PATH;
+ if(styx == nil)
+ return (sys->sprint("can't load %s: %r", Styx->PATH), nil, nil);
+ styxservers = load Styxservers Styxservers->PATH;
+ if(styxservers == nil)
+ return (sys->sprint("can't load %s: %r", Styxservers->PATH), nil, nil);
+ nametree = load Nametree Nametree->PATH;
+ if(nametree == nil)
+ return (sys->sprint("can't load %s: %r", Nametree->PATH), nil, nil);
+ styx->init();
+ styxservers->init(styx);
+styxservers->traceset(1);
+ nametree->init();
+ messages = load Messages Messages->PATH;
+ if(messages == nil)
+ return (sys->sprint("can't load %s: %r", Messages->PATH), nil, nil);
+
+ (tree, treeop) := nametree->start();
+ tree.create(big Qdir, dir(".", Sys->DMDIR|8r555, Qdir));
+ tree.create(big Qdir, dir("leaf", 8r666, Qleaf));
+ tree.create(big Qdir, dir("root", 8r666, Qroot));
+ tree.create(big Qdir, dir("users", 8r444, Qusers));
+
+ p := array [2] of ref Sys->FD;
+ if (sys->pipe(p) < 0){
+ tree.quit();
+ return (sys->sprint("can't create pipe: %r"), nil, nil);
+ }
+
+ toleaf = Msglist.new();
+ toroot = Msglist.new();
+
+ tc: chan of ref Tmsg;
+ (tc, srv) = Styxserver.new(p[1], Navigator.new(treeop), big Qdir);
+ spawn mpx(tc, tree);
+
+ return (nil, "/", p[0]);
+}
+
+mpx(tc: chan of ref Tmsg, tree: ref Tree)
+{
+ root: ref User;
+ while((tmsg := <-tc) != nil){
+ pick tm := tmsg {
+ Readerror =>
+ break;
+ Open =>
+ c := srv.getfid(tm.fid);
+ if(c == nil || c.isopen){
+ srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
+ continue;
+ }
+ case int c.path {
+ Qroot =>
+ if(root != nil){
+ srv.reply(ref Rmsg.Error(tm.tag, sys->sprint("interaction already directed by %s", root.name)));
+ continue;
+ }
+ c = srv.open(tm);
+ if (c == nil)
+ continue;
+ root = ref User(0, tm.fid, c.uname, nil);
+ root.initqueue(toroot);
+ Qleaf =>
+ if(root == nil){
+ srv.reply(ref Rmsg.Error(tm.tag, Einactive));
+ continue;
+ }
+ c = srv.open(tm);
+ if (c == nil)
+ continue;
+ userarrives(tm.fid, c.uname);
+ # mpxdir[1].qid.vers++; # TO DO
+ * =>
+ srv.open(tm);
+ }
+ Read =>
+ c := srv.getfid(tm.fid);
+ if (c == nil || !c.isopen) {
+ srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
+ continue;
+ }
+ case int c.path {
+ Qdir =>
+ srv.read(tm);
+ Qroot =>
+ tm.offset = big 0;
+ m := qread(toroot, root, tm, 1);
+ if(m != nil)
+ srv.reply(ref Rmsg.Read(tm.tag, m.data));
+ Qleaf =>
+ u := fid2user(tm.fid);
+ if (u == nil) {
+ srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
+ continue;
+ }
+ tm.offset = big 0;
+ m := qread(toleaf, u, tm, 0);
+ if(m == nil){
+ if(root == nil)
+ srv.reply(ref Rmsg.Read(tm.tag, nil));
+ else
+ qread(toleaf, u, tm, 1); # put us on the wait queue
+ }else
+ srv.reply(ref Rmsg.Read(tm.tag, m.data));
+ Qusers =>
+ srv.reply(styxservers->readstr(tm, usernames()));
+ * =>
+ srv.reply(ref Rmsg.Error(tm.tag, "phase error -- bad path"));
+ }
+ Write =>
+ c := srv.getfid(tm.fid);
+ if (c == nil || !c.isopen) {
+ srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
+ continue;
+ }
+ case int c.path {
+ Qroot =>
+ qwrite(toleaf, msg(root, 'M', tm.data));
+ srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
+ Qleaf =>
+ u := fid2user(tm.fid);
+ if(u == nil) {
+ srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
+ continue;
+ }
+ if(root == nil){
+ srv.reply(ref Rmsg.Error(tm.tag, Einactive));
+ continue;
+ }
+ qwrite(toroot, msg(u, 'm', tm.data));
+ srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
+ * =>
+ srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Eperm));
+ }
+ Flush =>
+ cancelpending(tm.tag);
+ srv.reply(ref Rmsg.Flush(tm.tag));
+ Clunk =>
+ c := srv.getfid(tm.fid);
+ if(c.isopen){
+ case int c.path {
+ Qroot =>
+ # shut down?
+ qwrite(toleaf, msg(root, 'L', nil));
+ root = nil;
+ Qleaf =>
+ userleaves(tm.fid);
+ # mpxdir[1].qid.vers++; # TO DO
+ }
+ }
+ * =>
+ srv.default(tmsg);
+ }
+ }
+ tree.quit();
+ sys->print("mpx exit\n");
+}
+
+mpxseqgen := 0;
+
+time(): int
+{
+ return ++mpxseqgen; # server time; assumes 2^31-1 is large enough
+}
+
+userarrives(fid: int, name: string)
+{
+ u := User.new(fid, name);
+ qwrite(toroot, msg(u, 'a', nil));
+ u.initqueue(toleaf); # sees leaf messages from now on
+ userlist = u :: userlist;
+}
+
+fid2user(fid: int): ref User
+{
+ for(ul := userlist; ul != nil; ul = tl ul)
+ if((u := hd ul).fid == fid)
+ return u;
+ return nil;
+}
+
+userleaves(fid: int)
+{
+ ul := userlist;
+ userlist = nil;
+ u: ref User;
+ for(; ul != nil; ul = tl ul)
+ if((hd ul).fid != fid)
+ userlist = hd ul :: userlist;
+ else
+ u = hd ul;
+ if(u != nil)
+ qwrite(toroot, msg(u, 'l', nil));
+}
+
+usernames(): string
+{
+ s := "";
+ for(ul := userlist; ul != nil; ul = tl ul){
+ u := hd ul;
+ s += string u.id+" "+u.name+"\n";
+ }
+ return s;
+}
+
+qwrite(msgs: ref Msglist, m: ref Msg)
+{
+ pending := msgs.write(m);
+ for(; pending != nil; pending = tl pending){
+ (u, req) := hd pending;
+ m = u.read(); # must succeed, or the code is wrong
+ data := m.data;
+ if(req.count < len data)
+ data = data[0:req.count];
+ srv.reply(ref Rmsg.Read(req.tag, data));
+ }
+}
+
+qread(msgs: ref Msglist, u: ref User, tm: ref Tmsg.Read, wait: int): ref Msg
+{
+ m := u.read();
+ if(m != nil){
+ if(tm.count < len m.data)
+ m.data = m.data[0:tm.count];
+ }else if(wait)
+ msgs.wait(u, ref Readreq(tm.tag, tm.fid, tm.count, tm.offset));
+ return m;
+}
+
+cancelpending(tag: int)
+{
+ toroot.flushtag(tag);
+ toleaf.flushtag(tag);
+}
+
+msg(u: ref User, op: int, data: array of byte): ref Msg
+{
+ a := sys->aprint("%ud %d %c %s ", time(), u.id, op, u.name);
+ m := ref Msg(u, array[len a + len data] of byte, nil);
+ m.data[0:] = a;
+ m.data[len a:] = data;
+ return m;
+}