.TH STYXFLUSH 2
.SH NAME
styxflush \- handler for the Styx flush protocol
.SH SYNOPSIS
.EX
include "sys.m";
include "styx.m";
include "styxflush.m";
styxflush := load Styxflush Styxflush->PATH;
init: fn();
tmsg: fn(m: ref Styx->Tmsg,
		flushc: chan of (int, chan of int),
		reply: chan of ref Styx->Rmsg): (int, ref Styx->Rmsg);
rmsg: fn(m: ref Styx->Rmsg): int;
Einterrupted: con "interrupted";
.EE
.SH DESCRIPTION
Getting the semantics of the Styx
.IR flush (5)
protocol correct when handling requests concurrently
is surprisingly hard to do.
.I Styxflush
is designed to help get it right. It deals with Styx
messages for a single Styx session; if a server needs to
deal with multiple sessions, then multiple instances of
.I styxflush
should be loaded. It assumes there is a loop in a central process
which both reads T-messages and sends their R-message replies.
.I Styxflush
handles the flushing of requests that are being run
outside the central process.
.PP
.B Init
must be called before anything else in
.I styxflush
to initialise its internal data structures.
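.PP
As a minimal sketch of the start-up sequence, assuming the
.B Sys
module has already been loaded into a handle named
.I sys
(a name not given by this page), a server might load and initialise the module as follows before starting its central loop:
.EX
styxflush = load Styxflush Styxflush->PATH;
if(styxflush == nil){
	# assumption: sys is the caller's handle to the Sys module
	sys->fprint(sys->fildes(2), "cannot load %s: %r\en", Styxflush->PATH);
	raise "fail:load";
}
styxflush->init();	# must precede any call to tmsg or rmsg
.EE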
.PP
When a T-message request arrives that will be dealt with concurrently,
.B tmsg(\fIm\fP,\ \fIflushc\fP,\ \fIreply\fP)
should be called to inform
.I styxflush
of the new request.
.I M
gives the T-message;
.I flushc
gives a channel that will be used if the request is flushed (see below),
and
.I reply
should hold an unbuffered channel that can be used to send a reply
to the central loop.
.I Flushc
will usually be a fresh channel for each request, but several
requests may share the same
.I flushc
if, for instance, one process is managing several requests.
.B Tmsg
returns a tuple
(\fIhandled,\ rm\fP),
where
.I handled
is non-zero if
.I styxflush
has dealt with the request itself. If it has, the caller
must not handle the request, but must send
.I rm
as the reply if
.I rm
is not nil.
.PP
.B Rmsg
should be called when a reply message arrives at the central process
(the same process that has called
.BR tmsg ).
It returns non-zero if the reply message should actually be
sent to the client; otherwise it should be discarded.
.SS "Flush Channel"
.I Styxflush
notifies a request that it has been flushed by sending a tuple,
say
.IR "" ( tag ,\ rc )
on its flush channel.
.I Tag
gives the tag of the message that has been flushed,
and
.I rc
is a channel that should be replied on when the
request has been dealt with. There is no requirement
that a request read on its flush channel; if it does not,
then the replies to any flushes of that request will be delayed
until the request is replied to.
If it does read a flush request, however, it must reply
to the original request before sending on
.IR rc .
If it has succeeded in aborting the request, it should
send an
.IR error (5)
R-message with the message
.B interrupted
(defined as
.BR Einterrupted );
otherwise it should send its
reply as usual.
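.PP
The case where a request cannot be aborted might be sketched as follows.
This is an illustration only:
.I writer
and
.I dowrite
are hypothetical names,
.I replyc
is assumed to be the reply channel that was passed to
.BR tmsg ,
and the value sent on
.I rc
is arbitrary.
.EX
writer(m: ref Tmsg.Write, flushc: chan of (int, chan of int))
{
	donec := chan of int;
	spawn dowrite(m, donec);	# hypothetical: sends the byte count on donec
	alt{
	n := <-donec =>
		replyc <-= ref Rmsg.Write(m.tag, n);
	(nil, rc) := <-flushc =>
		# too late to abort: wait for the result, reply as usual,
		# and only then acknowledge the flush.
		n := <-donec;
		replyc <-= ref Rmsg.Write(m.tag, n);
		rc <-= 1;
	}
}
.EE
The ordering is the point of the sketch: the reply to the original request is sent before anything is sent on
.IR rc .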
.SH SOURCE
.B /appl/lib/styxflush.b
.SH EXAMPLE
This is a skeleton of the typical structure of a program
that uses
.IR styxflush .
.EX
replyc: chan of ref Rmsg;

centralloop(tm: chan of ref Tmsg, fd: ref Sys->FD)
{
	replyc = chan of ref Rmsg;
	for(;;)alt{
	m := <-tm =>
		if(m == nil || tagof m == tagof Tmsg.Readerror){
			cleanup();		# kill outstanding processes, etc.
			return;
		}
		flushc := chan of (int, chan of int);
		(handled, rm) := styxflush->tmsg(m, flushc, replyc);
		if(!handled)
			spawn request(m, flushc);
		else if(rm != nil)
			sendreply(fd, rm);
	rm := <-replyc =>
		if(styxflush->rmsg(rm))
			sendreply(fd, rm);
	}
}

sendreply(fd: ref Sys->FD, rm: ref Rmsg)
{
	d := rm.pack();
	sys->write(fd, d, len d);
}

request(tm: ref Tmsg, flushc: chan of (int, chan of int))
{
	pick m := tm {
	Open =>
		replyc <-= ref Rmsg.Open(m.tag, ...);
	Read =>
		[...]
		alt{
		x := <-readc =>
			# read from data produced on readc
			replyc <-= ref Rmsg.Read(m.tag, ...);
		(nil, rc) := <-flushc =>
			# read request has been flushed.
			replyc <-= ref Rmsg.Error(m.tag, Einterrupted);
			rc <-= 1;
		}
	etc ...
	}
}