add a sample of FLUX code

These samples were taken from the paper "Flux: A Language for
Programming High-Performance Servers", by Burns et al and Flux V0.02
which can be found here:

https://plasma.cs.umass.edu/emery/flux.1.html
This commit is contained in:
Erik Faye-Lund
2016-02-18 21:41:38 +00:00
parent 8de50edb41
commit 41713d7719
4 changed files with 287 additions and 0 deletions

View File

@@ -0,0 +1,54 @@
typedef engine isEngineMessage;
typedef turn isTurnMessage;
typedef connect isConnectMessage;
typedef disconnect isDisconnectMessage;
ClientMessage(char* data) => ();
ParseMessage(char* data) => (int type, int client, char* data);
ReadMessage(int type, int client, char* data) => ();
ParseEngine(int type, int client, char* data) => (int client, int direction);
DoEngine(int client, int direction) => ();
ParseTurn(int type, int client, char* data) => (int client, int direction);
DoTurn(int client, int direction) => ();
ParseConnect(int type, int client, char* data)
=> (int client, char* host, int port);
DoConnect(int client, char* host, int port) => ();
ParseDisconnect(int type, int client, char* data) => (int client);
DoDisconnect(int client) => ();
UpdateBoard(ClientList clients) => (ClientList clients);
SendData(ClientList clients) => ();
DoUpdate(ClientList clients) => ();
DataTimer() => (ClientList clients);
GetClients() => (ClientList clients);
Wait(ClientList clients) => (ClientList clients);
Listen () => (char* data);
source Listen => ClientMessage;
source DataTimer => DoUpdate;
DataTimer = GetClients -> Wait;
DoUpdate = UpdateBoard -> SendData;
ClientMessage=ParseMessage -> ReadMessage;
ReadMessage:[engine, _, _] = ParseEngine -> DoEngine;
ReadMessage:[turn, _, _] = ParseTurn -> DoTurn;
ReadMessage:[connect, _, _] = ParseConnect -> DoConnect;
ReadMessage:[disconnect, _, _] = ParseDisconnect -> DoDisconnect;
atomic GetClients:{client_lock};
atomic DoConnect:{client_lock};
atomic DoDisconnect:{client_lock};

View File

