summaryrefslogtreecommitdiff
path: root/appl/grid/cpupool.b
diff options
context:
space:
mode:
Diffstat (limited to 'appl/grid/cpupool.b')
-rw-r--r--appl/grid/cpupool.b917
1 files changed, 917 insertions, 0 deletions
diff --git a/appl/grid/cpupool.b b/appl/grid/cpupool.b
new file mode 100644
index 00000000..0c3af3f6
--- /dev/null
+++ b/appl/grid/cpupool.b
@@ -0,0 +1,917 @@
+implement CpuPool;
+#
+# Copyright © 2003 Vita Nuova Holdings Limited. All rights reserved.
+#
+
+include "sys.m";
+ sys : Sys;
+include "daytime.m";
+ daytime: Daytime;
+include "styx.m";
+ styx: Styx;
+ Rmsg, Tmsg: import styx;
+include "styxservers.m";
+ styxservers: Styxservers;
+ Fid, Navigator, Navop: import styxservers;
+ Styxserver: import styxservers;
+ nametree: Nametree;
+ Tree: import nametree;
+include "draw.m";
+include "sh.m";
+include "arg.m";
+include "registries.m";
+ registries: Registries;
+ Registry, Attributes, Service: import registries;
+include "grid/announce.m";
+ announce: Announce;
+include "readdir.m";
+ readdir: Readdir;
+
+TEST: con 0;
+
+RUN : con "#!/dis/sh\n" +
+ "load std\n" +
+ "if {~ $#* 0} {\n" +
+ " echo usage: run.sh cmd args\n"+
+ " raise usage\n" +
+ "}\n"+
+ "CMD = $*\n" +
+ "{echo $CMD; dir=`{read -o 0}; cat <[0=3] > $dir/data& catpid=$apid;"+
+ " cat $dir/data >[1=4]; kill $catpid >[2] /dev/null} <[3=0] >[4=1] <> clone >[1=0]\n";
+
+EMPTYDIR: con "#//dev";
+rootpath := "/tmp/cpupool/";
+rstyxreg: ref Registry;
+registered: ref Registries->Registered;
+
+CpuSession: adt {
+ proxyid, fid, cpuid, omode, written, finished: int;
+ stdoutopen, stdinopen: int;
+ stdinchan, stdoutchan: chan of array of byte;
+ closestdin,closestdout, readstdout, sync: chan of int;
+ rcmdfinishedstdin, rcmdfinishedstdout: chan of int;
+ fio: ref sys->FileIO;
+ pids: list of int;
+};
+
+NILCPUSESSION: con CpuSession (-1, -1,-1, 0, 0, 0, 0, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil);
+
+cpusession: array of CpuSession;
+poolchanin : chan of string;
+poolchanout : chan of int;
+
+conids : array of int;
+
+CpuPool: module {
+ init: fn (nil : ref Draw->Context, argv: list of string);
+};
+
+init(nil : ref Draw->Context, argv: list of string)
+{
+ sys = load Sys Sys->PATH;
+ if (sys == nil)
+ badmod(Sys->PATH);
+ daytime = load Daytime Daytime->PATH;
+ if (daytime == nil)
+ badmod(Daytime->PATH);
+ styx = load Styx Styx->PATH;
+ if (styx == nil)
+ badmod(Styx->PATH);
+ styx->init();
+ styxservers = load Styxservers Styxservers->PATH;
+ if (styxservers == nil)
+ badmod(Styxservers->PATH);
+ styxservers->init(styx);
+ nametree = load Nametree Nametree->PATH;
+ if (nametree == nil)
+ badmod(Nametree->PATH);
+ nametree->init();
+ registries = load Registries Registries->PATH;
+ if (registries == nil)
+ badmod(Registries->PATH);
+ registries->init();
+ announce = load Announce Announce->PATH;
+ if (announce == nil)
+ badmod(Announce->PATH);
+ announce->init();
+ readdir = load Readdir Readdir->PATH;
+ if (readdir == nil)
+ badmod(Readdir->PATH);
+ arg := load Arg Arg->PATH;
+ if (arg == nil)
+ badmod(Arg->PATH);
+ sys->pctl(Sys->FORKNS | sys->NEWPGRP, nil);
+ sys->unmount(nil, "/n/remote");
+ getuid();
+ sys->chdir(EMPTYDIR);
+ cpusession = array[500] of { * => NILCPUSESSION };
+ attrs := Attributes.new(("proto", "styx") :: ("auth", "none") :: ("resource","Cpu Pool") :: nil);
+
+ arg->init(argv);
+ arg->setusage("cpupool [-a attributes] [rootdir]");
+ while ((opt := arg->opt()) != 0) {
+ case opt {
+ 'a' =>
+ attr := arg->earg();
+ val := arg->earg();
+ attrs.set(attr, val);
+ * =>
+ arg->usage();
+ }
+ }
+ argv = arg->argv();
+ arg = nil;
+
+ if (argv != nil)
+ rootpath = hd argv;
+ if (rootpath[len rootpath - 1] != '/')
+ rootpath[len rootpath] = '/';
+ (n, dir) := sys->stat(rootpath);
+ if (n == -1 || !(dir.mode & sys->DMDIR))
+ error("Invalid tmp path: "+rootpath);
+
+ rstyxreg = Registry.new("/mnt/rstyxreg");
+ if (rstyxreg == nil)
+ error("Could not find Rstyx Registry");
+
+ reg := Registry.connect(nil, nil, nil);
+ if (reg == nil)
+ error("Could not find registry");
+ (myaddr, c) := announce->announce();
+ if (myaddr == nil)
+ error(sys->sprint("cannot announce: %r"));
+ persist := 0;
+ err: string;
+ (registered, err) = reg.register(myaddr, attrs, persist);
+ if (err != nil)
+ error("could not register with registry: "+err);
+ conids = array[200] of { * => -1 };
+ poolchanin = chan of string;
+ poolchanout = chan of int;
+ userchan := chan of int;
+ spawn listener(*c);
+ spawn cpupoolloop(poolchanin, poolchanout);
+}
+
+attrval(s: string): (string, string)
+{
+ for (i := 0; i < len s; i++) {
+ if (s[i] == '=')
+ return (s[:i], s[i+1:]);
+ }
+ return (nil, s);
+}
+
+uid: string;
+Qroot : con 0;
+Qclone: con 1;
+
+Qdata: con 2;
+Qsh: con 3;
+Qrun: con 4;
+Qcpu: con 5;
+Qsessdir: con 6;
+Qsessdat: con 7;
+
+getuid()
+{
+ buf := array [100] of byte;
+ fd := sys->open("/dev/user", Sys->OREAD);
+ uidlen := sys->read(fd, buf, len buf);
+ uid = string buf[0: uidlen];
+}
+
+dir(name: string, perm: int, length: int, qid: int): Sys->Dir
+{
+ d := sys->zerodir;
+ d.name = name;
+ d.uid = uid;
+ d.gid = uid;
+ d.qid.path = big qid;
+ if (perm & Sys->DMDIR)
+ d.qid.qtype = Sys->QTDIR;
+ else {
+ d.qid.qtype = Sys->QTFILE;
+ d.length = big length;
+ }
+ d.mode = perm;
+ d.atime = d.mtime = daytime->now();
+ return d;
+}
+
+defaultdirs := array[] of {
+ ("dis", 1),
+ ("dev", 1),
+ ("fonts", 1),
+ ("mnt", 0),
+ ("prog", 0),
+};
+
+serveloop(fd : ref sys->FD, cmdchan: chan of (int, string, chan of int), exitchan, sync: chan of int, proxyid: int)
+{
+ if (TEST)
+ sys->fprint(sys->fildes(2), "starting serveloop");
+ tchan: chan of ref Tmsg;
+ srv: ref Styxserver;
+ (tree, treeop) := nametree->start();
+ tree.create(big Qroot, dir(".",8r555 | sys->DMDIR,0,Qroot));
+ tree.create(big Qroot, dir("clone",8r666,0,Qclone));
+ tree.create(big Qroot, dir("run.sh",8r555,0,Qrun));
+ tree.create(big Qroot, dir("cpu",8r444,0,Qcpu));
+ tree.create(big Qroot, dir("data",8r777 | sys->DMDIR,0,Qdata));
+ tree.create(big Qroot, dir("runtime",8r444 | sys->DMDIR,0,Qsh));
+
+ for (i := 0; i < len defaultdirs; i++)
+ tree.create(big Qroot, dir(defaultdirs[i].t0,8r555 | sys->DMDIR ,0,8 + (i<<4)));
+
+ (tchan, srv) = Styxserver.new(fd,Navigator.new(treeop), big Qroot);
+ fd = nil;
+ datafids : list of Datafid = nil;
+ sync <-= 1;
+ gm: ref Tmsg;
+ loop: for (;;) {
+ alt {
+ <-exitchan =>
+ break loop;
+
+ gm = <-tchan =>
+
+ if (gm == nil)
+ break loop;
+ # sys->fprint(sys->fildes(2), "Got new GM %s tag: %d\n", gm.text(), gm.tag);
+
+ pick m := gm {
+ Readerror =>
+ sys->fprint(sys->fildes(2), "cpupool: fatal read error: %s\n", m.error);
+ exit;
+ Clunk =>
+ deldf: Datafid;
+ (datafids, deldf) = delfid(datafids, m.fid);
+ if (deldf.sessid != -1) {
+ if (deldf.omode == sys->OREAD || deldf.omode == sys->ORDWR)
+ cpusession[deldf.sessid].sync <-= STDOUTCLOSE;
+ else if (deldf.omode == sys->OWRITE || deldf.omode == sys->ORDWR)
+ cpusession[deldf.sessid].sync <-= STDINCLOSE;
+ }
+ else {
+ sessid := getsession(m.fid);
+ if (sessid != -1)
+ cpusession[sessid].sync <-= CLONECLOSE;
+ }
+ srv.default(gm);
+ Open =>
+ (f, nil, d, err) := srv.canopen(m);
+ if(f == nil) {
+ srv.reply(ref Rmsg.Error(m.tag, err));
+ break;
+ }
+ ind := int f.uname;
+ mode := m.mode & 3;
+ case int f.path & 15 {
+ Qclone =>
+ if (mode == sys->OREAD) {
+ srv.reply(ref Rmsg.Error(m.tag, "ctl cannot be open as read only"));
+ break;
+ }
+ poolchanin <-= "request";
+ cpuid := <-poolchanout;
+ if (cpuid == -1)
+ srv.reply(ref Rmsg.Error(m.tag, "no free resources"));
+ else {
+ sessid := getsession(-1);
+ cpusession[sessid].fid = m.fid;
+ cpusession[sessid].cpuid = cpuid;
+ cpusession[sessid].omode = mode;
+ cpusession[sessid].sync = chan of int;
+ cpusession[sessid].proxyid = proxyid;
+ spawn sessionctl(sessid, tree);
+ Qdir := Qsessdir | (sessid<<4);
+ tree.create(big Qroot, dir(string sessid,
+ 8r777 | sys->DMDIR,0, Qdir));
+ tree.create(big Qdir, dir("data", 8r666,0, Qsessdat | (sessid<<4)));
+ if (TEST)
+ sys->fprint(sys->fildes(2), "New Session %d\n\tcpuid: %d\n"
+ ,sessid,cpuid);
+ srv.default(gm);
+ }
+ Qsessdat =>
+ err = "";
+ sessid := (int f.path)>>4;
+ datafids = addfid(datafids, Datafid(sessid, m.fid, mode));
+ if (cpusession[sessid].finished)
+ err = "session already finished";
+ else if (mode == sys->OREAD || mode == sys->ORDWR) {
+ if (cpusession[sessid].stdoutopen == -1)
+ err = "pipe closed";
+ else
+ cpusession[sessid].sync <-= STDOUTOPEN;
+ }
+ else if (mode == sys->OWRITE || mode == sys->ORDWR) {
+ if (cpusession[sessid].stdinopen == -1)
+ err = "pipe closed";
+ else
+ cpusession[sessid].sync <-= STDINOPEN;
+ }
+ # sys->fprint(sys->fildes(2),
+ # "Open: Data: sessid %d, stdout %d stdin %d: err: '%s'\n",
+ # sessid,cpusession[sessid].stdoutopen,
+ # cpusession[sessid].stdinopen, err);
+ if (err == nil)
+ srv.default(gm);
+ else
+ srv.reply(ref Rmsg.Error(m.tag, err));
+ * =>
+ # sys->print("Open: %s tag: %d\n", gm.text(), gm.tag);
+ srv.default(gm);
+ }
+ Write =>
+ (f,e) := srv.canwrite(m);
+ if(f == nil) {
+ # sys->print("breaking! %r\n");
+ break;
+ }
+ case int f.path & 15 {
+ Qsessdat =>
+ sessid := (int f.path)>>4;
+ # sys->fprint(sys->fildes(2), "Write: Data %d len: %d\n",
+ # sessid,len m.data);
+ spawn datawrite(sessid,srv,m);
+ Qclone =>
+ sessid := getsession(m.fid);
+ # sys->fprint(sys->fildes(2), "Write: clone %d\n",sessid);
+ spawn clonewrite(sessid,srv, m, cmdchan);
+ * =>
+ srv.default(gm);
+ }
+
+ Read =>
+ (f,e) := srv.canread(m);
+ if(f == nil)
+ break;
+ case int f.path & 15 {
+ Qclone =>
+ sessid := getsession(m.fid);
+ # sys->fprint(sys->fildes(2), "Read: clone %d\n",sessid);
+ srv.reply(styxservers->readbytes(m, array of byte (string sessid + "\n")));
+ Qsessdat =>
+ sessid := (int f.path)>>4;
+ # sys->fprint(sys->fildes(2), "Read: data session: %d\n",sessid);
+ if (cpusession[sessid].finished)
+ srv.reply(ref Rmsg.Error(m.tag, "session finished"));
+ else
+ spawn dataread(sessid, srv, m);
+ Qrun =>
+ srv.reply(styxservers->readbytes(m, array of byte RUN));
+ Qcpu =>
+ poolchanin <-= "refresh";
+ s := (string ncpupool) + "\n";
+ srv.reply(styxservers->readbytes(m, array of byte s));
+ * =>
+ srv.default(gm);
+ }
+
+ * =>
+ srv.default(gm);
+ }
+ }
+ }
+ if (TEST)
+ sys->fprint(sys->fildes(2), "leaving serveloop...\n");
+ tree.quit();
+ for (i = 0; i < len cpusession; i++) {
+ if (cpusession[i].proxyid == proxyid) {
+ #Tear it down!
+ if (TEST)
+ sys->fprint(sys->fildes(2), "Killing off session %d\n",i);
+ poolchanin <-= "free "+string cpusession[i].cpuid;
+ for (; cpusession[i].pids != nil; cpusession[i].pids = tl cpusession[i].pids)
+ kill(hd cpusession[i].pids);
+ cpusession[i] = NILCPUSESSION;
+ }
+ }
+ if (TEST)
+ sys->fprint(sys->fildes(2), "serveloop exited\n");
+}
+
+dataread(sessid: int, srv: ref Styxserver, m: ref Tmsg.Read)
+{
+ cpusession[sessid].readstdout <-= 1;
+ data := <- cpusession[sessid].stdoutchan;
+ srv.reply(ref Rmsg.Read(m.tag, data));
+}
+
+datawrite(sessid: int, srv: ref Styxserver, m: ref Tmsg.Write)
+{
+ # sys->fprint(sys->fildes(2), "Writing to Stdin %d (%d)\n'%s'\n",
+ # len m.data, m.tag, string m.data);
+ cpusession[sessid].stdinchan <-= m.data;
+ # sys->fprint(sys->fildes(2), "Written to Stdin %d!\n",m.tag);
+ srv.reply(ref Rmsg.Write(m.tag, len m.data));
+}
+
+clonewrite(sessid: int, srv: ref Styxserver, m: ref Tmsg.Write, cmdchan: chan of (int, string, chan of int))
+{
+ if (cpusession[sessid].written) {
+ srv.reply(ref Rmsg.Error(m.tag, "session already started"));
+ return;
+ }
+ rc := chan of int;
+ cmdchan <-= (sessid, string m.data, rc);
+ i := <-rc;
+ # sys->fprint(sys->fildes(2), "Sending write\n");
+ srv.reply(ref Rmsg.Write(m.tag, i));
+}
+
+badmod(path: string)
+{
+ sys->fprint(sys->fildes(1), "error CpuPool: failed to load: %s\n",path);
+ exit;
+}
+
+listener(c: Sys->Connection)
+{
+ for (;;) {
+ (n, nc) := sys->listen(c);
+ if (n == -1)
+ error(sys->sprint("listen failed: %r"));
+ dfd := sys->open(nc.dir + "/data", Sys->ORDWR);
+ if (dfd != nil) {
+ sync := chan of int;
+ sys->print("got new connection!\n");
+ spawn proxy(sync, dfd);
+ <-sync;
+ }
+ }
+}
+
+proxy(sync: chan of int, dfd: ref Sys->FD)
+{
+ proxypid := sys->pctl(0, nil);
+ sys->pctl(sys->FORKNS, nil);
+ sys->chdir(EMPTYDIR);
+ sync <-= 1;
+
+ sync = chan of int;
+ fds := array[2] of ref sys->FD;
+ sys->pipe(fds);
+ cmdchan := chan of (int, string, chan of int);
+ exitchan := chan of int;
+ killsrvloop := chan of int;
+ spawn serveloop(fds[0], cmdchan, killsrvloop, sync, proxypid);
+ <-sync;
+
+ if (sys->mount(fds[1], nil, "/n/remote", Sys->MREPL | sys->MCREATE, nil) == -1)
+ error(sys->sprint("cannot mount mountfd: %r"));
+
+ conid := getconid(-1);
+ conids[conid] = 1;
+ setupworkspace(conid);
+
+ spawn exportns(dfd, conid, exitchan);
+ for (;;) alt {
+ (sessid, cmd, reply) := <-cmdchan =>
+ spawn runit(conid, sessid, cmd, reply);
+ e := <-exitchan =>
+ killsrvloop <-= 1;
+ return;
+ }
+}
+
+getconid(id: int): int
+{
+ for (i := 0; i < len conids; i++)
+ if (conids[i] == id)
+ return i;
+ return -1;
+}
+
+exportns(dfd: ref Sys->FD, conid: int, exitchan: chan of int)
+{
+ sys->export(dfd, "/n/remote", sys->EXPWAIT);
+ if (TEST)
+ sys->fprint(sys->fildes(2), "Export Finished!\n");
+ conids[conid] = -1;
+ exitchan <-= 1;
+}
+
+error(e: string)
+{
+ sys->fprint(sys->fildes(2), "CpuPool: %s: %r\n", e);
+ raise "fail:error";
+}
+
+setupworkspace(pathid: int)
+{
+ path := rootpath + string pathid;
+ sys->create(path, sys->OREAD, 8r777 | sys->DMDIR);
+ delpath(path, 0);
+ sys->create(path + "/data", sys->OREAD, 8r777 | sys->DMDIR);
+ if (sys->bind(path+"/data", "/n/remote/data",
+ sys->MREPL | sys->MCREATE) == -1)
+ sys->fprint(sys->fildes(2), "data bind error %r\n");
+ sys->create(path + "/runtime", sys->OREAD, 8r777 | sys->DMDIR);
+ if (sys->bind(path+"/runtime", "/n/remote/runtime", sys->MREPL) == -1)
+ sys->fprint(sys->fildes(2), "runtime bind error %r\n");
+ for (i := 0; i < len defaultdirs; i++) {
+ if (defaultdirs[i].t1 == 1) {
+ sys->create(path+"/"+defaultdirs[i].t0, sys->OREAD, 8r777 | sys->DMDIR);
+ if (sys->bind("/"+defaultdirs[i].t0,
+ "/n/remote/"+defaultdirs[i].t0, sys->MREPL) == -1)
+ sys->fprint(sys->fildes(2), "dir bind error %r\n");
+ }
+ }
+}
+
+delpath(path: string, incl: int)
+{
+ if (path[len path - 1] != '/')
+ path[len path] = '/';
+ (dirs, n) := readdir->init(path, readdir->NONE | readdir->COMPACT);
+ for (i := 0; i < n; i++) {
+ if (dirs[i].mode & sys->DMDIR)
+ delpath(path + dirs[i].name, 1);
+ else
+ sys->remove(path + dirs[i].name);
+ }
+ if (incl)
+ sys->remove(path);
+}
+
+runit(id, sessid: int, cmd: string, sync: chan of int)
+{
+ # sys->print("got runit!\n");
+ cpusession[sessid].sync <-= PID;
+ cpusession[sessid].sync <-= sys->pctl(sys->FORKNS, nil);
+ if (!TEST && sys->bind("/net.alt", "/net", sys->MREPL) == -1) {
+ sys->fprint(sys->fildes(2), "cpupool net.alt bind failed: %r\n");
+ sync <-= -1;
+ return;
+ }
+ path := rootpath + string id;
+ runfile := "/runtime/start"+string cpusession[sessid].cpuid+".sh";
+ sh := load Sh Sh->PATH;
+ if(sh == nil) {
+ sys->fprint(sys->fildes(2), "Failed to load sh: %r\n");
+ sync <-= -1;
+ return;
+ }
+
+ sys->remove(path+runfile);
+ fd := sys->create(path+runfile, sys->OWRITE, 8r777);
+ if (fd == nil) {
+ sync <-= -1;
+ return;
+ }
+ sys->fprint(fd, "#!/dis/sh\n");
+ sys->fprint(fd, "bind /prog /n/client/prog\n");
+ sys->fprint(fd, "bind /n/client /\n");
+ sys->fprint(fd, "cd /\n");
+ sys->fprint(fd, "%s\n", cmd);
+
+ if (sys->bind("#s", "/n/remote/runtime", Sys->MBEFORE|Sys->MCREATE) == -1) {
+ sys->fprint(sys->fildes(2), "cpupool: %r\n");
+ return;
+ }
+
+ cpusession[sessid].fio = sys->file2chan("/n/remote/runtime", "mycons");
+ if (cpusession[sessid].fio == nil) {
+ sys->fprint(sys->fildes(2), "cpupool: file2chan failed: %r\n");
+ return;
+ }
+
+ if (sys->bind("/n/remote/runtime/mycons", "/n/remote/dev/cons", sys->MREPL) == -1)
+ sys->fprint(sys->fildes(2), "cons bind error %r\n");
+ cpusession[sessid].written = 1;
+
+ cpusession[sessid].stdinchan = chan of array of byte;
+ cpusession[sessid].closestdin = chan of int;
+ cpusession[sessid].rcmdfinishedstdin = chan of int;
+ spawn devconsread(sessid);
+
+ cpusession[sessid].stdoutchan = chan of array of byte;
+ cpusession[sessid].closestdout = chan of int;
+ cpusession[sessid].readstdout = chan of int;
+ cpusession[sessid].rcmdfinishedstdout = chan of int;
+ spawn devconswrite(sessid);
+
+ # Let it know that session channels have been created & can be listened on...
+ sync <-= len cmd;
+
+ # would prefer that it were authenticated
+ if (TEST)
+ sys->print("ABOUT TO RCMD\n");
+ sh->run(nil, "rcmd" :: "-A" :: "-e" :: "/n/remote" ::
+ cpupool[cpusession[sessid].cpuid].srvc.addr ::
+ "sh" :: "-c" :: "/n/client"+runfile :: nil);
+ if (TEST)
+ sys->print("DONE RCMD\n");
+
+ sys->remove(path+runfile);
+ sys->unmount(nil, "/n/remote/dev/cons");
+ cpusession[sessid].rcmdfinishedstdin <-= 1;
+ cpusession[sessid].rcmdfinishedstdout <-= 1;
+ cpusession[sessid].sync <-= FINISHED;
+}
+
+CLONECLOSE: con 0;
+FINISHED: con 1;
+STDINOPEN: con 2;
+STDINCLOSE: con 3;
+STDOUTOPEN: con 4;
+STDOUTCLOSE: con 5;
+PID: con -2;
+
+sessionctl(sessid: int, tree: ref Nametree->Tree)
+{
+ cpusession[sessid].pids = sys->pctl(0, nil) :: nil;
+ clone := 1;
+ closed := 0;
+ main: for (;;) {
+ i := <-cpusession[sessid].sync;
+ case i {
+ PID =>
+ pid := <-cpusession[sessid].sync;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "adding PID: %d\n", pid);
+ cpusession[sessid].pids = pid :: cpusession[sessid].pids;
+ STDINOPEN =>
+ cpusession[sessid].stdinopen++;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Open stdin: => %d\n",
+ sessid, cpusession[sessid].stdinopen);
+ STDOUTOPEN =>
+ cpusession[sessid].stdoutopen++;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Open stdout: => %d\n",
+ sessid, cpusession[sessid].stdoutopen);
+ STDINCLOSE =>
+ cpusession[sessid].stdinopen--;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Close stdin: => %d\n",
+ sessid, cpusession[sessid].stdinopen);
+ if (cpusession[sessid].stdinopen == 0) {
+ cpusession[sessid].stdinopen = -1;
+ cpusession[sessid].closestdin <-= 1;
+ }
+ # sys->fprint(sys->fildes(2), "Clunk: stdin (in %d: out %d\n",
+ # cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
+ STDOUTCLOSE =>
+ cpusession[sessid].stdoutopen--;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Close stdout: => %d\n",
+ sessid, cpusession[sessid].stdoutopen);
+ if (cpusession[sessid].stdoutopen == 0) {
+ cpusession[sessid].stdoutopen = -1;
+ cpusession[sessid].closestdout <-= 1;
+ }
+ #sys->fprint(sys->fildes(2), "Clunk: stdout (in %d: out %d\n",
+ # cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
+ CLONECLOSE =>
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Close clone\n", sessid);
+ clone = 0;
+ #sys->fprint(sys->fildes(2), "Clunk: clone (in %d: out %d\n",
+ # cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
+ FINISHED =>
+ if (TEST)
+ sys->fprint(sys->fildes(2), "%d: Rcmd finished", sessid);
+
+ cpusession[sessid].finished = 1;
+ poolchanin <-= "free "+string cpusession[sessid].cpuid;
+ if (closed)
+ break main;
+ }
+ if (cpusession[sessid].stdinopen <= 0 &&
+ cpusession[sessid].stdoutopen <= 0 &&
+ clone == 0) {
+
+ closed = 1;
+ tree.remove(big (Qsessdir | (sessid<<4)));
+ tree.remove(big (Qsessdat | (sessid<<4)));
+ if (cpusession[sessid].finished || !cpusession[sessid].written)
+ break main;
+ }
+ }
+ if (!cpusession[sessid].finished) # ie never executed anything
+ poolchanin <-= "free "+string cpusession[sessid].cpuid;
+ cpusession[sessid] = NILCPUSESSION;
+ if (TEST)
+ sys->fprint(sys->fildes(2), "closing session %d\n",sessid);
+}
+
+devconswrite(sessid: int)
+{
+ cpusession[sessid].sync <-= PID;
+ cpusession[sessid].sync <-= sys->pctl(0, nil);
+ stdouteof := 0;
+ file2chaneof := 0;
+ rcmddone := 0;
+ main: for (;;) alt {
+ <-cpusession[sessid].rcmdfinishedstdout =>
+ rcmddone = 1;
+ if (file2chaneof)
+ break main;
+ <-cpusession[sessid].closestdout =>
+ stdouteof = 1;
+ (offset, d, fid, wc) := <-cpusession[sessid].fio.write =>
+ if (wc != nil) {
+ # sys->fprint(sys->fildes(2), "stdout: '%s'\n", string d);
+ if (stdouteof) {
+ # sys->fprint(sys->fildes(2), "stdout: sending EOF\n");
+ wc <-= (0, nil);
+ continue;
+ }
+ alt {
+ <-cpusession[sessid].closestdout =>
+ # sys->print("got closestdout\n");
+ wc <-= (0, nil);
+ stdouteof = 1;
+ <-cpusession[sessid].readstdout =>
+ cpusession[sessid].stdoutchan <-= d;
+ wc <-= (len d, nil);
+ }
+ }
+ else {
+ # sys->fprint(sys->fildes(2), "got nil wc\n");
+ file2chaneof = 1;
+ if (rcmddone)
+ break main;
+ }
+ }
+ # No more input at this point as rcmd has finished;
+ if (stdouteof || cpusession[sessid].stdoutopen == 0) {
+ # sys->print("leaving devconswrite\n");
+ return;
+ }
+ for (;;) alt {
+ <-cpusession[sessid].closestdout =>
+ # sys->print("got closestdout\n");
+ # sys->print("leaving devconswrite\n");
+ return;
+ <- cpusession[sessid].readstdout =>
+ cpusession[sessid].stdoutchan <-= nil;
+ }
+}
+
+devconsread(sessid: int)
+{
+ cpusession[sessid].sync <-= PID;
+ cpusession[sessid].sync <-= sys->pctl(0, nil);
+ stdineof := 0;
+ file2chaneof := 0;
+ rcmddone := 0;
+ main: for (;;) alt {
+ <-cpusession[sessid].rcmdfinishedstdin =>
+ rcmddone = 1;
+ if (file2chaneof)
+ break main;
+ <-cpusession[sessid].closestdin =>
+ # sys->print("got stdin close\n");
+ stdineof = 1;
+ (offset, count, fid, rc) := <-cpusession[sessid].fio.read =>
+ if (rc != nil) {
+ # sys->fprint(sys->fildes(2), "devconsread: '%d %d'\n", count, offset);
+ if (stdineof) {
+ rc <-= (nil, nil);
+ continue;
+ }
+ alt {
+ data := <-cpusession[sessid].stdinchan =>
+ # sys->print("got data len %d\n", len data);
+ rc <-= (data, nil);
+ <-cpusession[sessid].closestdin =>
+ # sys->print("got stdin close\n");
+ stdineof = 1;
+ rc <-= (nil, nil);
+ }
+ }
+ else {
+ # sys->print("got nil rc\n");
+ file2chaneof = 1;
+ if (rcmddone)
+ break main;
+ }
+ }
+ if (!stdineof && cpusession[sessid].stdinopen != 0)
+ <-cpusession[sessid].closestdin;
+ # sys->fprint(sys->fildes(2), "Leaving devconsread\n");
+}
+
+Srvcpool: adt {
+ srvc: ref Service;
+ inuse: int;
+};
+
+cpupool: array of Srvcpool;
+ncpupool := 0;
+
+cpupoolloop(chanin: chan of string, chanout: chan of int)
+{
+ cpupool = array[200] of Srvcpool;
+ for (i := 0; i < len cpupool; i++)
+ cpupool[i] = Srvcpool (nil, 0);
+ wait := 0;
+ for (;;) {
+ inp := <-chanin;
+ # sys->print("poolloop: '%s'\n",inp);
+ (nil, lst) := sys->tokenize(inp, " \t\n");
+ case hd lst {
+ "refresh" =>
+ if (daytime->now() - wait >= 60) {
+ refreshcpupool();
+ wait = daytime->now();
+ }
+ "request" =>
+ if (daytime->now() - wait >= 60) {
+ refreshcpupool();
+ wait = daytime->now();
+ }
+ found := -1;
+ # sys->print("found %d services...\n", ncpupool);
+ for (i = 0; i < ncpupool; i++) {
+ if (!cpupool[i].inuse) {
+ found = i;
+ cpupool[i].inuse = 1;
+ break;
+ }
+ }
+ # sys->print("found service %d\n", found);
+ chanout <-= found;
+ "free" =>
+ if (TEST)
+ sys->print("freed service %d\n", int hd tl lst);
+ cpupool[int hd tl lst].inuse = 0;
+ }
+ }
+}
+
+refreshcpupool()
+{
+ (lsrv, err) := rstyxreg.find(("resource", "Rstyx resource") :: nil);
+ # sys->print("found %d resources\n",len lsrv);
+ if (err != nil)
+ return;
+ tmp := array[len cpupool] of Srvcpool;
+ ntmp := len lsrv;
+ i := 0;
+ for (;lsrv != nil; lsrv = tl lsrv)
+ tmp[i++] = Srvcpool(hd lsrv, 0);
+ min := 0;
+ for (i = 0; i < ntmp; i++) {
+ for (j := min; j < ncpupool; j++) {
+ if (tmp[i].srvc.addr == cpupool[j].srvc.addr) {
+ if (j == min)
+ min++;
+ tmp[i].inuse = cpupool[j].inuse;
+ }
+ }
+ }
+ ncpupool = ntmp;
+ for (i = 0; i < ntmp; i++)
+ cpupool[i] = tmp[i];
+ # sys->print("ncpupool: %d\n",ncpupool);
+}
+
+getsession(fid: int): int
+{
+ for (i := 0; i < len cpusession; i++)
+ if (cpusession[i].fid == fid)
+ return i;
+ return -1;
+}
+
+kill(pid: int)
+{
+ if ((fd := sys->open("/prog/" + string pid + "/ctl", Sys->OWRITE)) != nil)
+ sys->fprint(fd, "kill");
+}
+
+killg(pid: int)
+{
+ if ((fd := sys->open("/prog/" + string pid + "/ctl", Sys->OWRITE)) != nil)
+ sys->fprint(fd, "killgrp");
+}
+
+delfid(datafids: list of Datafid, fid: int): (list of Datafid, Datafid)
+{
+ rdf := Datafid (-1, -1, -1);
+ tmp : list of Datafid = nil;
+ for (; datafids != nil; datafids = tl datafids) {
+ testdf := hd datafids;
+ if (testdf.fid == fid)
+ rdf = testdf;
+ else
+ tmp = testdf :: tmp;
+ }
+ return (tmp, rdf);
+}
+
+addfid(datafids: list of Datafid, df: Datafid): list of Datafid
+{
+ (datafids, nil) = delfid(datafids, df.fid);
+ return df :: datafids;
+}
+
+Datafid: adt {
+ sessid, fid, omode: int;
+};