diff options
| author | Charles.Forsyth <devnull@localhost> | 2006-12-22 17:07:39 +0000 |
|---|---|---|
| committer | Charles.Forsyth <devnull@localhost> | 2006-12-22 17:07:39 +0000 |
| commit | 37da2899f40661e3e9631e497da8dc59b971cbd0 (patch) | |
| tree | cbc6d4680e347d906f5fa7fca73214418741df72 /appl/cmd/lockfs.b | |
| parent | 54bc8ff236ac10b3eaa928fd6bcfc0cdb2ba46ae (diff) | |
20060303a
Diffstat (limited to 'appl/cmd/lockfs.b')
| -rw-r--r-- | appl/cmd/lockfs.b | 773 |
1 files changed, 773 insertions, 0 deletions
diff --git a/appl/cmd/lockfs.b b/appl/cmd/lockfs.b new file mode 100644 index 00000000..1b958de2 --- /dev/null +++ b/appl/cmd/lockfs.b @@ -0,0 +1,773 @@ +implement Lockfs; +include "sys.m"; + sys: Sys; + stderr: ref Sys->FD; +include "draw.m"; +include "styx.m"; + styx: Styx; + Tmsg, Rmsg: import styx; +include "styxlib.m"; + styxlib: Styxlib; + Dirtab, Styxserver, Chan, + devdir, + Eperm, Ebadfid, Eexists, Enotdir, Enotfound, Einuse: import styxlib; +include "arg.m"; +include "keyring.m"; + keyring: Keyring; +include "security.m"; + auth: Auth; + +Lockfs: module { + init: fn(nil: ref Draw->Context, argv: list of string); + dirgen: fn(srv: ref Styxlib->Styxserver, c: ref Styxlib->Chan, + tab: array of Styxlib->Dirtab, i: int): (int, Sys->Dir); +}; + +Elocked: con "file is locked"; + +devgen: Dirgenmod; + +Openreq: adt { + srv: ref Styxserver; + tag: int; + omode: int; + c: ref Chan; + uproc: Uproc; +}; + +Lockqueue: adt { + h: list of ref Openreq; + t: list of ref Openreq; + put: fn(q: self ref Lockqueue, s: ref Openreq); + get: fn(q: self ref Lockqueue): ref Openreq; + peek: fn(q: self ref Lockqueue): ref Openreq; + flush: fn(q: self ref Lockqueue, srv: ref Styxserver, tag: int); +}; + +Lockfile: adt { + waitq: ref Lockqueue; + fd: ref Sys->FD; + readers: int; + writers: int; + d: Sys->Dir; +}; + +Ureq: adt { + fname: string; + pick { + Open => + omode: int; + Create => + omode: int; + perm: int; + Remove => + Wstat => + dir: Sys->Dir; + } +}; + +Uproc: type chan of (ref Ureq, chan of (ref Sys->FD, string)); + +maxqidpath := big 1; +locks: list of ref Lockfile; +lockdir: string; +authinfo: ref Keyring->Authinfo; +timefd: ref Sys->FD; + +MAXCONN: con 20; + +verbose := 0; + +usage() +{ + sys->fprint(stderr, "usage: lockfs [-A] [-a alg]... [-p addr] dir [mountpoint]\n"); + raise "fail:usage"; +} + +badmodule(p: string) +{ + sys->fprint(stderr, "lockfs: cannot load %s: %r\n", p); + raise "fail:bad module"; +} + +init(nil: ref Draw->Context, argv: list of string) +{ + sys = load Sys Sys->PATH; + stderr = sys->fildes(2); + styx = load Styx Styx->PATH; + if (styx == nil) + badmodule(Styx->PATH); + styx->init(); + styxlib = load Styxlib Styxlib->PATH; + if (styxlib == nil) + badmodule(Styxlib->PATH); + styxlib->init(styx); + devgen = load Dirgenmod "$self"; + if (devgen == nil) + badmodule("self as Dirgenmod"); + timefd = sys->open("/dev/time", sys->OREAD); + if (timefd == nil) { + sys->fprint(stderr, "lockfs: cannot open /dev/time: %r\n"); + raise "fail:no time"; + } + arg := load Arg Arg->PATH; + if (arg == nil) + badmodule(Arg->PATH); + arg->init(argv); + + addr := ""; + doauth := 1; + algs: list of string; + while ((opt := arg->opt()) != 0) { + case opt { + 'p' => + addr = arg->arg(); + 'a' => + alg := arg->arg(); + if (alg == nil) + usage(); + algs = alg :: algs; + 'A' => + doauth = 0; + 'v' => + verbose = 1; + * => + usage(); + } + } + argv = arg->argv(); + if (argv == nil || (addr != nil && tl argv != nil)) + usage(); + if (addr == nil) + doauth = 0; # no authentication necessary for local mount + if (doauth) { + auth = load Auth Auth->PATH; + if (auth == nil) + badmodule(Auth->PATH); + if ((e := auth->init()) != nil) { + sys->fprint(stderr, "lockfs: cannot init auth: %s\n", e); + raise "fail:errors"; + } + keyring = load Keyring Keyring->PATH; + if (keyring == nil) + badmodule(Keyring->PATH); + authinfo = keyring->readauthinfo("/usr/" + user() + "/keyring/default"); + } + + mountpoint := lockdir = hd argv; + if (tl argv != nil) + mountpoint = hd tl argv; + if (addr != nil) { + if (doauth && algs == nil) + algs = "none" :: nil; # XXX is this default a bad idea? + srvrq := chan of (ref Sys->FD, string, Uproc); + srvsync := chan of (int, string); + spawn listener(addr, srvrq, srvsync, algs); + (srvpid, err) := <-srvsync; + srvsync = nil; + if (srvpid == -1) { + sys->fprint(stderr, "lockfs: failed to start listener: %s\n", err); + raise "fail:errors"; + } + sync := chan of int; + spawn server(srvrq, sync); + <-sync; + } else { + rq := chan of (ref Sys->FD, string, Uproc); + fds := array[2] of ref Sys->FD; + sys->pipe(fds); + sync := chan of int; + spawn server(rq, sync); + <-sync; + rq <-= (fds[0], "lock", nil); + rq <-= (nil, nil, nil); + if (sys->mount(fds[1], nil, mountpoint, Sys->MREPL | Sys->MCREATE, nil) == -1) { + sys->fprint(stderr, "lockfs: cannot mount: %r\n"); + raise "fail:cannot mount"; + } + } +} + +server(srvrq: chan of (ref Sys->FD, string, Uproc), sync: chan of int) +{ + sys->pctl(Sys->FORKNS, nil); + sync <-= 1; + down := 0; + nclient := 0; + tchans := array[MAXCONN] of chan of ref Tmsg; + srv := array[MAXCONN] of ref Styxserver; + uprocs := array[MAXCONN] of Uproc; + lockinit(); +Service: + for (;;) alt { + (fd, reqstr, uprocch) := <-srvrq => + if (fd == nil) { + if (verbose && reqstr != nil) + sys->print("lockfs: localserver going down (reason: %s)\n", reqstr); + down = 1; + } else { + if (verbose) + sys->print("lockfs: got new connection (s == '%s')\n", reqstr); + for (i := 0; i < len tchans; i++) + if (tchans[i] == nil) { + (tchans[i], srv[i]) = Styxserver.new(fd); + if(verbose) + sys->print("svc started\n"); + uprocs[i] = uprocch; + break; + } + if (i == len tchans) { + sys->fprint(stderr, "lockfs: too many clients\n"); # XXX expand arrays + if (uprocch != nil) + uprocch <-= (nil, nil); + } else + nclient++; + } + (n, gm) := <-tchans => + if (handletmsg(srv[n], gm, uprocs[n]) == -1) { + tchans[n] = nil; + srv[n] = nil; + if (uprocs[n] != nil) { + uprocs[n] <-= (nil, nil); + uprocs[n] = nil; + } + if (nclient-- <= 1 && down) + break Service; + } + } + if (verbose) + sys->print("lockfs: finished\n"); +} + +dirgen(nil: ref Styxserver, nil: ref Styxlib->Chan, + nil: array of Dirtab, s: int): (int, Sys->Dir) +{ + d: Sys->Dir; + ll := locks; + for (i := 0; i < s && ll != nil; i++) + ll = tl ll; + if (ll == nil) + return (-1, d); + return (1, (hd ll).d); +} + +handletmsg(srv: ref Styxserver, gm: ref Tmsg, uproc: Uproc): int +{ +{ + if (gm == nil) + gm = ref Tmsg.Readerror(-1, "eof"); + if(verbose) + sys->print("<- %s\n", gm.text()); + pick m := gm { + Readerror => + # could be more efficient... + for (cl := srv.chanlist(); cl != nil; cl = tl cl) { + c := hd cl; + for (ll := locks; ll != nil; ll = tl ll) { + if ((hd ll).d.qid.path == c.qid.path) { + l := hd ll; + l.waitq.flush(srv, -1); + if (c.open) + unlocked(l); + break; + } + } + } + if (m.error != "eof") + sys->fprint(stderr, "lockfs: read error: %s\n", m.error); + return -1; + Version => + srv.devversion(m); + Auth => + srv.devauth(m); + Walk => + c := fid2chan(srv, m.fid); + qids: array of Sys->Qid; + cc := ref *c; + if (len m.names > 0) { + qids = array[1] of Sys->Qid; # it's just one level + if ((cc.qid.qtype & Sys->QTDIR) == 0) { + srv.reply(ref Rmsg.Error(m.tag, Enotdir)); + break; + } + for (ll := locks; ll != nil; ll = tl ll) + if ((hd ll).d.name == m.names[0]) + break; + if (ll == nil) { + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + break; + } + d := (hd ll).d; + cc.qid = d.qid; + cc.path = d.name; + qids[0] = c.qid; + } + if(m.newfid != m.fid){ + nc := srv.clone(cc, m.newfid); + if(nc == nil){ + srv.reply(ref Rmsg.Error(m.tag, Einuse)); + break; + } + }else{ + c.qid = cc.qid; + c.path = cc.path; + } + srv.reply(ref Rmsg.Walk(m.tag, qids)); + Open => + c := fid2chan(srv, m.fid); + if (c.qid.qtype & Sys->QTDIR) { + srv.reply(ref Rmsg.Open(m.tag, c.qid, Styx->MAXFDATA)); + break; + } + for (ll := locks; ll != nil; ll = tl ll) + if ((hd ll).d.qid.path == c.qid.path) + break; + if (ll == nil) { + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + break; + } + l := hd ll; + req := ref Openreq(srv, m.tag, m.mode, c, uproc); + if (l.fd == nil || (m.mode == Sys->OREAD && l.writers == 0)) { + openlockfile(l, req); + } else { + l.waitq.put(req); + } + req = nil; + Create => + c := fid2chan(srv, m.fid); + if ((c.qid.qtype & Sys->QTDIR) == 0) { + srv.reply(ref Rmsg.Error(m.tag, Enotdir)); + break; + } + if (m.perm & Sys->DMDIR) { + srv.reply(ref Rmsg.Error(m.tag, Eperm)); + break; + } + for (ll := locks; ll != nil; ll = tl ll) + if ((hd ll).d.name == m.name) + break; + if (ll != nil) { + srv.reply(ref Rmsg.Error(m.tag, Eexists)); + break; + } + (fd, err) := create(uproc, lockdir + "/" + m.name, m.mode, m.perm); + if (fd == nil) { + srv.reply(ref Rmsg.Error(m.tag, err)); + break; + } + (ok, d) := sys->fstat(fd); + if (ok == -1) { + srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); + break; + } + l := ref Lockfile(ref Lockqueue, fd, 0, 0, d); + l.d.qid = (maxqidpath++, 0, Sys->QTFILE); + l.d.mtime = l.d.atime = now(); + if (m.mode == Sys->OREAD) + l.readers = 1; + else + l.writers = 1; + locks = l :: locks; + c.qid.path = (hd locks).d.qid.path; + c.open = 1; + srv.reply(ref Rmsg.Create(m.tag, c.qid, Styx->MAXFDATA)); + Read => + c := fid2chan(srv, m.fid); + if (c.qid.qtype & Sys->QTDIR) + srv.devdirread(m, devgen, nil); + else { + l := qid2lock(c.qid); + if (l == nil) + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + else { + d := array[m.count] of byte; + sys->seek(l.fd, m.offset, Sys->SEEKSTART); + n := sys->read(l.fd, d, m.count); + if (n == -1) + srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); + else { + srv.reply(ref Rmsg.Read(m.tag, d[0:n])); + l.d.atime = now(); + } + } + } + Write => + c := fid2chan(srv, m.fid); + if (c.qid.qtype & Sys->QTDIR) { + srv.reply(ref Rmsg.Error(m.tag, Eperm)); + break; + } + l := qid2lock(c.qid); + if (l == nil) { + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + break; + } + sys->seek(l.fd, m.offset, Sys->SEEKSTART); + n := sys->write(l.fd, m.data, len m.data); + if (n == -1) + srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); + else { + srv.reply(ref Rmsg.Write(m.tag, n)); + nlength := m.offset + big n; + if (nlength > l.d.length) + l.d.length = nlength; + l.d.mtime = now(); + l.d.qid.vers++; + } + Clunk => + c := srv.devclunk(m); + if (c != nil && c.open && (l := qid2lock(c.qid)) != nil) + unlocked(l); + Flush => + for (ll := locks; ll != nil; ll = tl ll) + (hd ll).waitq.flush(srv, m.tag); + srv.reply(ref Rmsg.Flush(m.tag)); + Stat => + srv.devstat(m, devgen, nil); + Remove => + c := fid2chan(srv, m.fid); + srv.chanfree(c); + if (c.qid.qtype & Sys->QTDIR) { + srv.reply(ref Rmsg.Error(m.tag, Eperm)); + break; + } + l := qid2lock(c.qid); + if (l == nil) { + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + break; + } + if (l.fd != nil) { + srv.reply(ref Rmsg.Error(m.tag, Elocked)); + break; + } + if ((err := remove(uproc, lockdir + "/" + l.d.name)) == nil) { + srv.reply(ref Rmsg.Error(m.tag, err)); + break; + } + ll: list of ref Lockfile; + for (; locks != nil; locks = tl locks) + if (hd locks != l) + ll = hd locks :: ll; + locks = ll; + srv.reply(ref Rmsg.Remove(m.tag)); + Wstat => + c := fid2chan(srv, m.fid); + if (c.qid.qtype & Sys->QTDIR) { + srv.reply(ref Rmsg.Error(m.tag, Eperm)); + break; + } + l := qid2lock(c.qid); + if (l == nil) { + srv.reply(ref Rmsg.Error(m.tag, Enotfound)); + break; + } + if ((err := wstat(uproc, lockdir + "/" + l.d.name, m.stat)) != nil) { + srv.reply(ref Rmsg.Error(m.tag, err)); + break; + } + (ok, d) := sys->stat(lockdir + "/" + m.stat.name); + if (ok == -1) { + srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); + break; + } + d.qid = l.d.qid; + l.d = d; + srv.reply(ref Rmsg.Wstat(m.tag)); + Attach => + srv.devattach(m); + } + return 0; +} +exception e{ + "panic:*" => + sys->fprint(stderr, "lockfs: %s\n", e); + srv.reply(ref Rmsg.Error(gm.tag, e[len "panic:":])); + return 0; +} +} + +unlocked(l: ref Lockfile) +{ + if (l.readers > 0) + l.readers--; + else + l.writers--; + if (l.readers > 0) + return; + l.fd = nil; + + # unblock all readers at the head of the queue. + # XXX should we queuejump other readers? + while ((nreq := l.waitq.peek()) != nil && l.writers == 0) { + if (nreq.omode != Sys->OREAD && l.readers > 0) + break; + openlockfile(l, nreq); + l.waitq.get(); + } +} + +openlockfile(l: ref Lockfile, req: ref Openreq): int +{ + err: string; + (l.fd, err) = open(req.uproc, lockdir + "/" + l.d.name, req.omode); + if (l.fd == nil) { + req.srv.reply(ref Rmsg.Error(req.tag, err)); + return -1; + } + req.c.open = 1; + if (req.omode & Sys->OTRUNC) + l.d.length = big 0; + req.srv.reply(ref Rmsg.Open(req.tag, l.d.qid, Styx->MAXFDATA)); + if (req.omode == Sys->OREAD) + l.readers++; + else + l.writers++; + return 0; +} + +qid2lock(q: Sys->Qid): ref Lockfile +{ + for (ll := locks; ll != nil; ll = tl ll) + if ((hd ll).d.qid.path == q.path) + return hd ll; + return nil; +} + +lockinit() +{ + fd := sys->open(lockdir, Sys->OREAD); + if (fd == nil) + return; + + lockl: list of ref Lockfile; + # XXX if O(n²) behaviour is a problem, use Readdir module + for(;;){ + (n, e) := sys->dirread(fd); + if(n <= 0) + break; + for (i := 0; i < n; i++) { + for (l := lockl; l != nil; l = tl l) + if ((hd l).d.name == e[i].name) + break; + if (l == nil) { + e[i].qid = (maxqidpath++, 0, Sys->QTFILE); + lockl = ref Lockfile(ref Lockqueue, nil, 0, 0, e[i]) :: lockl; + } + } + } + # remove all directories from list + for (locks = nil; lockl != nil; lockl = tl lockl) + if (((hd lockl).d.mode & Sys->DMDIR) == 0) + locks = hd lockl :: locks; +} + + +fid2chan(srv: ref Styxserver, fid: int): ref Chan +{ + c := srv.fidtochan(fid); + if (c == nil) + raise "panic:bad fid"; + return c; +} + +Lockqueue.put(q: self ref Lockqueue, s: ref Openreq) +{ + q.t = s :: q.t; +} + +Lockqueue.get(q: self ref Lockqueue): ref Openreq +{ + s: ref Openreq; + if(q.h == nil) + (q.h, q.t) = (revrqlist(q.t), nil); + + if(q.h != nil) + (s, q.h) = (hd q.h, tl q.h); + + return s; +} + +Lockqueue.peek(q: self ref Lockqueue): ref Openreq +{ + s := q.get(); + if (s != nil) + q.h = s :: q.h; + return s; +} + +doflush(l: list of ref Openreq, srv: ref Styxserver, tag: int): list of ref Openreq +{ + oldl := l; + nl: list of ref Openreq; + doneone := 0; + while (l != nil) { + oreq := hd l; + if (oreq.srv != srv || (tag != -1 && oreq.tag != tag)) + nl = oreq :: nl; + else + doneone = 1; + l = tl l; + } + if (doneone) + return revrqlist(nl); + else + return oldl; +} + +Lockqueue.flush(q: self ref Lockqueue, srv: ref Styxserver, tag: int) +{ + q.h = doflush(q.h, srv, tag); + q.t = doflush(q.t, srv, tag); +} + +# or inline +revrqlist(ls: list of ref Openreq) : list of ref Openreq +{ + rs: list of ref Openreq; + while(ls != nil){ + rs = hd ls :: rs; + ls = tl ls; + } + return rs; +} + +# addr should be, e.g. tcp!*!2345 +listener(addr: string, ch: chan of (ref Sys->FD, string, Uproc), + sync: chan of (int, string), algs: list of string) +{ + addr = netmkaddr(addr, "tcp", "33234"); + (ok, c) := sys->announce(addr);; + if (ok == -1) { + sync <-= (-1, sys->sprint("cannot anounce on %s: %r", addr)); + return; + } + sync <-= (sys->pctl(0, nil), nil); + for (;;) { + (n, nc) := sys->listen(c); + if (n == -1) { + ch <-= (nil, sys->sprint("listen failed: %r"), nil); + return; + } + dfd := sys->open(nc.dir + "/data", Sys->ORDWR); + if (dfd != nil) { + if (algs == nil) + ch <-= (dfd, nil, nil); + else + spawn authenticator(dfd, ch, algs); + } + } +} + +# authenticate a connection, setting the user id appropriately, +# and then act as a server, performing file operations +# on behalf of the central process. +authenticator(dfd: ref Sys->FD, ch: chan of (ref Sys->FD, string, Uproc), algs: list of string) +{ + (fd, err) := auth->server(algs, authinfo, dfd, 1); + if (fd == nil) { + if (verbose) + sys->fprint(stderr, "lockfs: authentication failed: %s\n", err); + return; + } + uproc := chan of (ref Ureq, chan of (ref Sys->FD, string)); + ch <-= (fd, err, uproc); + for (;;) { + (req, reply) := <-uproc; + if (req == nil) + exit; + reply <-= doreq(req); + } +} + +create(uproc: Uproc, file: string, omode: int, perm: int): (ref Sys->FD, string) +{ + return proxydoreq(uproc, ref Ureq.Create(file, omode, perm)); +} + +open(uproc: Uproc, file: string, omode: int): (ref Sys->FD, string) +{ + return proxydoreq(uproc, ref Ureq.Open(file, omode)); +} + +remove(uproc: Uproc, file: string): string +{ + return proxydoreq(uproc, ref Ureq.Remove(file)).t1; +} + +wstat(uproc: Uproc, file: string, d: Sys->Dir): string +{ + return proxydoreq(uproc, ref Ureq.Wstat(file, d)).t1; +} + +proxydoreq(uproc: Uproc, req: ref Ureq): (ref Sys->FD, string) +{ + if (uproc == nil) + return doreq(req); + reply := chan of (ref Sys->FD, string); + uproc <-= (req, reply); + return <-reply; +} + +doreq(greq: ref Ureq): (ref Sys->FD, string) +{ + fd: ref Sys->FD; + err: string; + pick req := greq { + Open => + if ((fd = sys->open(req.fname, req.omode)) == nil) + err = sys->sprint("%r"); + Create => + if ((fd = sys->create(req.fname, req.omode, req.perm)) == nil) + err = sys->sprint("%r"); + Remove => + if (sys->remove(req.fname) == -1) + err = sys->sprint("%r"); + Wstat => + if (sys->wstat(req.fname, req.dir) == -1) + err = sys->sprint("%r"); + } + return (fd, err); +} + +netmkaddr(addr, net, svc: string): string +{ + if(net == nil) + net = "net"; + (n, nil) := sys->tokenize(addr, "!"); + if(n <= 1){ + if(svc== nil) + return sys->sprint("%s!%s", net, addr); + return sys->sprint("%s!%s!%s", net, addr, svc); + } + if(svc == nil || n > 2) + return addr; + return sys->sprint("%s!%s", addr, svc); +} + +user(): string +{ + fd := sys->open("/dev/user", sys->OREAD); + if(fd == nil){ + sys->fprint(stderr, "lockfs: can't open /dev/user: %r\n"); + raise "fail:no user"; + } + + buf := array[Sys->NAMEMAX] of byte; + n := sys->read(fd, buf, len buf); + if(n < 0) { + sys->fprint(stderr, "lockfs: failed to read /dev/user: %r\n"); + raise "fail:no user"; + } + + return string buf[0:n]; +} + +now(): int +{ + buf := array[128] of byte; + sys->seek(timefd, big 0, 0); + if ((n := sys->read(timefd, buf, len buf)) < 0) + return 0; + return int (big string buf[0:n] / big 1000000); +} |
