WS

F# WebSocket Server

Annotation

The idea of writing web servers and web frameworks in every language came to me once I realized that what I had built for the Erlang ecosystem (a line of enterprise frameworks under the general N2O brand, now part of the erp.uno platform) applies equally well to other languages and platforms. This article presents a version of the websocket server for the F# programming language, ws.erp.uno. Package address: nuget.org/packages/ws. Repository address: erpuno/ws.

Foreword

Haskell. The first experiment was carried out by Andrey Melnikov as a port to Haskell, N2O.HS. Later, a more complete version, with N2O and NITRO expressed in existential signatures, was made by Marat Khafizov, who runs the O3 GitHub organization and the o3.click website. I find it completely incomprehensible that no Haskell programmer, who would seem bound to admire minimalism, follows this path; instead they usually look for the truth in frameworks such as UrWeb, IHP, and UnisonWeb. In my opinion, these are all overcomplicated.

Standard ML. Also for academic purposes, Marat Khafizov ported the bundle of the N2O web server and the NITRO web framework to Standard ML (for both major implementations, SML/NJ and MLton). This work is presented by the O1 organization on GitHub. This is the language I consider appropriate to teach as the first academic programming language (before acquaintance with the industrial languages Erlang, F#, and Haskell).

Lean. To consolidate my idea and articulate it more clearly and accurately, I asked Siegmentation Fault to make a port to an even more formal programming language, the Lean 4 theorem prover. This version of the N2O web server and the NITRO web framework is presented on GitHub by the O89 organization and on two sites at once: lean4.dev and bum.pm. The latter is a package manager written in Lean 4, which Aleksandr Temerev from CERN helps us maintain. The Lean 4 N2O projects were liked by Leonardo de Moura, author of Lean and Z3, and we are immensely happy about it.

Idiomatic WebSocket server in F#

The criteria for idiomatic code can be perceived differently by everyone, but basically they mean a minimum of preludes and a maximum of essence: one way or another, the main mantra of all minimalists in general and of the N2O infrastructure in particular. Among the modern criteria for an idiomatic web server in F#, I would highlight the following:

1) use of the System.Net.WebSockets system classes, which already provide a buffered encoder and decoder for RFC 6455 frames;
2) the server must be built on Async computation expressions;
3) MailboxProcessor should be used to manage asynchronous threads of execution (lightweight processes) rather than a hand-written system of workers, which, although it helps squeeze the last drop of performance out of F# (I got 14 million messages per second), would not demonstrate the essence, since it would be a deviation towards actor runtimes;
4) use of the TcpListener, TcpClient, and NetworkStream classes.

You are not allowed to use anything else!

What to read before writing?

After a bit of googling, I realized that the Internet lacks an article describing the history of asynchronous computations, popularly known by the async/await keywords. I envision a future article called "Survey of brief Async history", which would give a retrospective of Async technology:

0) J operator 1965;
1) LISP call/cc 1968;
2) Erlang 1986;
3) Concurrent ML 1998;
4) Haskell async 2004;
5) C# async yield 2006;
6) Perl IO::Async 2007;
7) F# Async 2010;
8) C# / PHP Async 2012;
9) Python async 2015;
10) ECMAScript async 2017.

The seminal article on F# async is Leo Gorodinsky's `F# Async Guide`. The main book I would recommend looking through before getting acquainted with F# is `Expert F# 4.0` by Don Syme, the author of the language. The main presentation on F# Async, I would say, is Don Syme's talk at the London meetup: `Some F# for the Erlang programmer`. Armed with these documents and this Gist snippet, I went to Lviv to write the most idiomatic websocket server.

Showcase

As is usually accepted in backtracking systems, Prolog and declarative languages, we will move from the end, namely from the interface that we want to get. I would like the ECHO Server to be an id function.

    open N2O

    module Program =

        [<EntryPoint>]
        let main _ =
            let mutable ret = 0
            try
                Server.proto <- fun _ -> id
                use ws = Server.start "0.0.0.0" 1900
                System.Threading.Thread.Sleep -1
            with exn ->
                printfn "EXIT: %s" exn.Message
                ret <- 1
            ret

Asynchronous process architecture

For those familiar with the Erlang/OTP architecture, it is known that designing network applications begins with a supervision tree of lightweight processes and the protocols that govern their interactions. Child processes usually share CancellationToken lifetime tokens, so that exceptions thrown in parent processes can cancel the entire subprocess tree. Therefore, the async process loops contain the expression:

while not ct.IsCancellationRequested do
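As a minimal sketch of this shared-token pattern (the `child` function and the other names here are hypothetical and not part of the server), two asynchronous loops can be started on one token and stopped together by a single `Cancel` call:

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical child process: loops until the shared token is cancelled.
let child (ct: CancellationToken) =
    async {
        while not ct.IsCancellationRequested do
            do! Async.Sleep 10
    }

let cts = new CancellationTokenSource()

// Both children share one token, so one Cancel call stops the whole group.
let a = Async.StartAsTask(child cts.Token, cancellationToken = cts.Token)
let b = Async.StartAsTask(child cts.Token, cancellationToken = cts.Token)

cts.Cancel()

// Waiting on cancelled tasks raises AggregateException; either way,
// reaching `true` here means both loops have finished.
let stopped =
    try
        Task.WaitAll([| a :> Task; b :> Task |], 1000)
    with :? AggregateException -> true
```

The token is passed twice on purpose: once so the loop condition can see it, and once to `Async.StartAsTask` so that a pending `Async.Sleep` is interrupted as well.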

Our websocket server consists of 7 asynchronous processes:

               [Sup]  [L]*
               /      /
    [start]--[S]--[C]*
               \      \
               [H]    [T]*

The legend of this tree is as follows: the [start] node is the entry point from which the rest of the asynchronous processes are born, and corresponds to the start function; the [S] node corresponds to the asynchronous process represented by the listen function; the [Sup] node corresponds to the startSupervisor function; the [C] node corresponds to the startClient function; the [H] node corresponds to the heartbeat function; the [L] node corresponds to the looper function; the [T] node corresponds to the telemetry function. An asterisk denotes processes whose number depends on the number of active connections: [C]*, [L]*, [T]*.

Interaction protocols

At the moment the client [C] is born in the parent server process [S], the notification [S] -> [Sup] takes place according to the so-called supervisor protocol, Sup, represented by the type of the same name. The public protocol of the public function Server.protocol is represented by the Msg type, which is used to control the asynchronous process [L].

The server ping system is implemented to be compatible with the Sup and Msg protocols: the heartbeat process [H] sends a Tick message at intervals to the supervisor [Sup], which in turn broadcasts to all telemetry clients [T], which are created on the same queue as [C], i.e. with the same protocol.

Processes [T], [L] and [C] share the WebSocket stream and are all connected to the server's supervisor, notifying it in case of exceptions.

    type Msg =
        | Bin of byte array
        | Text of string
        | Nope

    type Sup =
        | Connect of MailboxProcessor<Msg> * WebSocket
        | Disconnect of MailboxProcessor<Msg>
        | Close of WebSocket
        | Tick

    type Req =
        { path : string
          method : string
          version : string
          headers : NameValueCollection }

I state: for any WebSocket server, the following signature is the only one needed:

proto : Req -> Msg -> Msg

Also, no more types are needed! The Req record type is required in addition to the Sup and Msg union types because: a) we need to pass the endpoint URL, and probably the peer IP address, to the WebSocket async processes; b) we need to dispatch protocol modules (such as ECHO) by URL path; c) we need to read the X-Authorization token from the RFC 2616 headers in a production environment, which basically requires the whole request parser, implemented in a separate module.

Thus our ECHO protocol function including router could be seen as:

let echo : Req -> Msg -> Msg = fun _ -> id

In a purely functional manner, you can build even a complex router that maps URI services to particular monoidal functions:

    let nope = fun _ -> Nope
    let tick = fun _ -> Text "TICK"
    let echo = id

    let route : Req -> Msg -> Msg =
        fun x ->
            match x.path with
            | "/nope" -> nope
            | "/tick" -> tick
            | "/echo" -> echo
            | _ -> id
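To see the router at work, here is a small self-contained sketch. It restates the Msg and Req types and the router so that it runs on its own; the `reqFor` helper and the sample values are hypothetical, introduced only for illustration.

```fsharp
open System.Collections.Specialized

type Msg =
    | Bin of byte array
    | Text of string
    | Nope

type Req =
    { path : string; method : string; version : string; headers : NameValueCollection }

let nope : Msg -> Msg = fun _ -> Nope
let tick : Msg -> Msg = fun _ -> Text "TICK"
let echo : Msg -> Msg = id

let route : Req -> Msg -> Msg =
    fun x ->
        match x.path with
        | "/nope" -> nope
        | "/tick" -> tick
        | "/echo" -> echo
        | _ -> id

// Hypothetical helper: a request that carries nothing but a path.
let reqFor path =
    { path = path; method = "GET"; version = "HTTP/1.1"; headers = NameValueCollection() }

let r1 = route (reqFor "/echo") (Text "hello")    // the message comes back unchanged
let r2 = route (reqFor "/tick") (Bin [| 1uy |])   // any input is answered with Text "TICK"
let r3 = route (reqFor "/nope") (Text "hello")    // any input is swallowed as Nope
let r4 = route (reqFor "/other") (Text "hello")   // unknown paths fall back to id
```

Because `route` returns a plain `Msg -> Msg` function, each handler can be exercised without opening a socket.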

RFC 2616 Header Parser

A simple request header parser function is a prelude to the RFC 6455 handshake. It populates a NameValueCollection with the headers parsed from the initial HTTP request.

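    // Split "Key: value" at the first colon and store the pair under a lower-cased key.
    let parseHeader (headers : NameValueCollection) (line : string) : unit =
        match line.Split(':', 2, StringSplitOptions.TrimEntries) with
        | [| key; value |] -> headers.Add(key.ToLower(), value)
        | _ -> ()

    // Parse the request line ("GET /path HTTP/1.1") and then every header line.
    // An empty or malformed request yields the empty Req instead of throwing.
    let request (lines : string array) : Req =
        let req = { path = ""; version = ""; method = ""; headers = NameValueCollection() }
        match Array.tryHead lines with
        | None -> req
        | Some first ->
            match first.Split(' ', StringSplitOptions.RemoveEmptyEntries) with
            | [| method; uri; version |] ->
                Array.iter (parseHeader req.headers) (Array.tail lines)
                { req with path = uri; version = version; method = method }
            | _ -> req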
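As a usage sketch, the parser can be run on the lines of a sample opening request. The block restates Req and the parser (with a guard for empty input) so that it is self-contained; the `sample` array is an assumption modeled on the client handshake example in RFC 6455, shortened.

```fsharp
open System
open System.Collections.Specialized

type Req =
    { path : string; method : string; version : string; headers : NameValueCollection }

let parseHeader (headers : NameValueCollection) (line : string) : unit =
    match line.Split(':', 2, StringSplitOptions.TrimEntries) with
    | [| key; value |] -> headers.Add(key.ToLower(), value)
    | _ -> ()

let request (lines : string array) : Req =
    let req = { path = ""; version = ""; method = ""; headers = NameValueCollection() }
    match Array.tryHead lines with
    | None -> req
    | Some first ->
        match first.Split(' ', StringSplitOptions.RemoveEmptyEntries) with
        | [| method; uri; version |] ->
            Array.iter (parseHeader req.headers) (Array.tail lines)
            { req with path = uri; version = version; method = method }
        | _ -> req

// Hypothetical input: request lines as they would look after splitting on CRLF.
let sample =
    [| "GET /chat HTTP/1.1"
       "Host: server.example.com"
       "Upgrade: websocket"
       "Connection: Upgrade"
       "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ=="
       "Sec-WebSocket-Version: 13" |]

let req = request sample
let path = req.path                           // "/chat"
let upgrade = req.headers.["upgrade"]         // "websocket", stored under a lower-cased key
let key = req.headers.["sec-websocket-key"]   // the client nonce used by the handshake
let empty = request [||]                      // the empty Req, no exception
```

Lower-casing the keys on insertion is what lets the later lookups use fixed lower-case names such as "upgrade" and "sec-websocket-key".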

RFC 6455 Handshake

Functions for handling HTTP headers. isWebSocketsUpgrade looks for an Upgrade and WebSocket pair in the headers, and getLines returns the request lines as an array of strings. Header values are read by key from the NameValueCollection.

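    // True when the request carries "Upgrade: websocket" (case-insensitive).
    // String.Equals tolerates a missing header, for which the indexer returns null.
    let isWebSocketsUpgrade (req: Req) =
        String.Equals(req.headers.["upgrade"], "websocket", StringComparison.OrdinalIgnoreCase)

    // Decode the received bytes, minus the trailing 8, and split them into request lines.
    let getLines (bytes: Byte []) len =
        if len > 8 then
            bytes.[..(len - 9)]
            |> UTF8Encoding.UTF8.GetString
            |> fun hs -> hs.Split([| "\r\n" |], StringSplitOptions.RemoveEmptyEntries)
        else
            [||]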

The response defined by RFC 6455 is called the handshake. As far as I know, this functionality is not available in the system namespaces.

    // The 101 response that completes the upgrade.
    let acceptString6455 acceptCode =
        "HTTP/1.1 101 Switching Protocols\r\n"
        + "Upgrade: websocket\r\n"
        + "Connection: Upgrade\r\n"
        + "Sec-WebSocket-Accept: "
        + acceptCode
        + "\r\n\r\n"

    // Accept code = Base64(SHA-1(client key + GUID fixed by RFC 6455)).
    // The parsed request is taken as the argument, since the key is read from its headers.
    let handshake (req: Req) =
        req.headers.["sec-websocket-key"] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
        |> Encoding.ASCII.GetBytes
        |> SHA1.Create().ComputeHash
        |> Convert.ToBase64String
        |> acceptString6455
        |> Encoding.ASCII.GetBytes
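The accept code computation can be checked in isolation against the sample key published in RFC 6455. This is only a sketch: `acceptKey` and `magic` are hypothetical names, not functions of the server.

```fsharp
open System
open System.Security.Cryptography
open System.Text

// GUID fixed by RFC 6455 for computing Sec-WebSocket-Accept.
let magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// Hypothetical helper: Base64(SHA-1(key + magic)).
let acceptKey (key: string) : string =
    use sha1 = SHA1.Create()
    let bytes = Encoding.ASCII.GetBytes(key + magic)
    let hash = sha1.ComputeHash(bytes)
    Convert.ToBase64String(hash)

// Sample nonce from RFC 6455; the RFC lists
// "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=" as the matching accept value.
let sample = acceptKey "dGhlIHNhbXBsZSBub25jZQ=="
```

A check like this is a cheap way to validate the handshake before a real browser is involved, since a wrong accept value makes the client drop the connection.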

Asynchronous Server Processes

The first process, [start], is the entry point, where three processes start at once: the supervisor of all connections [Sup], the connection listener server process [S], and, if the Server.ticker flag is enabled, the heartbeat process [H], which works as a cyclic interval timer. The epilogue of the [start] process cancels the global token shared by all subprocesses when the value that holds the websocket server in external code is disposed.

    let start (addr: string) (port: int) =
        let cts = new CancellationTokenSource()
        let token = cts.Token
        let sup = startSupervisor token
        let listener = TcpListener(IPAddress.Parse(addr), port)

        try
            listener.Start(10)
        with
        | :? SocketException -> failwithf "%s:%i is acquired" addr port
        | err -> failwithf "%s" err.Message

        Async.Start(listen listener token sup, token)
        if ticker then Async.Start(heartbeat interval token sup, token)

        { new IDisposable with
            member x.Dispose() = cts.Cancel() }

The second process, [Sup], the supervisor, is a message loop that processes notifications about the registration and death of connections. It also broadcasts messages in reaction to the heartbeat of the ticker.

    let startSupervisor (ct: CancellationToken) =
        MailboxProcessor.Start(
            (fun (inbox: MailboxProcessor<Sup>) ->
                let listeners = ResizeArray<_>()
                async {
                    while not ct.IsCancellationRequested do
                        match! inbox.Receive() with
                        | Close ws -> ()
                        | Connect (l, ns) -> listeners.Add(l)
                        | Disconnect l -> listeners.Remove(l) |> ignore
                        | Tick -> listeners.ForEach(fun l -> l.Post Nope)
                }),
            cancellationToken = ct
        )
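The broadcast behaviour of the supervisor can be observed without any sockets. The following is a simplified sketch, not the server's code: the Sup type here drops the WebSocket payloads, and the `Count` message is a hypothetical addition used only to query the state from outside.

```fsharp
open System.Threading

type Msg =
    | Text of string
    | Nope

// Simplified supervisor protocol for this sketch.
type Sup =
    | Connect of MailboxProcessor<Msg>
    | Disconnect of MailboxProcessor<Msg>
    | Tick
    | Count of AsyncReplyChannel<int>   // hypothetical, for observation only

let startSupervisor (ct: CancellationToken) =
    MailboxProcessor.Start(
        (fun (inbox: MailboxProcessor<Sup>) ->
            let listeners = ResizeArray<MailboxProcessor<Msg>>()
            async {
                while not ct.IsCancellationRequested do
                    match! inbox.Receive() with
                    | Connect l -> listeners.Add(l)
                    | Disconnect l -> listeners.Remove(l) |> ignore
                    | Tick -> listeners.ForEach(fun l -> l.Post Nope)
                    | Count reply -> reply.Reply(listeners.Count)
            }),
        cancellationToken = ct
    )

let cts = new CancellationTokenSource()
let sup = startSupervisor cts.Token

// A stand-in client: an unstarted mailbox that simply queues what it is sent.
let client = new MailboxProcessor<Msg>(fun _ -> async { return () })

sup.Post(Connect client)
sup.Post Tick
// Messages are handled in order, so once Count is answered the Tick has been broadcast.
let connected = sup.PostAndReply(fun ch -> Count ch)
let queued = client.CurrentQueueLength

sup.Post(Disconnect client)
let remaining = sup.PostAndReply(fun ch -> Count ch)
cts.Cancel()
```

The reply channel is used here only as a synchronization point; the server's own Sup protocol is fire-and-forget.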

Interval heartbeat timer [H].

    let heartbeat (interval: int) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) =
        async {
            while not ct.IsCancellationRequested do
                do! Async.Sleep interval
                sup.Post(Tick)
        }

The main loop of the process [S], which accepts new TCP connections and starts new clients [C].

    let listen (listener: TcpListener) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) =
        async {
            while not ct.IsCancellationRequested do
                let! client = listener.AcceptTcpClientAsync() |> Async.AwaitTask
                client.NoDelay <- true
                startClient client sup ct |> ignore
        }

The asynchronous process [C], with a queue (MailboxProcessor), handles TCP connections; more simply, it is a TCP client. This is the entry point for the client connection, and this is where the handshake happens. On a successful handshake, we send the RFC 6455 response and launch up to two asynchronous processes: the websocket message processing loop [L] itself and, if the Server.ticker flag is set, the telemetry process [T], which shares the WebSocket stream and can flush messages to it asynchronously, competing with the main loop [L]. Such processes always exist in pairs.

    let startClient (tcp: TcpClient) (sup: MailboxProcessor<Sup>) (ct: CancellationToken) =
        MailboxProcessor.Start(
            (fun (inbox: MailboxProcessor<Msg>) ->
                async {
                    let ns = tcp.GetStream()
                    let size = tcp.ReceiveBufferSize
                    let bytes = Array.create size (byte 0)
                    let! len = ns.ReadAsync(bytes, 0, bytes.Length) |> Async.AwaitTask
                    // Parse the raw request lines into a Req before inspecting the headers.
                    let req = getLines bytes len |> request

                    match isWebSocketsUpgrade req with
                    | true ->
                        do! ns.AsyncWrite (handshake req)
                        let ws = WebSocket.CreateFromStream((ns :> Stream), true, "n2o", TimeSpan(1, 0, 0))
                        sup.Post(Connect(inbox, ws))
                        if ticker then Async.Start(telemetry ws inbox ct sup, ct)
                        return! looper ws size ct sup
                    | _ -> tcp.Close()
                }),
            cancellationToken = ct
        )

The telemetry process [T] listens to the queue, and for any message, sends the text "TICK" to the websocket channel.

    let telemetry (ws: WebSocket) (inbox: MailboxProcessor<Msg>) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) =
        async {
            try
                while not ct.IsCancellationRequested do
                    let! _ = inbox.Receive()
                    do! send ws ct (Text "TICK")
            finally
                sup.Post(Disconnect <| inbox)
                ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "TELEMETRY", ct) |> ignore
        }

The main message loop [L] works with the buffered WebSocket stream, whose type is explicitly present in the supervisor protocol. A single buffer for the entire loop is also allocated here, into which bytes from the socket are copied by ReceiveAsync. When an exception occurs, for example a UTF-8 validation error, the supervisor is notified with a Close message, which signals a disconnection.

    let looper (ws: WebSocket) (bufferSize: int) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) =
        async {
            try
                // One receive buffer for the whole loop. It is not declared mutable,
                // because mutable locals cannot be captured by the async closures.
                let bytes = Array.create bufferSize (byte 0)

                while not ct.IsCancellationRequested do
                    let! result = ws.ReceiveAsync(ArraySegment<byte>(bytes), ct) |> Async.AwaitTask
                    let recv = bytes.[0..result.Count - 1]

                    match (result.MessageType) with
                    | WebSocketMessageType.Text ->
                        do! protocol (Text (Encoding.UTF8.GetString recv)) |> send ws ct
                    | WebSocketMessageType.Binary ->
                        do! protocol (Bin recv) |> send ws ct
                    | WebSocketMessageType.Close -> ()
                    | _ -> printfn "PROTOCOL VIOLATION"
            finally
                sup.Post(Close <| ws)
                ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "LOOPER", ct) |> ignore
        }

The functions that write to the channel inherit the separation of text and binary messages, which is archaic in my opinion. As practice shows, treating everything as binary messages only improves the semantics of the protocol.

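    // Write one complete binary message. Returning an Async lets the caller await
    // the send and observe its failure instead of discarding the Task.
    let sendBytes (ws: WebSocket) ct bytes =
        ws.SendAsync(ArraySegment<byte>(bytes), WebSocketMessageType.Binary, true, ct)
        |> Async.AwaitTask

    let send ws ct (msg: Msg) =
        async {
            match msg with
            | Text text -> do! sendBytes ws ct (Encoding.UTF8.GetBytes text)
            | Bin arr -> do! sendBytes ws ct arr
            | Nope -> ()
        }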

What's next?

The next phases are:

1) BERT serialization for compatibility with N2O client infrastructure;
2) Implementation of the NITRO protocol.

Acknowledgments

I would like to thank everyone who liked our project, especially Phillip Carter, program manager of .NET and F#. We are extremely excited!

Authors

Maxim Sokhatsky, Igor Gorodetsky, Siegmentation Fault