1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# Farm: a module implementing the Gridmodule interface. Its run()
# entry point submits a job to a scheduler mounted from a given address
# and yields a channel carrying the resulting endpoint.
implement Farm, Gridmodule;
include "sys.m";
# Module handles below are nil until init() loads them.
sys: Sys;
include "draw.m";
include "sh.m";
sh: Sh;
include "string.m";
str: String;
include "alphabet/reports.m";
reports: Reports;
report, Report, quit: import reports;
include "alphabet/endpoints.m";
endpoints: Endpoints;
Endpoint: import endpoints;
include "alphabet/grid.m";
grid: Grid;
Value: import grid;
# Farm declares no members of its own; the functions defined in this file
# are presumably those required by Gridmodule (declared elsewhere).
Farm: module {};
# types returns this module's type-signature string.
# NOTE(review): the encoding is defined by the alphabet framework, not here;
# judging from run(), it describes an endpoint result, an endpoint argument
# followed by string arguments, and the options -A, -k, -a, -v and -b (the
# last taking a string value) -- confirm against the alphabet documentation.
types(): string
{
	signature := "eesss*-A-k-a-v-bs";
	return signature;
}
# init loads every module this file depends on and runs the initialisers
# of those that have one (Endpoints, Grid, Sh).
#
# Sys is loaded first and without a check, because checkload itself needs
# sys->sprint to build its failure message. A failure to load any other
# module raises the "fail:cannot load ..." exception from checkload.
init()
{
	sys = load Sys Sys->PATH;

	reports = load Reports Reports->PATH;
	reports = checkload(reports, Reports->PATH);

	endpoints = load Endpoints Endpoints->PATH;
	endpoints = checkload(endpoints, Endpoints->PATH);
	endpoints->init();

	grid = load Grid Grid->PATH;
	grid = checkload(grid, Grid->PATH);
	grid->init();

	sh = load Sh Sh->PATH;
	sh = checkload(sh, Sh->PATH);
	sh->initialise();

	str = load String String->PATH;
	str = checkload(str, String->PATH);
}
# run is the module entry point.
#
# Arguments (args):
#	1st	endpoint value; its channel supplies the input endpoint
#	2nd	string; address of the scheduler to mount
#	rest	strings; the job words, each appended %q-quoted to the job text
#
# Options (opt):
#	-A	mount the scheduler with -A (sets noauth)
#	-b s	passed through to the scheduler as " -b s"; the value is
#		%q-quoted like every other user-supplied word in the request,
#		so values containing spaces or quotes stay a single word
#	other	passed through as " -c" with no value
#
# A farmproc process is spawned to do the work; run waits only until that
# process signals on sync, then returns an endpoint value wrapping the
# channel on which farmproc will deliver its result.
run(nil: chan of string, r: ref Reports->Report,
		opt: list of (int, list of ref Grid->Value), args: list of ref Grid->Value): ref Grid->Value
{
	ec0 := (hd args).e().i;
	addr := (hd tl args).s().i;
	job, opts: string;
	noauth := 0;
	for(; opt != nil; opt = tl opt){
		(c, optargs) := hd opt;
		case c {
		'A' =>
			noauth = 1;
		'b' =>
			opts += sys->sprint(" -b %q", (hd optargs).s().i);
		* =>
			opts += sys->sprint(" -%c", c);
		}
	}
	# Skip the endpoint and address; everything left is part of the job.
	for(args = tl tl args; args != nil; args = tl args)
		job += sys->sprint(" %q", (hd args).s().i);

	sync := chan of int;
	ec := chan of Endpoint;
	spawn farmproc(sync, addr, ec0, opts, job, noauth, r.start("farm"), ec);
	<-sync;
	return ref Value.Ve(ec);
}
# farmproc is the body of the process spawned by run().
#
# It forks the namespace (farm() mounts the scheduler on /n/remote),
# signals sync, then waits for the input endpoint on ec0.
#
# An input endpoint with a nil addr is forwarded unchanged on ec1.
# Otherwise the job is submitted through farm(); on failure the input
# endpoint is handed to endpoints->open (result ignored), the error is
# reported on errorc, and the empty endpoint returned by farm() is sent
# on ec1. On success the remote endpoint is sent on ec1.
#
# Every path finishes with quit(errorc).
# NOTE(review): the code after the first quit() is only correct if quit
# does not return -- confirm against the Reports module.
farmproc(sync: chan of int,
		addr: string,
		ec0: chan of Endpoint,
		opts: string,
		job: string,
		noauth: int,
		errorc: chan of string,
		ec1: chan of Endpoint)
{
	sys->pctl(Sys->FORKNS, nil);
	sync <-= 1;

	input := <-ec0;
	if(input.addr == nil){
		ec1 <-= input;
		quit(errorc);
	}

	(result, err) := farm(addr, input, opts, job, noauth, errorc);
	if(err != nil){
		endpoints->open(nil, input);
		report(errorc, "error: "+err);
	}
	ec1 <-= result;
	quit(errorc);
}
# Empty endpoint returned together with an error string when farm() fails.
Nope: con Endpoint(nil, nil, nil);