@@ -0,0 +1,44 @@
typedef xml TestXML;
typedef html TestHTML;
typedef inCache TestInCache;
Page (int socket) => ();
ReadRequest (int socket) => (int socket, bool close, image_tag *request);
CheckCache (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Handler (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Complete (int socket, bool close, image_tag *request) => ();
ReadInFromDisk (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request, __u8 *rgb_data);
Write (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Compress(int socket, bool close, image_tag *request, __u8 *rgb_data)
=> (int socket, bool close, image_tag *request);
StoreInCache(int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Listen ()
=> (int socket);
source Listen => Page;
Handler:[_, _, inCache]=;
Handler:[_, _, _]=ReadInFromDisk -> Compress -> StoreInCache;
Page = ReadRequest -> CheckCache-> Handler -> Write -> Complete;
atomic CheckCache:{cache};
atomic StoreInCache:{cache};
atomic Complete:{cache};
handle error ReadInFromDisk => FourOhFor;

151
samples/FLUX/mbittorrent.fx Normal file
View File

@@ -0,0 +1,151 @@
typedef choke TestChoke;
typedef unchoke TestUnchoke;
typedef interested TestInterested;
typedef uninterested TestUninterested;
typedef request TestRequest;
typedef cancel TestCancel;
typedef piece TestPiece;
typedef bitfield TestBitfield;
typedef have TestHave;
typedef piececomplete TestPieceComplete;
CheckinWithTracker (torrent_data_t *tdata)
=> ();
SendRequestToTracker (torrent_data_t *tdata)
=> (torrent_data_t *tdata, int socket);
GetTrackerResponse (torrent_data_t *tdata, int socket)
=> ();
UpdateChokeList (torrent_data_t *tdata)
=> ();
PickChoked (torrent_data_t *tdata)
=> (torrent_data_t *tdata, chokelist_t clist);
SendChokeUnchoke (torrent_data_t *tdata, chokelist_t clist)
=> ();
SetupConnection (torrent_data_t *tdata, int socket)
=> ();
Handshake (torrent_data_t *tdata, int socket)
=> (torrent_data_t *tdata, client_data_t *client);
SendBitfield (torrent_data_t *tdata, client_data_t *client)
=> ();
Message (torrent_data_t *tdata, client_data_t *client)
=> ();
ReadMessage (torrent_data_t *tdata, client_data_t *client)
=> (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload);
HandleMessage (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
MessageDone (client_data_t *client)
=> ();
CompletePiece (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client);
VerifyPiece (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client, int piece);
SendHave (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client);
SendUninterested (torrent_data_t *tdata, client_data_t *client)
=> (torrent_data_t *tdata, client_data_t *client);
Choke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
Cancel (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
Interested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
Uninterested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
Bitfield (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
Unchoke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client);
SendRequest (torrent_data_t *tdata, client_data_t *client)
=> (client_data_t *client);
Have (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client);
Piece (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client, int piece);
Request (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);
SendKeepAlives (torrent_data_t *tdata)
=> ();
GetClients ()
=> (int maxfd, fd_set *fds);
SelectSockets (int maxfd, fd_set *fds)
=> (fd_set *fds);
CheckSockets (fd_set *fds)
=> (torrent_data_t *tdata, client_data_t *client);
TrackerTimer ()
=> (torrent_data_t *tdata);
ChokeTimer ()
=> (torrent_data_t *tdata);
Connect ()
=> (torrent_data_t *tdata, int socket);
KeepAliveTimer ()
=> (torrent_data_t *tdata);
Listen ()
=> (torrent_data_t *tdata, client_data_t *client);
source TrackerTimer => CheckinWithTracker;
source ChokeTimer => UpdateChokeList;
source Connect => SetupConnection;
source Listen => Message;
source KeepAliveTimer => SendKeepAlives;
Listen = GetClients -> SelectSockets -> CheckSockets;
CheckinWithTracker = SendRequestToTracker -> GetTrackerResponse;
UpdateChokeList = PickChoked -> SendChokeUnchoke;
SetupConnection = Handshake -> SendBitfield;
Message = ReadMessage -> HandleMessage -> MessageDone;
CompletePiece:[_, _, piececomplete] = VerifyPiece -> SendHave -> SendUninterested;
HandleMessage:[_, _, choke, _, _] = Choke;
HandleMessage:[_, _, unchoke, _, _] = Unchoke -> SendRequest;
HandleMessage:[_, _, interested, _, _] = Interested;
HandleMessage:[_, _, uninterested, _, _] = Uninterested;
HandleMessage:[_, _, request, _, _] = Request;
HandleMessage:[_, _, cancel, _, _] = Cancel;
HandleMessage:[_, _, piece, _, _] = Piece -> CompletePiece -> SendRequest;
HandleMessage:[_, _, bitfield, _, _] = Bitfield;
HandleMessage:[_, _, have, _, _] = Have -> SendRequest;
atomic GetClients:{BigLock};
atomic CheckSockets:{BigLock};
atomic Message:{BigLock};
atomic CheckinWithTracker:{BigLock};
atomic UpdateChokeList:{BigLock};
atomic SetupConnection:{BigLock};
atomic SendKeepAlives:{BigLock};

38
samples/FLUX/test.fx Normal file
View File

@@ -0,0 +1,38 @@
// concrete node signatures
Listen ()
=> (int socket);
ReadRequest (int socket)
=> (int socket, bool close, image_tag *request);
CheckCache (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
// omitted for space:
// ReadInFromDisk, StoreInCache
Compress (int socket, bool close, image_tag *request, __u8 *rgb_data)
=> (int socket, bool close, image_tag *request);
Write (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Complete (int socket, bool close, image_tag *request) => ();
// source node
source Listen => Image;
// abstract node
Image = ReadRequest -> CheckCache -> Handler -> Write -> Complete;
// predicate type & dispatch
typedef hit TestInCache;
Handler:[_, _, hit] = ;
Handler:[_, _, _] =
ReadInFromDisk -> Compress -> StoreInCache;
// error handler
handle error ReadInFromDisk => FourOhFor;
// atomicity constraints
atomic CheckCache:{cache};
atomic StoreInCache:{cache};
atomic Complete:{cache};