summaryrefslogtreecommitdiff
path: root/appl/alphabet/grid
diff options
context:
space:
mode:
Diffstat (limited to 'appl/alphabet/grid')
-rw-r--r--appl/alphabet/grid/farm.b144
-rw-r--r--appl/alphabet/grid/line2rec.b91
-rw-r--r--appl/alphabet/grid/local.b86
-rw-r--r--appl/alphabet/grid/mkfile22
-rw-r--r--appl/alphabet/grid/remote.b88
-rw-r--r--appl/alphabet/grid/rexec.b112
6 files changed, 543 insertions, 0 deletions
diff --git a/appl/alphabet/grid/farm.b b/appl/alphabet/grid/farm.b
new file mode 100644
index 00000000..4aaa8758
--- /dev/null
+++ b/appl/alphabet/grid/farm.b
@@ -0,0 +1,144 @@
+implement Farm, Gridmodule;
+include "sys.m";
+ sys: Sys;
+include "draw.m";
+include "sh.m";
+ sh: Sh;
+include "string.m";
+ str: String;
+include "alphabet/reports.m";
+ reports: Reports;
+ report, Report, quit: import reports;
+include "alphabet/endpoints.m";
+ endpoints: Endpoints;
+ Endpoint: import endpoints;
+include "alphabet/grid.m";
+ grid: Grid;
+ Value: import grid;
+
+Farm: module {};
+
+types(): string
+{
+ return "eesss*-A-k-a-v-bs";
+}
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ reports = checkload(load Reports Reports->PATH, Reports->PATH);
+ endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH);
+ endpoints->init();
+ grid = checkload(load Grid Grid->PATH, Grid->PATH);
+ grid->init();
+ sh = checkload(load Sh Sh->PATH, Sh->PATH);
+ sh->initialise();
+ str = checkload(load String String->PATH, String->PATH);
+}
+
+run(nil: chan of string, r: ref Reports->Report,
+ opt: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value
+{
+ ec0 := (hd args).e().i;
+ addr := (hd tl args).s().i;
+ job, opts: string;
+ noauth := 0;
+ for(; opt != nil; opt = tl opt){
+ c := (hd opt).t0;
+ case (hd opt).t0 {
+ 'A' =>
+ noauth = 1;
+ 'b' =>
+ opts += " -b "+(hd (hd opt).t1).s().i;
+ * =>
+ opts += sys->sprint(" -%c", (hd opt).t0);
+ }
+ }
+ for(args = tl tl args; args != nil; args = tl args)
+ job += sys->sprint(" %q", (hd args).s().i);
+
+ spawn farmproc(sync := chan of int, addr, ec0, opts, job, noauth, r.start("farm"), ec := chan of Endpoint);
+ <-sync;
+ return ref Value.Ve(ec);
+}
+
+farmproc(sync: chan of int,
+ addr: string,
+ ec0: chan of Endpoint,
+ opts: string,
+ job: string,
+ noauth: int,
+ errorc: chan of string,
+ ec1: chan of Endpoint)
+{
+ sys->pctl(Sys->FORKNS, nil);
+ sync <-= 1;
+ ep0 := <-ec0;
+ if(ep0.addr == nil){
+ ec1 <-= ep0;
+ quit(errorc);
+ }
+ (v, e) := farm(addr, ep0, opts, job, noauth, errorc);
+ if(e != nil){
+ endpoints->open(nil, ep0);
+ report(errorc, "error: "+e);
+ }
+ ec1 <-= v;
+ quit(errorc);
+}
+
+Nope: con Endpoint(nil, nil, nil);
+
+farm(addr: string,
+ ep0: Endpoint,
+ opts: string,
+ job: string,
+ noauth: int,
+ errorc: chan of string): (Endpoint, string)
+{
+ args := addr::"/n/remote"::nil;
+ if(noauth)
+ args = "-A"::args;
+ if((e := sh->run(nil, "mount"::args)) != nil)
+ return (Nope, sys->sprint("cannot mount scheduler at %q: %s, args %s", addr, e, str->quoted(args)));
+
+ fd := sys->open("/n/remote/admin/clone", Sys->ORDWR);
+ if(fd == nil)
+ return (Nope, sys->sprint("cannot open clone: %r"));
+ if((d := gets(fd)) == nil)
+ return (Nope, "read clone failed");
+ dir := "/n/remote/admin/"+d;
+ if(sys->fprint(fd, "load workflow%s %q %s", opts, ep0.text(), job) == -1)
+ return (Nope, sys->sprint("job load failed: %r"));
+ if(sys->fprint(fd, "start") == -1)
+ return (Nope, sys->sprint("job start failed: %r"));
+ dfd := sys->open(dir+"/data", Sys->OREAD);
+ if(dfd == nil){
+ sys->fprint(fd, "delete");
+ return (Nope, sys->sprint("cannot open job data file: %r"));
+ }
+ s := gets(dfd);
+ ep1 := Endpoint.mk(s);
+ if(ep1.addr == nil)
+ return (Nope, sys->sprint("bad remote endpoint %q", s));
+ report(errorc, sys->sprint("job %s started, id %s", d, gets(sys->open(dir+"/id", Sys->OREAD))));
+ # XXX how is the job going to be deleted eventually
+ ep1.about = sys->sprint("%s | farm%s %s%s", ep0.about, opts, addr, job);
+ return (ep1, nil);
+}
+
+checkload[T](m: T, path: string): T
+{
+ if(m != nil)
+ return m;
+ raise sys->sprint("fail:cannot load %s: %r", path);
+}
+
+gets(fd: ref Sys->FD): string
+{
+ d := array[8192] of byte;
+ n := sys->read(fd, d, len d);
+ if(n <= 0)
+ return nil;
+ return string d[0:n];
+}
diff --git a/appl/alphabet/grid/line2rec.b b/appl/alphabet/grid/line2rec.b
new file mode 100644
index 00000000..2429a67d
--- /dev/null
+++ b/appl/alphabet/grid/line2rec.b
@@ -0,0 +1,91 @@
+implement Line2rec, Gridmodule;
+include "sys.m";
+ sys: Sys;
+include "draw.m";
+include "sh.m";
+include "bufio.m";
+ bufio: Bufio;
+ Iobuf: import bufio;
+include "alphabet/reports.m";
+ reports: Reports;
+ Report, report: import reports;
+include "alphabet/endpoints.m";
+include "alphabet/grid.m";
+ grid: Grid;
+ Value: import grid;
+
+Line2rec: module {};
+
+types(): string
+{
+ return "bf";
+}
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ grid = load Grid Grid->PATH;
+ reports = load Reports Reports->PATH;
+ bufio = load Bufio Bufio->PATH;
+}
+
+quit()
+{
+}
+
+run(nil: chan of string, r: ref Report,
+ nil: list of (int, list of ref Value), args: list of ref Value): ref Value
+{
+ f := chan of ref Sys->FD;
+ spawn line2recproc((hd args).f().i, f, r.start("line2rec"));
+ return ref Value.Vb(f);
+}
+
+line2recproc(
+ f0,
+ f1: chan of ref Sys->FD,
+ errorc: chan of string)
+{
+ (fd0, fd1) := startfilter(f0, f1, errorc);
+ iob0 := bufio->fopen(fd0, Sys->OREAD);
+ iob1 := bufio->fopen(fd1, Sys->OWRITE);
+ {
+ while((s := iob0.gets('\n')) != nil){
+ d := array of byte s;
+ if(iob1.puts("data "+string len d) < 0)
+ break;
+ if(iob1.write(d, len d) != len d)
+ break;
+ }
+ iob1.flush();
+ sys->fprint(fd1, "");
+ }exception{
+ "write on closed pipe" =>
+ ;
+ }
+ reports->quit(errorc);
+}
+
+# read side (when it's an argument):
+# read proposed new fd
+# write actual fd for them to write to (creating pipe in necessary)
+#
+# write side (when you're returning it):
+# write a proposed new fd (or nil if no suggestion)
+# read actual fd for writing
+startfilter(f0, f1: chan of ref Sys->FD, errorc: chan of string): (ref Sys->FD, ref Sys->FD)
+{
+ f1 <-= nil;
+ if((fd1 := <-f1) == nil){
+ <-f0;
+ f0 <-= nil;
+ reports->quit(errorc);
+ }
+ if((fd0 := <-f0) == nil){
+ sys->pipe(p := array[2] of ref Sys->FD);
+ f0 <-= p[1];
+ fd0 = p[0];
+ }else
+ f0 <-= nil;
+ return (fd0, fd1);
+}
diff --git a/appl/alphabet/grid/local.b b/appl/alphabet/grid/local.b
new file mode 100644
index 00000000..2fca7b95
--- /dev/null
+++ b/appl/alphabet/grid/local.b
@@ -0,0 +1,86 @@
+implement Local,Gridmodule;
+include "sys.m";
+ sys: Sys;
+include "draw.m";
+include "sh.m";
+include "alphabet/reports.m";
+ reports: Reports;
+ report, quit, Report: import reports;
+include "alphabet/endpoints.m";
+ endpoints: Endpoints;
+ Endpoint: import endpoints;
+include "alphabet/grid.m";
+ grid: Grid;
+ Value: import grid;
+
+Local: module {};
+types(): string
+{
+ return "fe-v";
+}
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ reports = checkload(load Reports Reports->PATH, Reports->PATH);
+ endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH);
+ endpoints->init();
+ grid = checkload(load Grid Grid->PATH, Grid->PATH);
+ grid->init();
+}
+
+run(nil: chan of string, r: ref Reports->Report,
+ opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value
+{
+
+ spawn localproc((hd args).e().i, f := chan of ref Sys->FD, opts!=nil, r.start("local"));
+ return ref Value.Vf(f);
+}
+
+localproc(ec: chan of Endpoint, f: chan of ref Sys->FD, verbose: int, errorc: chan of string)
+{
+ ep := <-ec;
+ if(ep.addr == nil){
+ # error should already have been printed (XXX is that the right way to do it?)
+ f <-= nil;
+ <-f;
+ quit(errorc);
+ }
+ if(verbose)
+ report(errorc, sys->sprint("endpoint %q at %q: %s", ep.id, ep.addr, ep.about));
+ (fd0, err) := endpoints->open(nil, ep);
+ if(fd0 == nil){
+ report(errorc, sys->sprint("error: local: cannot open endpoint (%q %q): %s", ep.addr, ep.id, err));
+ f <-= nil;
+ <-f;
+ quit(errorc);
+ }
+ f <-= fd0;
+ fd1 := <-f;
+ if(fd1 == nil)
+ quit(errorc);
+
+ buf := array[Sys->ATOMICIO] of byte;
+ {
+ while((n := sys->read(fd0, buf, len buf)) > 0){
+#sys->print("local read %d bytes\n", n);
+ sys->write(fd1, buf, n);
+ }
+#sys->print("local eof %d\n", n);
+ sys->write(fd1, array[0] of byte, 0);
+ if(n < 0)
+ report(errorc, sys->sprint("read error: %r"));
+ } exception e {
+ "write on closed pipe" =>
+ report(errorc, "write on closed pipe");
+ ;
+ }
+ quit(errorc);
+}
+
+checkload[T](m: T, path: string): T
+{
+ if(m != nil)
+ return m;
+ raise sys->sprint("fail:cannot load %s: %r", path);
+}
diff --git a/appl/alphabet/grid/mkfile b/appl/alphabet/grid/mkfile
new file mode 100644
index 00000000..c8e78ffb
--- /dev/null
+++ b/appl/alphabet/grid/mkfile
@@ -0,0 +1,22 @@
+<../../../mkconfig
+
+TARG=\
+ farm.dis\
+ line2rec.dis\
+ local.dis\
+ remote.dis\
+ rexec.dis\
+
+SYSMODULES=\
+ draw.m\
+ alphabet/endpoints.m\
+ alphabet/grid.m\
+ alphabet/reports.m\
+ sh.m\
+ string.m\
+ sys.m\
+
+DISBIN=$ROOT/dis/alphabet/grid
+
+<$ROOT/mkfiles/mkdis
+LIMBOFLAGS=-F $LIMBOFLAGS
diff --git a/appl/alphabet/grid/remote.b b/appl/alphabet/grid/remote.b
new file mode 100644
index 00000000..dbdc86ef
--- /dev/null
+++ b/appl/alphabet/grid/remote.b
@@ -0,0 +1,88 @@
+implement Remote, Gridmodule;
+include "sys.m";
+ sys: Sys;
+include "draw.m";
+include "sh.m";
+include "alphabet/reports.m";
+ reports: Reports;
+ report, quit, Report: import reports;
+include "alphabet/endpoints.m";
+ endpoints: Endpoints;
+ Endpoint: import endpoints;
+include "alphabet/grid.m";
+ grid: Grid;
+ Value: import grid;
+
+Remote: module {};
+
+types(): string
+{
+ return "ef-as";
+}
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ reports = checkload(load Reports Reports->PATH, Reports->PATH);
+ endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH);
+ endpoints->init();
+ grid = checkload(load Grid Grid->PATH, Grid->PATH);
+ grid->init();
+}
+
+run(nil: chan of string, r: ref Reports->Report,
+ opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value
+{
+ addr := "local";
+ if(opts != nil)
+ addr = (hd (hd opts).t1).s().i;
+ f := (hd args).f().i;
+ spawn remoteproc(ec := chan of Endpoint, f, addr, r.start("remote"));
+ return ref Value.Ve(ec);
+}
+
+Noendpoint: con Endpoint(nil, nil, nil);
+
+remoteproc(ec: chan of Endpoint, f: chan of ref Sys->FD, addr: string, errorc: chan of string)
+{
+ (fd1, ep) := endpoints->create(addr);
+ if(fd1 == nil){
+ report(errorc, "error: remote: cannot create endpoint at "+addr+": "+ep.about);
+ ec <-= Noendpoint;
+ <-f;
+ f <-= nil;
+ quit(errorc);
+ }
+ fd0 := <-f;
+ if(fd0 != nil)
+ ep.about = sys->sprint("local(%#q)", sys->fd2path(fd0));
+ else
+ ep.about = "local(pipe)";
+ ec <-= ep;
+ f <-= fd1;
+ quit(errorc);
+}
+
+# sys->pipe(p := array[2] of ref Sys->FD);
+# f <-= p[1];
+# p[1] = nil;
+# buf := array[Sys->ATOMICIO] of byte;
+# while((n := sys->read(p[0], buf, len buf)) > 0){
+# if(sys->write(fd, buf, n) == -1){
+# report(errorc, sys->sprint("write error: %r"));
+# break;
+# }
+# }exception{
+# "write on closed pipe" =>
+# report(errorc, "got write on closed pipe");
+# }
+# sys->write(fd, array[0] of byte, 0);
+# quit(errorc);
+#}
+
+checkload[T](m: T, path: string): T
+{
+ if(m != nil)
+ return m;
+ raise sys->sprint("fail:cannot load %s: %r", path);
+}
diff --git a/appl/alphabet/grid/rexec.b b/appl/alphabet/grid/rexec.b
new file mode 100644
index 00000000..02869659
--- /dev/null
+++ b/appl/alphabet/grid/rexec.b
@@ -0,0 +1,112 @@
+implement Rexec, Gridmodule;
+include "sys.m";
+ sys: Sys;
+include "draw.m";
+include "sh.m";
+ sh: Sh;
+include "string.m";
+ str: String;
+include "alphabet/reports.m";
+ reports: Reports;
+ report, Report, quit: import reports;
+include "alphabet/endpoints.m";
+ endpoints: Endpoints;
+ Endpoint: import endpoints;
+include "alphabet/grid.m";
+ grid: Grid;
+ Value: import grid;
+
+Rexec: module {};
+
+types(): string
+{
+ return "eesc-A";
+}
+
+init()
+{
+ sys = load Sys Sys->PATH;
+ reports = checkload(load Reports Reports->PATH, Reports->PATH);
+ endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH);
+ endpoints->init();
+ grid = checkload(load Grid Grid->PATH, Grid->PATH);
+ grid->init();
+ sh = checkload(load Sh Sh->PATH, Sh->PATH);
+ sh->initialise();
+ str = checkload(load String String->PATH, String->PATH);
+}
+
+run(nil: chan of string, r: ref Reports->Report,
+ opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value
+{
+ ec0 := (hd args).e().i;
+ addr := (hd tl args).s().i;
+ cmd := (hd tl tl args).c().i;
+
+ spawn rexecproc(sync := chan of int, addr, ec0, cmd, r.start("rexec"), opts != nil, ec1 := chan of Endpoint);
+ <-sync;
+ return ref Value.Ve(ec1);
+}
+
+rexecproc(sync: chan of int,
+ addr: string,
+ ec0: chan of Endpoint,
+ cmd: ref Sh->Cmd,
+ errorc: chan of string,
+ noauth: int,
+ ec1: chan of Endpoint
+ )
+{
+ sys->pctl(Sys->FORKNS, nil);
+ sync <-= 1;
+
+ ep0 := <-ec0;
+ if(ep0.addr == nil){
+ ec1 <-= ep0;
+ quit(errorc);
+ }
+
+ (ep1, err) := exec(addr, ep0, cmd, noauth);
+ if(err != nil){
+ endpoints->open(nil, ep0); # discard
+ report(errorc, err);
+ }
+ ec1 <-= ep1;
+ quit(errorc);
+}
+
+Nope: con Endpoint(nil, nil, nil);
+
+exec(addr: string, ep0: Endpoint, cmd: ref Sh->Cmd, noauth: int): (Endpoint, string)
+{
+ args := addr::"/n/remote"::nil;
+ if(noauth)
+ args = "-A"::args;
+ if((e := sh->run(nil, "mount"::args)) != nil)
+ return (Nope, sys->sprint("cannot mount rexec at %q: %s", addr, e));
+
+ fd := sys->open("/n/remote/exec", Sys->ORDWR);
+ if(fd == nil)
+ return (Nope, sys->sprint("cannot open exec at %q: %r", addr));
+ if(sys->fprint(fd, "%q %q", ep0.text(), sh->cmd2string(cmd)) == -1)
+ return (Nope, sys->sprint("exec write failed: %r"));
+ buf := array[1024] of byte;
+ n := sys->read(fd, buf, len buf);
+ if(n < 0)
+ return (Nope, sys->sprint("error reading endpoint: %r"));
+ if(n == 0)
+ return (Nope, "eof reading endpoint");
+ s := string buf[0:n];
+ ep1 := Endpoint.mk(s);
+ if(ep1.addr == nil)
+ return (Nope, sys->sprint("bad endpoint %#q: %s", s, ep1.about));
+ ep1.about = sys->sprint("%s | rexec %q %s", ep0.about, addr, sh->cmd2string(cmd));
+ return (ep1, nil);
+}
+
+checkload[T](m: T, path: string): T
+{
+ if(m != nil)
+ return m;
+ raise sys->sprint("fail:cannot load %s: %r", path);
+}