diff --git a/samples/FLUX/gameserver.fx b/samples/FLUX/gameserver.fx new file mode 100644 index 00000000..9c40d5f1 --- /dev/null +++ b/samples/FLUX/gameserver.fx @@ -0,0 +1,54 @@ +typedef engine isEngineMessage; +typedef turn isTurnMessage; +typedef connect isConnectMessage; +typedef disconnect isDisconnectMessage; + +ClientMessage(char* data) => (); +ParseMessage(char* data) => (int type, int client, char* data); +ReadMessage(int type, int client, char* data) => (); + +ParseEngine(int type, int client, char* data) => (int client, int direction); +DoEngine(int client, int direction) => (); + +ParseTurn(int type, int client, char* data) => (int client, int direction); +DoTurn(int client, int direction) => (); + +ParseConnect(int type, int client, char* data) + => (int client, char* host, int port); +DoConnect(int client, char* host, int port) => (); + +ParseDisconnect(int type, int client, char* data) => (int client); +DoDisconnect(int client) => (); + +UpdateBoard(ClientList clients) => (ClientList clients); +SendData(ClientList clients) => (); + +DoUpdate(ClientList clients) => (); + +DataTimer() => (ClientList clients); + +GetClients() => (ClientList clients); + +Wait(ClientList clients) => (ClientList clients); + +Listen () => (char* data); + +source Listen => ClientMessage; +source DataTimer => DoUpdate; + +DataTimer = GetClients -> Wait; + +DoUpdate = UpdateBoard -> SendData; + +ClientMessage=ParseMessage -> ReadMessage; + +ReadMessage:[engine, _, _] = ParseEngine -> DoEngine; +ReadMessage:[turn, _, _] = ParseTurn -> DoTurn; +ReadMessage:[connect, _, _] = ParseConnect -> DoConnect; +ReadMessage:[disconnect, _, _] = ParseDisconnect -> DoDisconnect; + +atomic GetClients:{client_lock}; +atomic DoConnect:{client_lock}; +atomic DoDisconnect:{client_lock}; + + diff --git a/samples/FLUX/imageserver.fx b/samples/FLUX/imageserver.fx new file mode 100644 index 00000000..effbfaed --- /dev/null +++ b/samples/FLUX/imageserver.fx @@ -0,0 +1,44 @@ +typedef xml TestXML; +typedef html TestHTML; + +typedef inCache TestInCache; + +Page (int socket) => (); + +ReadRequest (int socket) => (int socket, bool close, image_tag *request); + +CheckCache (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); + +Handler (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); + +Complete (int socket, bool close, image_tag *request) => (); + +ReadInFromDisk (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request, __u8 *rgb_data); + +Write (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); + +Compress(int socket, bool close, image_tag *request, __u8 *rgb_data) + => (int socket, bool close, image_tag *request); + +StoreInCache(int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); + +Listen () + => (int socket); + +source Listen => Page; + +Handler:[_, _, inCache]=; +Handler:[_, _, _]=ReadInFromDisk -> Compress -> StoreInCache; + +Page = ReadRequest -> CheckCache-> Handler -> Write -> Complete; + +atomic CheckCache:{cache}; +atomic StoreInCache:{cache}; +atomic Complete:{cache}; + +handle error ReadInFromDisk => FourOhFor; diff --git a/samples/FLUX/mbittorrent.fx b/samples/FLUX/mbittorrent.fx new file mode 100644 index 00000000..620bcd54 --- /dev/null +++ b/samples/FLUX/mbittorrent.fx @@ -0,0 +1,151 @@ +typedef choke TestChoke; +typedef unchoke TestUnchoke; +typedef interested TestInterested; +typedef uninterested TestUninterested; +typedef request TestRequest; +typedef cancel TestCancel; +typedef piece TestPiece; +typedef bitfield TestBitfield; +typedef have TestHave; +typedef piececomplete TestPieceComplete; + +CheckinWithTracker (torrent_data_t *tdata) + => (); + +SendRequestToTracker (torrent_data_t *tdata) + => (torrent_data_t *tdata, int socket); + +GetTrackerResponse (torrent_data_t *tdata, int socket) + => (); + +UpdateChokeList (torrent_data_t *tdata) + => (); + +PickChoked (torrent_data_t *tdata) + => (torrent_data_t *tdata, chokelist_t clist); + +SendChokeUnchoke (torrent_data_t *tdata, chokelist_t clist) + => (); + +SetupConnection (torrent_data_t *tdata, int socket) + => (); + +Handshake (torrent_data_t *tdata, int socket) + => (torrent_data_t *tdata, client_data_t *client); + +SendBitfield (torrent_data_t *tdata, client_data_t *client) + => (); + +Message (torrent_data_t *tdata, client_data_t *client) + => (); + +ReadMessage (torrent_data_t *tdata, client_data_t *client) + => (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload); + +HandleMessage (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +MessageDone (client_data_t *client) + => (); + +CompletePiece (torrent_data_t *tdata, client_data_t *client, int piece) + => (torrent_data_t *tdata, client_data_t *client); + +VerifyPiece (torrent_data_t *tdata, client_data_t *client, int piece) + => (torrent_data_t *tdata, client_data_t *client, int piece); + +SendHave (torrent_data_t *tdata, client_data_t *client, int piece) + => (torrent_data_t *tdata, client_data_t *client); + +SendUninterested (torrent_data_t *tdata, client_data_t *client) + => (torrent_data_t *tdata, client_data_t *client); + +Choke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +Cancel (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +Interested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +Uninterested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +Bitfield (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +Unchoke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (torrent_data_t *tdata, client_data_t *client); + +SendRequest (torrent_data_t *tdata, client_data_t *client) + => (client_data_t *client); + +Have (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (torrent_data_t *tdata, client_data_t *client); + +Piece (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (torrent_data_t *tdata, client_data_t *client, int piece); + +Request (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload) + => (client_data_t *client); + +SendKeepAlives (torrent_data_t *tdata) + => (); + +GetClients () + => (int maxfd, fd_set *fds); + +SelectSockets (int maxfd, fd_set *fds) + => (fd_set *fds); + +CheckSockets (fd_set *fds) + => (torrent_data_t *tdata, client_data_t *client); + +TrackerTimer () + => (torrent_data_t *tdata); + +ChokeTimer () + => (torrent_data_t *tdata); + +Connect () + => (torrent_data_t *tdata, int socket); + +KeepAliveTimer () + => (torrent_data_t *tdata); + +Listen () + => (torrent_data_t *tdata, client_data_t *client); + +source TrackerTimer => CheckinWithTracker; +source ChokeTimer => UpdateChokeList; +source Connect => SetupConnection; +source Listen => Message; +source KeepAliveTimer => SendKeepAlives; + +Listen = GetClients -> SelectSockets -> CheckSockets; +CheckinWithTracker = SendRequestToTracker -> GetTrackerResponse; +UpdateChokeList = PickChoked -> SendChokeUnchoke; +SetupConnection = Handshake -> SendBitfield; +Message = ReadMessage -> HandleMessage -> MessageDone; + +CompletePiece:[_, _, piececomplete] = VerifyPiece -> SendHave -> SendUninterested; + +HandleMessage:[_, _, choke, _, _] = Choke; +HandleMessage:[_, _, unchoke, _, _] = Unchoke -> SendRequest; +HandleMessage:[_, _, interested, _, _] = Interested; + +HandleMessage:[_, _, uninterested, _, _] = Uninterested; +HandleMessage:[_, _, request, _, _] = Request; +HandleMessage:[_, _, cancel, _, _] = Cancel; +HandleMessage:[_, _, piece, _, _] = Piece -> CompletePiece -> SendRequest; +HandleMessage:[_, _, bitfield, _, _] = Bitfield; +HandleMessage:[_, _, have, _, _] = Have -> SendRequest; + +atomic GetClients:{BigLock}; +atomic CheckSockets:{BigLock}; +atomic Message:{BigLock}; +atomic CheckinWithTracker:{BigLock}; +atomic UpdateChokeList:{BigLock}; +atomic SetupConnection:{BigLock}; +atomic SendKeepAlives:{BigLock}; diff --git a/samples/FLUX/test.fx b/samples/FLUX/test.fx new file mode 100644 index 00000000..bb81fa9a --- /dev/null +++ b/samples/FLUX/test.fx @@ -0,0 +1,38 @@ +// concrete node signatures +Listen () + => (int socket); + +ReadRequest (int socket) + => (int socket, bool close, image_tag *request); + +CheckCache (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); + +// omitted for space: +// ReadInFromDisk, StoreInCache +Compress (int socket, bool close, image_tag *request, __u8 *rgb_data) + => (int socket, bool close, image_tag *request); +Write (int socket, bool close, image_tag *request) + => (int socket, bool close, image_tag *request); +Complete (int socket, bool close, image_tag *request) => (); + +// source node +source Listen => Image; + +// abstract node +Image = ReadRequest -> CheckCache -> Handler -> Write -> Complete; + +// predicate type & dispatch +typedef hit TestInCache; +Handler:[_, _, hit] = ; +Handler:[_, _, _] = +ReadInFromDisk -> Compress -> StoreInCache; + +// error handler +handle error ReadInFromDisk => FourOhFor; + +// atomicity constraints +atomic CheckCache:{cache}; +atomic StoreInCache:{cache}; +atomic Complete:{cache}; +