# farm submits a job to the scheduler at addr and returns the endpoint
# the scheduler publishes for it.
#
# Steps:
#	1. mount addr on /n/remote (with -A if noauth is set);
#	2. open admin/clone and read the new job's directory name;
#	3. write "load workflow<opts> <ep0 text> <job>" and then "start"
#	   to the clone file;
#	4. read the remote endpoint text from the job's data file;
#	5. report the job name and id on errorc.
#
# Returns (endpoint, nil) on success, with the endpoint's about field
# extended to describe this farm stage, or (Nope, message) on failure.
#
# If the job was started but its endpoint cannot be obtained (the data
# file cannot be opened, or its contents do not parse as an endpoint),
# "delete" is written to the clone file before returning.
farm(addr: string,
		ep0: Endpoint,
		opts: string,
		job: string,
		noauth: int,
		errorc: chan of string): (Endpoint, string)
{
	args := addr::"/n/remote"::nil;
	if(noauth)
		args = "-A"::args;
	if((e := sh->run(nil, "mount"::args)) != nil)
		return (Nope, sys->sprint("cannot mount scheduler at %q: %s, args %s", addr, e, str->quoted(args)));

	fd := sys->open("/n/remote/admin/clone", Sys->ORDWR);
	if(fd == nil)
		return (Nope, sys->sprint("cannot open clone: %r"));
	if((d := gets(fd)) == nil)
		return (Nope, "read clone failed");
	dir := "/n/remote/admin/"+d;

	if(sys->fprint(fd, "load workflow%s %q %s", opts, ep0.text(), job) == -1)
		return (Nope, sys->sprint("job load failed: %r"));
	if(sys->fprint(fd, "start") == -1)
		return (Nope, sys->sprint("job start failed: %r"));

	dfd := sys->open(dir+"/data", Sys->OREAD);
	if(dfd == nil){
		# Format the message first: a failing delete write would
		# replace the error string that %r reports.
		err := sys->sprint("cannot open job data file: %r");
		sys->fprint(fd, "delete");
		return (Nope, err);
	}
	s := gets(dfd);
	ep1 := Endpoint.mk(s);
	if(ep1.addr == nil){
		# The job is running but unreachable from here; delete it,
		# as is done when the data file cannot be opened.
		sys->fprint(fd, "delete");
		return (Nope, sys->sprint("bad remote endpoint %q", s));
	}
	report(errorc, sys->sprint("job %s started, id %s", d, gets(sys->open(dir+"/id", Sys->OREAD))));
	# XXX how is the job going to be deleted eventually
	ep1.about = sys->sprint("%s | farm%s %s%s", ep0.about, opts, addr, job);
	return (ep1, nil);
}
# checkload returns m unchanged when it is non-nil. When m is nil it
# raises "fail:cannot load <path>: <system error string>".
# It relies on sys already being loaded.
checkload[T](m: T, path: string): T
{
	if(m == nil)
		raise sys->sprint("fail:cannot load %s: %r", path);
	return m;
}
# gets performs a single read of at most 8192 bytes from fd and returns
# the bytes read converted to a string. It returns nil when the read
# yields no data or fails (read result <= 0).
gets(fd: ref Sys->FD): string
{
	buf := array[8192] of byte;
	nread := sys->read(fd, buf, len buf);
	if(nread > 0)
		return string buf[0:nread];
	return nil;
}
|