1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
implement Styxflush;
include "sys.m";
sys: Sys;
include "tables.m";
tables: Tables;
Table: import tables;
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import styx;
include "styxflush.m";
reqs: ref Table[ref Req];
Req: adt {
m: ref Tmsg;
flushc: chan of (int, chan of int);
oldreq: cyclic ref Req;
flushes: cyclic ref Req; # flushes queued on this req.
nextflush: cyclic ref Req; # (flush only) next req in flush queue.
flushready: chan of int; # (flush only) wait for flush attempt.
flushing: int; # request is subject of a flush.
finished: chan of int; # [1]; signals finish to late flushers.
responded: int;
};
init()
{
sys = load Sys Sys->PATH;
tables = load Tables Tables->PATH;
styx = load Styx Styx->PATH;
styx->init();
reqs = Table[ref Req].new(11, nil);
}
tmsg(gm: ref Styx->Tmsg, flushc: chan of (int, chan of int), reply: chan of ref Styx->Rmsg): (int, ref Rmsg)
{
req := ref Req(
gm,
flushc, # flushc
nil, # oldreq
nil, # flushes
nil, # nextflush
nil, # flushready
0, # flushing
chan[1] of int, # finished
0 # responded
);
if(reqs.add(gm.tag, req) == 0)
return (1, ref Rmsg.Error(gm.tag, "duplicate tag"));
pick m := gm {
Flush =>
req.oldreq = reqs.find(m.oldtag);
if(req.oldreq == nil)
return (1, ref Rmsg.Flush(m.tag));
addflush(req);
req.flushc = chan of (int, chan of int);
spawn flushreq(req, reply);
return (1, nil);
}
return (0, nil);
}
rmsg(rm: ref Styx->Rmsg): int
{
req := reqs.find(rm.tag);
if(req == nil){
complain("req has disappeared, reply "+rm.text());
return 0;
}
reqs.del(rm.tag);
if(tagof rm == tagof Rmsg.Flush)
delflush(req);
if(req.flushing)
req.finished <-= 1;
req.responded = 1;
pick m := rm {
Error =>
if(m.ename == Einterrupted){
if(!req.flushing)
complain("interrupted reply but no flush "+req.m.text());
return 0;
}
}
return 1;
}
addflush(req: ref Req)
{
o := req.oldreq;
for(r := o.flushes; r != nil; r = r.nextflush)
if(r.nextflush == nil)
break;
if(r == nil){
o.flushes = req;
req.flushready = nil;
}else{
r.nextflush = req;
req.flushready = chan of int;
}
o.flushing = 1;
}
# remove req (a flush request) from the list of flushes pending
# for req.oldreq. if it was at the head of the list, then give
# the next req a go.
delflush(req: ref Req)
{
oldreq := req.oldreq;
prev: ref Req;
for(r := oldreq.flushes; r != nil; r = r.nextflush){
if(r == req)
break;
prev = r;
}
if(prev == nil){
oldreq.flushes = r.nextflush;
if(oldreq.flushes != nil)
oldreq.flushes.flushready <-= 1;
}else
prev.nextflush = r.nextflush;
r.nextflush = nil;
}
flushreq(req: ref Req, reply: chan of ref Styx->Rmsg)
{
o := req.oldreq;
# if we're queued up, wait our turn.
if(req.flushready != nil)
<-req.flushready;
rc := chan of int;
alt{
o.flushc <-= (req.m.tag, rc) =>
<-rc;
reply <-= ref Rmsg.Flush(req.m.tag);
# old request must have responded before sending on rc,
# but be defensive because it's easy to forget.
if(!o.responded){
complain("flushed request not responded to: "+o.m.text());
o.responded = 1; # race but better than nothing.
}
(nil, nrc) := <-req.flushc =>
reply <-= ref Rmsg.Error(req.m.tag, Einterrupted);
nrc <-= 1;
<-o.finished =>
o.finished <-= 1;
reply <-= ref Rmsg.Flush(req.m.tag);
}
}
complain(e: string)
{
sys->fprint(sys->fildes(2), "styxflush: warning: %s\n", e);
}
|