diff options
Diffstat (limited to 'appl/alphabet/grid')
| -rw-r--r-- | appl/alphabet/grid/farm.b | 144 | ||||
| -rw-r--r-- | appl/alphabet/grid/line2rec.b | 91 | ||||
| -rw-r--r-- | appl/alphabet/grid/local.b | 86 | ||||
| -rw-r--r-- | appl/alphabet/grid/mkfile | 22 | ||||
| -rw-r--r-- | appl/alphabet/grid/remote.b | 88 | ||||
| -rw-r--r-- | appl/alphabet/grid/rexec.b | 112 |
6 files changed, 543 insertions, 0 deletions
diff --git a/appl/alphabet/grid/farm.b b/appl/alphabet/grid/farm.b new file mode 100644 index 00000000..4aaa8758 --- /dev/null +++ b/appl/alphabet/grid/farm.b @@ -0,0 +1,144 @@ +implement Farm, Gridmodule; +include "sys.m"; + sys: Sys; +include "draw.m"; +include "sh.m"; + sh: Sh; +include "string.m"; + str: String; +include "alphabet/reports.m"; + reports: Reports; + report, Report, quit: import reports; +include "alphabet/endpoints.m"; + endpoints: Endpoints; + Endpoint: import endpoints; +include "alphabet/grid.m"; + grid: Grid; + Value: import grid; + +Farm: module {}; + +types(): string +{ + return "eesss*-A-k-a-v-bs"; +} + +init() +{ + sys = load Sys Sys->PATH; + reports = checkload(load Reports Reports->PATH, Reports->PATH); + endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH); + endpoints->init(); + grid = checkload(load Grid Grid->PATH, Grid->PATH); + grid->init(); + sh = checkload(load Sh Sh->PATH, Sh->PATH); + sh->initialise(); + str = checkload(load String String->PATH, String->PATH); +} + +run(nil: chan of string, r: ref Reports->Report, + opt: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value +{ + ec0 := (hd args).e().i; + addr := (hd tl args).s().i; + job, opts: string; + noauth := 0; + for(; opt != nil; opt = tl opt){ + c := (hd opt).t0; + case (hd opt).t0 { + 'A' => + noauth = 1; + 'b' => + opts += " -b "+(hd (hd opt).t1).s().i; + * => + opts += sys->sprint(" -%c", (hd opt).t0); + } + } + for(args = tl tl args; args != nil; args = tl args) + job += sys->sprint(" %q", (hd args).s().i); + + spawn farmproc(sync := chan of int, addr, ec0, opts, job, noauth, r.start("farm"), ec := chan of Endpoint); + <-sync; + return ref Value.Ve(ec); +} + +farmproc(sync: chan of int, + addr: string, + ec0: chan of Endpoint, + opts: string, + job: string, + noauth: int, + errorc: chan of string, + ec1: chan of Endpoint) +{ + sys->pctl(Sys->FORKNS, nil); + sync <-= 1; + ep0 := <-ec0; + if(ep0.addr == nil){ + ec1 <-= ep0; + quit(errorc); + } + (v, e) := farm(addr, ep0, opts, job, noauth, errorc); + if(e != nil){ + endpoints->open(nil, ep0); + report(errorc, "error: "+e); + } + ec1 <-= v; + quit(errorc); +} + +Nope: con Endpoint(nil, nil, nil); + +farm(addr: string, + ep0: Endpoint, + opts: string, + job: string, + noauth: int, + errorc: chan of string): (Endpoint, string) +{ + args := addr::"/n/remote"::nil; + if(noauth) + args = "-A"::args; + if((e := sh->run(nil, "mount"::args)) != nil) + return (Nope, sys->sprint("cannot mount scheduler at %q: %s, args %s", addr, e, str->quoted(args))); + + fd := sys->open("/n/remote/admin/clone", Sys->ORDWR); + if(fd == nil) + return (Nope, sys->sprint("cannot open clone: %r")); + if((d := gets(fd)) == nil) + return (Nope, "read clone failed"); + dir := "/n/remote/admin/"+d; + if(sys->fprint(fd, "load workflow%s %q %s", opts, ep0.text(), job) == -1) + return (Nope, sys->sprint("job load failed: %r")); + if(sys->fprint(fd, "start") == -1) + return (Nope, sys->sprint("job start failed: %r")); + dfd := sys->open(dir+"/data", Sys->OREAD); + if(dfd == nil){ + sys->fprint(fd, "delete"); + return (Nope, sys->sprint("cannot open job data file: %r")); + } + s := gets(dfd); + ep1 := Endpoint.mk(s); + if(ep1.addr == nil) + return (Nope, sys->sprint("bad remote endpoint %q", s)); + report(errorc, sys->sprint("job %s started, id %s", d, gets(sys->open(dir+"/id", Sys->OREAD)))); + # XXX how is the job going to be deleted eventually + ep1.about = sys->sprint("%s | farm%s %s%s", ep0.about, opts, addr, job); + return (ep1, nil); +} + +checkload[T](m: T, path: string): T +{ + if(m != nil) + return m; + raise sys->sprint("fail:cannot load %s: %r", path); +} + +gets(fd: ref Sys->FD): string +{ + d := array[8192] of byte; + n := sys->read(fd, d, len d); + if(n <= 0) + return nil; + return string d[0:n]; +} diff --git a/appl/alphabet/grid/line2rec.b b/appl/alphabet/grid/line2rec.b new file mode 100644 index 00000000..2429a67d --- /dev/null +++ b/appl/alphabet/grid/line2rec.b @@ -0,0 +1,91 @@ +implement Line2rec, Gridmodule; +include "sys.m"; + sys: Sys; +include "draw.m"; +include "sh.m"; +include "bufio.m"; + bufio: Bufio; + Iobuf: import bufio; +include "alphabet/reports.m"; + reports: Reports; + Report, report: import reports; +include "alphabet/endpoints.m"; +include "alphabet/grid.m"; + grid: Grid; + Value: import grid; + +Line2rec: module {}; + +types(): string +{ + return "bf"; +} + +init() +{ + sys = load Sys Sys->PATH; + grid = load Grid Grid->PATH; + reports = load Reports Reports->PATH; + bufio = load Bufio Bufio->PATH; +} + +quit() +{ +} + +run(nil: chan of string, r: ref Report, + nil: list of (int, list of ref Value), args: list of ref Value): ref Value +{ + f := chan of ref Sys->FD; + spawn line2recproc((hd args).f().i, f, r.start("line2rec")); + return ref Value.Vb(f); +} + +line2recproc( + f0, + f1: chan of ref Sys->FD, + errorc: chan of string) +{ + (fd0, fd1) := startfilter(f0, f1, errorc); + iob0 := bufio->fopen(fd0, Sys->OREAD); + iob1 := bufio->fopen(fd1, Sys->OWRITE); + { + while((s := iob0.gets('\n')) != nil){ + d := array of byte s; + if(iob1.puts("data "+string len d) < 0) + break; + if(iob1.write(d, len d) != len d) + break; + } + iob1.flush(); + sys->fprint(fd1, ""); + }exception{ + "write on closed pipe" => + ; + } + reports->quit(errorc); +} + +# read side (when it's an argument): +# read proposed new fd +# write actual fd for them to write to (creating pipe in necessary) +# +# write side (when you're returning it): +# write a proposed new fd (or nil if no suggestion) +# read actual fd for writing +startfilter(f0, f1: chan of ref Sys->FD, errorc: chan of string): (ref Sys->FD, ref Sys->FD) +{ + f1 <-= nil; + if((fd1 := <-f1) == nil){ + <-f0; + f0 <-= nil; + reports->quit(errorc); + } + if((fd0 := <-f0) == nil){ + sys->pipe(p := array[2] of ref Sys->FD); + f0 <-= p[1]; + fd0 = p[0]; + }else + f0 <-= nil; + return (fd0, fd1); +} diff --git a/appl/alphabet/grid/local.b b/appl/alphabet/grid/local.b new file mode 100644 index 00000000..2fca7b95 --- /dev/null +++ b/appl/alphabet/grid/local.b @@ -0,0 +1,86 @@ +implement Local,Gridmodule; +include "sys.m"; + sys: Sys; +include "draw.m"; +include "sh.m"; +include "alphabet/reports.m"; + reports: Reports; + report, quit, Report: import reports; +include "alphabet/endpoints.m"; + endpoints: Endpoints; + Endpoint: import endpoints; +include "alphabet/grid.m"; + grid: Grid; + Value: import grid; + +Local: module {}; +types(): string +{ + return "fe-v"; +} + +init() +{ + sys = load Sys Sys->PATH; + reports = checkload(load Reports Reports->PATH, Reports->PATH); + endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH); + endpoints->init(); + grid = checkload(load Grid Grid->PATH, Grid->PATH); + grid->init(); +} + +run(nil: chan of string, r: ref Reports->Report, + opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value +{ + + spawn localproc((hd args).e().i, f := chan of ref Sys->FD, opts!=nil, r.start("local")); + return ref Value.Vf(f); +} + +localproc(ec: chan of Endpoint, f: chan of ref Sys->FD, verbose: int, errorc: chan of string) +{ + ep := <-ec; + if(ep.addr == nil){ + # error should already have been printed (XXX is that the right way to do it?) + f <-= nil; + <-f; + quit(errorc); + } + if(verbose) + report(errorc, sys->sprint("endpoint %q at %q: %s", ep.id, ep.addr, ep.about)); + (fd0, err) := endpoints->open(nil, ep); + if(fd0 == nil){ + report(errorc, sys->sprint("error: local: cannot open endpoint (%q %q): %s", ep.addr, ep.id, err)); + f <-= nil; + <-f; + quit(errorc); + } + f <-= fd0; + fd1 := <-f; + if(fd1 == nil) + quit(errorc); + + buf := array[Sys->ATOMICIO] of byte; + { + while((n := sys->read(fd0, buf, len buf)) > 0){ +#sys->print("local read %d bytes\n", n); + sys->write(fd1, buf, n); + } +#sys->print("local eof %d\n", n); + sys->write(fd1, array[0] of byte, 0); + if(n < 0) + report(errorc, sys->sprint("read error: %r")); + } exception e { + "write on closed pipe" => + report(errorc, "write on closed pipe"); + ; + } + quit(errorc); +} + +checkload[T](m: T, path: string): T +{ + if(m != nil) + return m; + raise sys->sprint("fail:cannot load %s: %r", path); +} diff --git a/appl/alphabet/grid/mkfile b/appl/alphabet/grid/mkfile new file mode 100644 index 00000000..c8e78ffb --- /dev/null +++ b/appl/alphabet/grid/mkfile @@ -0,0 +1,22 @@ +<../../../mkconfig + +TARG=\ + farm.dis\ + line2rec.dis\ + local.dis\ + remote.dis\ + rexec.dis\ + +SYSMODULES=\ + draw.m\ + alphabet/endpoints.m\ + alphabet/grid.m\ + alphabet/reports.m\ + sh.m\ + string.m\ + sys.m\ + +DISBIN=$ROOT/dis/alphabet/grid + +<$ROOT/mkfiles/mkdis +LIMBOFLAGS=-F $LIMBOFLAGS diff --git a/appl/alphabet/grid/remote.b b/appl/alphabet/grid/remote.b new file mode 100644 index 00000000..dbdc86ef --- /dev/null +++ b/appl/alphabet/grid/remote.b @@ -0,0 +1,88 @@ +implement Remote, Gridmodule; +include "sys.m"; + sys: Sys; +include "draw.m"; +include "sh.m"; +include "alphabet/reports.m"; + reports: Reports; + report, quit, Report: import reports; +include "alphabet/endpoints.m"; + endpoints: Endpoints; + Endpoint: import endpoints; +include "alphabet/grid.m"; + grid: Grid; + Value: import grid; + +Remote: module {}; + +types(): string +{ + return "ef-as"; +} + +init() +{ + sys = load Sys Sys->PATH; + reports = checkload(load Reports Reports->PATH, Reports->PATH); + endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH); + endpoints->init(); + grid = checkload(load Grid Grid->PATH, Grid->PATH); + grid->init(); +} + +run(nil: chan of string, r: ref Reports->Report, + opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value +{ + addr := "local"; + if(opts != nil) + addr = (hd (hd opts).t1).s().i; + f := (hd args).f().i; + spawn remoteproc(ec := chan of Endpoint, f, addr, r.start("remote")); + return ref Value.Ve(ec); +} + +Noendpoint: con Endpoint(nil, nil, nil); + +remoteproc(ec: chan of Endpoint, f: chan of ref Sys->FD, addr: string, errorc: chan of string) +{ + (fd1, ep) := endpoints->create(addr); + if(fd1 == nil){ + report(errorc, "error: remote: cannot create endpoint at "+addr+": "+ep.about); + ec <-= Noendpoint; + <-f; + f <-= nil; + quit(errorc); + } + fd0 := <-f; + if(fd0 != nil) + ep.about = sys->sprint("local(%#q)", sys->fd2path(fd0)); + else + ep.about = "local(pipe)"; + ec <-= ep; + f <-= fd1; + quit(errorc); +} + +# sys->pipe(p := array[2] of ref Sys->FD); +# f <-= p[1]; +# p[1] = nil; +# buf := array[Sys->ATOMICIO] of byte; +# while((n := sys->read(p[0], buf, len buf)) > 0){ +# if(sys->write(fd, buf, n) == -1){ +# report(errorc, sys->sprint("write error: %r")); +# break; +# } +# }exception{ +# "write on closed pipe" => +# report(errorc, "got write on closed pipe"); +# } +# sys->write(fd, array[0] of byte, 0); +# quit(errorc); +#} + +checkload[T](m: T, path: string): T +{ + if(m != nil) + return m; + raise sys->sprint("fail:cannot load %s: %r", path); +} diff --git a/appl/alphabet/grid/rexec.b b/appl/alphabet/grid/rexec.b new file mode 100644 index 00000000..02869659 --- /dev/null +++ b/appl/alphabet/grid/rexec.b @@ -0,0 +1,112 @@ +implement Rexec, Gridmodule; +include "sys.m"; + sys: Sys; +include "draw.m"; +include "sh.m"; + sh: Sh; +include "string.m"; + str: String; +include "alphabet/reports.m"; + reports: Reports; + report, Report, quit: import reports; +include "alphabet/endpoints.m"; + endpoints: Endpoints; + Endpoint: import endpoints; +include "alphabet/grid.m"; + grid: Grid; + Value: import grid; + +Rexec: module {}; + +types(): string +{ + return "eesc-A"; +} + +init() +{ + sys = load Sys Sys->PATH; + reports = checkload(load Reports Reports->PATH, Reports->PATH); + endpoints = checkload(load Endpoints Endpoints->PATH, Endpoints->PATH); + endpoints->init(); + grid = checkload(load Grid Grid->PATH, Grid->PATH); + grid->init(); + sh = checkload(load Sh Sh->PATH, Sh->PATH); + sh->initialise(); + str = checkload(load String String->PATH, String->PATH); +} + +run(nil: chan of string, r: ref Reports->Report, + opts: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value +{ + ec0 := (hd args).e().i; + addr := (hd tl args).s().i; + cmd := (hd tl tl args).c().i; + + spawn rexecproc(sync := chan of int, addr, ec0, cmd, r.start("rexec"), opts != nil, ec1 := chan of Endpoint); + <-sync; + return ref Value.Ve(ec1); +} + +rexecproc(sync: chan of int, + addr: string, + ec0: chan of Endpoint, + cmd: ref Sh->Cmd, + errorc: chan of string, + noauth: int, + ec1: chan of Endpoint + ) +{ + sys->pctl(Sys->FORKNS, nil); + sync <-= 1; + + ep0 := <-ec0; + if(ep0.addr == nil){ + ec1 <-= ep0; + quit(errorc); + } + + (ep1, err) := exec(addr, ep0, cmd, noauth); + if(err != nil){ + endpoints->open(nil, ep0); # discard + report(errorc, err); + } + ec1 <-= ep1; + quit(errorc); +} + +Nope: con Endpoint(nil, nil, nil); + +exec(addr: string, ep0: Endpoint, cmd: ref Sh->Cmd, noauth: int): (Endpoint, string) +{ + args := addr::"/n/remote"::nil; + if(noauth) + args = "-A"::args; + if((e := sh->run(nil, "mount"::args)) != nil) + return (Nope, sys->sprint("cannot mount rexec at %q: %s", addr, e)); + + fd := sys->open("/n/remote/exec", Sys->ORDWR); + if(fd == nil) + return (Nope, sys->sprint("cannot open exec at %q: %r", addr)); + if(sys->fprint(fd, "%q %q", ep0.text(), sh->cmd2string(cmd)) == -1) + return (Nope, sys->sprint("exec write failed: %r")); + buf := array[1024] of byte; + n := sys->read(fd, buf, len buf); + if(n < 0) + return (Nope, sys->sprint("error reading endpoint: %r")); + if(n == 0) + return (Nope, "eof reading endpoint"); + s := string buf[0:n]; + ep1 := Endpoint.mk(s); + if(ep1.addr == nil) + return (Nope, sys->sprint("bad endpoint %#q: %s", s, ep1.about)); + ep1.about = sys->sprint("%s | rexec %q %s", ep0.about, addr, sh->cmd2string(cmd)); + return (ep1, nil); +} + +checkload[T](m: T, path: string): T +{ + if(m != nil) + return m; + raise sys->sprint("fail:cannot load %s: %r", path); +} |
