---
title: "nanonext - NNG Lightweight Messaging Library"
vignette: >
  %\VignetteIndexEntry{nanonext - NNG Lightweight Messaging Library}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---

### Table of Contents

1. [Cross-language Exchange](#cross-language-exchange)
2. [Async and Concurrency](#async-and-concurrency)
3. [RPC and Distributed Computing](#rpc-and-distributed-computing)
4. [Synchronisation Primitives](#synchronisation-primitives)
5. [TLS Secure Connections](#tls-secure-connections)
6. [Publisher / Subscriber Model](#publisher-subscriber-model)
7. [Surveyor / Respondent Model](#surveyor-respondent-model)
8. [ncurl: (Async) HTTP Client](#ncurl-async-http-client)
9. [stream: Websocket Client](#stream-websocket-client)
10. [Options, Serialization and Statistics](#options-serialization-and-statistics)

### Cross-language Exchange

`nanonext` provides a fast and reliable data interface between different programming languages where NNG has an implementation, including C, C++, Java, Python, Go, Rust etc.

The following example demonstrates the exchange of numerical data between R and Python (NumPy), two of the most commonly-used languages for data science and machine learning.

Using a messaging interface provides a clean and robust approach, light on resources with limited and identifiable points of failure.

This approach can also serve as an interface / pipe between different processes written in the same or different languages, running on the same computer or distributed across networks, and is an enabler of modular software design as espoused by the Unix philosophy.

One solution it provides is that of processing real-time data where computation times exceed the data frequency - by dividing the computation into stages, this may be set up as a pipeline or 'cascade' of processes, each connected using NNG sockets.
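As an aside before the R / Python example, the pipeline idea can be sketched within a single R session. This is only an illustration under stated assumptions: it uses NNG's push/pull protocols, which this vignette does not otherwise cover, the 'inproc://' URLs and socket names are hypothetical, and the doubling step stands in for a real computation. In practice each stage would typically be a separate process connected over 'ipc://' or 'tcp://'.

``` r
library(nanonext)

# hypothetical two-hop pipeline: source -> stage 2 -> sink
# each hop is its own push/pull socket pair (URLs are illustrative only)
stage1_out <- socket("push", listen = "inproc://stage1")
stage2_in <- socket("pull", dial = "inproc://stage1")
stage2_out <- socket("push", listen = "inproc://stage2")
sink_in <- socket("pull", dial = "inproc://stage2")

# the source emits a vector of doubles (block for up to 1000 ms)
send(stage1_out, c(1, 2, 3), mode = "raw", block = 1000)

# stage 2 receives, applies a placeholder computation and forwards the result
x <- recv(stage2_in, mode = "double", block = 1000)
send(stage2_out, x * 2, mode = "raw", block = 1000)

# the sink receives the processed data
result <- recv(sink_in, mode = "double", block = 1000)
result

close(stage1_out)
close(stage2_in)
close(stage2_out)
close(sink_in)
```

Each stage here only touches its own inbound and outbound sockets, so under this design a slow stage should be able to move to another process or machine by changing its URL.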
Create socket in Python using the NNG binding 'pynng':

``` python
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket")
```

Create nano object in R using `nanonext`, then send a vector of 'doubles', specifying mode as 'raw':

``` r
library(nanonext)
n <- nano("pair", dial = "ipc:///tmp/nanonext.socket")
n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw")
#> [1] 0
```

Receive in Python as a NumPy array of 'floats', and send back to R:

``` python
raw = socket.recv()
array = np.frombuffer(raw)
print(array)
#> [1.1 2.2 3.3 4.4 5.5]

msg = array.tobytes()
socket.send(msg)

socket.close()
```

Receive in R, specifying the receive mode as 'double':

``` r
n$recv(mode = "double")
#> [1] 1.1 2.2 3.3 4.4 5.5

n$close()
```

[« Back to ToC](#table-of-contents)

### Async and Concurrency

`nanonext` implements true async send and receive, leveraging NNG as a massively-scalable concurrency framework.

``` r
s1 <- socket("pair", listen = "inproc://nano")
s2 <- socket("pair", dial = "inproc://nano")
```

`send_aio()` and `recv_aio()` functions return immediately with an 'Aio' object, but perform their operations async.

An 'Aio' object returns an unresolved value whilst its asynchronous operation is ongoing, automatically resolving to a final value once complete.

``` r
# an async receive is requested, but no messages are waiting (yet to be sent)
msg <- recv_aio(s2)
msg
#> < recvAio | $data >
msg$data
#> 'unresolved' logi NA
```

For a 'sendAio' object, the result is stored at `$result`.

``` r
res <- send_aio(s1, data.frame(a = 1, b = 2))
res
#> < sendAio | $result >
res$result
#> [1] 0
```

*Note: a return value of 0 denotes a successful send, meaning that the message has been accepted by the socket for sending; the message itself may still be buffered within the system.*

For a 'recvAio' object, the message is stored at `$data`.
``` r
# now that a message has been sent, the 'recvAio' resolves automatically
msg$data
#>   a b
#> 1 1 2
```

Auxiliary function `unresolved()` may be used in control flow statements to perform actions which depend on resolution of the Aio, both before and after. This means there is no need to actually wait (block) for an Aio to resolve, as the example below demonstrates.

``` r
msg <- recv_aio(s2)

# unresolved() queries for resolution itself so no need to use it again within the while loop
while (unresolved(msg)) {
  # do stuff before checking resolution again
  send_aio(s1, "resolved")
  cat("unresolved")
}
#> unresolved

# perform actions which depend on the Aio value outside the while loop
msg$data
#> [1] "resolved"
```

The values may also be called explicitly using `call_aio()`. This will wait for completion of the Aio (blocking).

``` r
# will wait for completion then return the resolved Aio
call_aio(msg)

# to access the resolved value (waiting if required):
call_aio(msg)$data
#> [1] "resolved"

# or directly:
collect_aio(msg)
#> [1] "resolved"

# or user-interruptible:
msg[]
#> [1] "resolved"

close(s1)
close(s2)
```

[« Back to ToC](#table-of-contents)

### RPC and Distributed Computing

`nanonext` implements remote procedure calls (RPC) using NNG's req/rep protocol to provide a basis for distributed computing.

This can be used to perform computationally-expensive calculations or I/O-bound operations such as writing large amounts of data to disk in a separate 'server' process running concurrently.

[S] Server process: `reply()` will wait for a message and apply a function, in this case `rnorm()`, before sending back the result. This is started in a background 'mirai' process.
``` r
m <- mirai::mirai({
  library(nanonext)
  rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
  reply(context(rep), execute = rnorm, send_mode = "raw")
  Sys.sleep(2) # linger period to flush system socket send
})
```

[C] Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object.

``` r
library(nanonext)
req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")
```

At this point, the client can run additional code concurrent with the server processing the request.

``` r
# do more...
```

When the result of the server calculation is required, the `recvAio` may be called using `call_aio()`.

The return value from the server request is then retrieved and stored in the Aio as `$data`.

``` r
call_aio(aio)$data |> str()
#>  num [1:100000000] -0.5043 0.3125 0.2775 -0.7714 -0.0508 ...
```

As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete.

In this example the calculation is returned, but other operations may reside entirely on the server side, for example writing data to disk.

In such a case, calling or querying the value confirms that the operation has completed, and provides the return value of the function, which may typically be NULL or an exit code.

The [`mirai`](https://doi.org/10.5281/zenodo.7912722) package uses `nanonext` as the back-end to provide asynchronous execution of arbitrary R code using the RPC model.

[« Back to ToC](#table-of-contents)

### Synchronisation Primitives

`nanonext` implements cross-platform synchronisation primitives provided by the NNG library.

As the R interpreter runs on a single thread, synchronisation primitives such as mutexes and condition variables are not natively implemented in the R language.
However, as NNG is inherently threaded and messaging can be asynchronous, it is possible to synchronise between NNG events and the main R execution thread.

The events that can be signalled include asynchronous receive completions, and pipe events - when connections are established or dropped.

Condition variables can be used simply to record such events, or more powerfully, to wait upon them. The condition variables implemented in `nanonext` include both a condition (value) and a flag (boolean). Each signal increments the value, and each successful return of `wait()` or `until()` decrements the value. A non-zero condition allows waiting threads to continue.

In any situation where polling for an event presents a solution, waiting upon a condition to be signalled can be more efficient, both in terms of consuming no resources while waiting, and also being synchronised with the event (having no latency).

The following shows how condition variables and signalling work in practice.

Example 1: set up a socket, and wait for the other side to connect:

``` r
sock <- socket("pair", listen = "inproc://nanopipe")

cv <- cv() # create new condition variable
cv_value(cv)
#> [1] 0

pipe_notify(sock, cv = cv, add = TRUE, remove = TRUE)

# wait(cv) # uncomment in normal usage - but would block

# for illustration:
sock2 <- socket("pair", dial = "inproc://nanopipe")

cv_value(cv) # incremented when pipe to 'sock2' was created
#> [1] 1

wait(cv) # wait() now does not block

cv_value(cv) # wait() decrements the CV value - calling wait() again will block
#> [1] 0

close(sock2)

cv_value(cv) # incremented when pipe to 'sock2' was destroyed
#> [1] 1

close(sock)
```

Example 2: wait until a message is received or connection is dropped:

``` r
sock <- socket("pair", listen = "inproc://nanosignal")
sock2 <- socket("pair", dial = "inproc://nanosignal")

cv <- cv() # create new condition variable
cv_value(cv)
#> [1] 0

pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)

send(sock2, "this message will wake waiting thread") # in real usage happens concurrently with wait()
#> [1] 0

r <- recv_aio(sock, cv = cv) # same cv passed to recv_aio()

# wakes as soon as the asynchronous receive completes
wait(cv) || stop("peer disconnected")
#> [1] TRUE

r$data
#> [1] "this message will wake waiting thread"

close(sock)
close(sock2)
```

The above example shows the working of the flag within the condition variable. As the pipe notification was specified to raise a flag, this can be used to distinguish between a pipe event signal and a message receive signal.

In the case a flag is raised, `wait()` returns FALSE rather than TRUE. So the above code will stop with the custom error message upon disconnect or else continue. This affords a way of handling disconnects that would not be possible if simply using `call_aio()`, which is also a blocking wait (on a single message).

As can be seen, this type of mechanism presents a powerful way of waiting simultaneously on multiple events, and also distinguishing between them. `pipe_notify()` can also be set to signal two condition variables upon each event, providing even more flexibility in creating complex concurrent applications.

For further details, please refer to the function documentation for `cv()`.

[« Back to ToC](#table-of-contents)

### TLS Secure Connections

Secure connections are enabled through the combination of NNG and Mbed TLS libraries.

Authentication of endpoints and encryption of the TCP transport layer is achieved transparently by:

i) Specifying a secure `tls+tcp://` or `wss://` URL, and

ii) Passing a TLS configuration object to the 'tls' argument of `listen()` or `dial()`.

A TLS configuration, or 'tlsConfig', object is created by the `tls_config()` function. Specify the argument 'client' to create a client configuration, and 'server' to create a server configuration.

A client configuration requires a PEM-encoded CA certificate (chain) used to verify the server identity.
A server configuration requires the certificate and associated private key. These may be supplied as files or directly as character vectors. Valid X.509 certificates generated via a Certificate Signing Request to a Certificate Authority are supported in this way.

Additionally, the convenience function `write_cert()` can automatically generate a 4096 bit RSA key pair and self-signed X.509 certificate in the format required by `tls_config()`. The 'cn' argument must be provided and match exactly the hostname / IP address of the URL that is being used, e.g. in the example below '127.0.0.1' must be used throughout, or alternatively 'localhost', but not a mixture of the two.

``` r
cert <- write_cert(cn = "127.0.0.1")
str(cert)
#> List of 2
#>  $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKQIBAAKCAgEAnqsWJ7vy38KuMIqzwRSEJffVPVNsi21FQlY4z4n5ipmP/k6F\n/mMhN28Ee/+j"| __truncated__
#>  $ client: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ ""

ser <- tls_config(server = cert$server)
ser
#> < TLS server config | auth mode: optional >

cli <- tls_config(client = cert$client)
cli
#> < TLS client config | auth mode: required >

s <- socket(listen = "tls+tcp://127.0.0.1:5558", tls = ser)
s1 <- socket(dial = "tls+tcp://127.0.0.1:5558", tls = cli)

# secure TLS connection established

close(s1)
close(s)
```

[« Back to ToC](#table-of-contents)

### Publisher Subscriber Model

`nanonext` fully implements NNG's pub/sub protocol as per the below example. A subscriber can subscribe to one or multiple topics broadcast by a publisher.
``` r
pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")

sub |> subscribe(topic = "examples")

pub |> send(c("examples", "this is an example"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples"           "this is an example"

pub |> send("examples at the start of a single text message", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples at the start of a single text message"

pub |> send(c("other", "this other topic will not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again

# specify NULL to subscribe to ALL topics
sub |> subscribe(topic = NULL)
pub |> send(c("newTopic", "this is a new topic"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> [1] "newTopic"            "this is a new topic"

sub |> unsubscribe(topic = NULL)
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> 'errorValue' int 8 | Try again

# however the topics explicitly subscribed to are still received
pub |> send(c("examples will still be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples will still be received"
```

The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.

``` r
sub |> subscribe(topic = 1)
pub |> send(c(1, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> [1]  1 10 10 20
pub |> send(c(2, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> 'errorValue' int 8 | Try again

close(pub)
close(sub)
```

[« Back to ToC](#table-of-contents)

### Surveyor Respondent Model

This type of pattern is useful for applications such as service discovery.

A surveyor sends a survey, which is broadcast to all peer respondents. Respondents are then able to reply, but are not obliged to.
The survey itself is a timed event, and responses received after the timeout are discarded.

``` r
sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")

# sur sets a survey timeout, applying to this and subsequent surveys
sur |> survey_time(value = 500)

# sur sends a message and then requests 2 async receives
sur |> send("service check")
#> [1] 0
aio1 <- sur |> recv_aio()
aio2 <- sur |> recv_aio()

# res1 receives the message and replies using an aio send function
res1 |> recv()
#> [1] "service check"
res1 |> send_aio("res1")

# res2 receives the message but fails to reply
res2 |> recv()
#> [1] "service check"

# checking the aio - only the first will have resolved
aio1$data
#> [1] "res1"
aio2$data
#> 'unresolved' logi NA

# after the survey expires, the second resolves into a timeout error
msleep(500)
aio2$data
#> 'errorValue' int 5 | Timed out

close(sur)
close(res1)
close(res2)
```

Above, `msleep()` is an uninterruptible sleep function (utilising the NNG library), taking a time in milliseconds.

It can be seen that the final value resolves into a timeout, which is an integer 5 classed as 'errorValue'. All integer error codes are classed as 'errorValue' to be easily distinguishable from integer message values.

[« Back to ToC](#table-of-contents)

### ncurl: Async HTTP Client

`ncurl()` is a minimalist http(s) client.

`ncurl_aio()` is the async edition, performing requests asynchronously, returning immediately with an 'ncurlAio'.

For normal use, it takes just the URL. It can follow redirects.
``` r
ncurl("https://postman-echo.com/get")
#> $status
#> [1] 200
#>
#> $headers
#> NULL
#>
#> $data
#> [1] "{\n \"args\": {},\n \"headers\": {\n \"host\": \"postman-echo.com\",\n \"x-request-start\": \"t=1724104783.065\",\n \"connection\": \"close\",\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"x-amzn-trace-id\": \"Root=1-66c3c04f-35f31c774acc1b8d6e4f150f\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}"
```

For advanced use, additional HTTP methods such as POST or PUT are supported.

``` r
res <- ncurl_aio("https://postman-echo.com/post",
                 method = "POST",
                 headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"),
                 data = '{"key": "value"}',
                 response = "date")
res
#> < ncurlAio | $status $headers $data >

call_aio(res)$headers
#> $date
#> [1] "Mon, 19 Aug 2024 21:59:44 GMT"

res$data
#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"host\": \"postman-echo.com\",\n \"x-request-start\": \"t=1724104784.075\",\n \"connection\": \"close\",\n \"content-length\": \"16\",\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"x-amzn-trace-id\": \"Root=1-66c3c04f-137ac65c00ca142929b0088a\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}"
```

In this respect, it may be used as a performant and lightweight method for making REST API requests.

##### ncurl Promises

`ncurl_aio()` may also be used anywhere that accepts a ‘promise’ from the promises package, including with Shiny ExtendedTask.

If a status code of 200 (OK) is returned then the promise is resolved with the response body, otherwise it is rejected with a translation of the status code or ‘errorValue’ as the case may be.
``` r
library(promises)

p <- ncurl_aio("https://postman-echo.com/get") %...>% cat
is.promise(p)
#> [1] TRUE
```

##### ncurl Session

`ncurl_session()` creates a re-usable open connection and presents a much faster and more efficient solution for repeated polling of an API endpoint. `transact()` is then used to request data multiple times as required. This method allows a polling frequency that exceeds a server's new connection limits, where this is permitted.

By specifying `convert = FALSE`, the received binary data is made available as a raw vector. This may be fed into 'json' parsers which can operate directly on such data etc.

``` r
sess <- ncurl_session("https://postman-echo.com/get",
                      convert = FALSE,
                      headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"),
                      response = c("Date", "Content-Type"))
sess
#> < ncurlSession > - transact() to return data

transact(sess)
#> $status
#> [1] 200
#>
#> $headers
#> $headers$Date
#> [1] "Mon, 19 Aug 2024 21:59:44 GMT"
#>
#> $headers$`Content-Type`
#> [1] "application/json; charset=utf-8"
#>
#>
#> $data
#>   [1] 7b 0a 20 20 22 61 72 67 73 22 3a 20 7b 7d 2c 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 68 6f
#>  [39] 73 74 22 3a 20 22 70 6f 73 74 6d 61 6e 2d 65 63 68 6f 2e 63 6f 6d 22 2c 0a 20 20 20 20 22 78 2d 72 65 71 75 65 73
#>  [77] 74 2d 73 74 61 72 74 22 3a 20 22 74 3d 31 37 32 34 31 30 34 37 38 34 2e 37 34 35 22 2c 0a 20 20 20 20 22 63 6f 6e
#> [115] 6e 65 63 74 69 6f 6e 22 3a 20 22 63 6c 6f 73 65 22 2c 0a 20 20 20 20 22 78 2d 66 6f 72 77 61 72 64 65 64 2d 70 72
#> [153] 6f 74 6f 22 3a 20 22 68 74 74 70 73 22 2c 0a 20 20 20 20 22 78 2d 66 6f 72 77 61 72 64 65 64 2d 70 6f 72 74 22 3a
#> [191] 20 22 34 34 33 22 2c 0a 20 20 20 20 22 78 2d 61 6d 7a 6e 2d 74 72 61 63 65 2d 69 64 22 3a 20 22 52 6f 6f 74 3d 31
#> [229] 2d 36 36 63 33 63 30 35 30 2d 34 61 36 33 31 30 63 36 34 61 37 35 63 31 39 37 35 65 61 37 62 31 36 39 22 2c 0a 20
#> [267] 20 20 20 22 63 6f 6e 74 65 6e 74 2d 74 79 70 65 22 3a 20 22 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 22 2c
#> [305] 0a 20 20 20 20 22 61 75 74 68 6f 72 69 7a 61 74 69 6f 6e 22 3a 20 22 42 65 61 72 65 72 20 41 50 49 4b 45 59 22 0a
#> [343] 20 20 7d 2c 0a 20 20 22 75 72 6c 22 3a 20 22 68 74 74 70 73 3a 2f 2f 70 6f 73 74 6d 61 6e 2d 65 63 68 6f 2e 63 6f
#> [381] 6d 2f 67 65 74 22 0a 7d
```

[« Back to ToC](#table-of-contents)

### stream: Websocket Client

`stream()` exposes NNG's low-level byte stream interface for communicating with raw sockets. This may be used for connecting to arbitrary non-NNG endpoints.

The stream interface can be used to communicate with (secure) websocket servers. The argument `textframes = TRUE` can be specified where the websocket server uses text rather than binary frames.

``` r
# connecting to an echo service
s <- stream(dial = "wss://echo.websocket.events/", textframes = TRUE)
s
#> < nanoStream >
#>  - mode: dialer text frames
#>  - state: opened
#>  - url: wss://echo.websocket.events/
```

`send()` and `recv()`, as well as their asynchronous counterparts `send_aio()` and `recv_aio()` can be used on Streams in the same way as Sockets. This affords a great deal of flexibility in ingesting and processing streaming data.

``` r
s |> recv()
#> [1] "echo.websocket.events sponsored by Lob.com"

s |> send("initial message")
#> [1] 0

s |> recv()
#> [1] "initial message"

s |> recv_aio() -> r

s |> send("async message")
#> [1] 0

s |> send("final message")
#> [1] 0

s |> recv()
#> [1] "final message"

r$data
#> [1] "async message"

close(s)
```

[« Back to ToC](#table-of-contents)

### Options, Serialization and Statistics

Use `opt()` and `'opt<-'()` to get and set options on a Socket, Context, Stream, Listener or Dialer.

See the function documentation page for a list of common options.

Once a dialer or listener has started, it is not generally possible to change its configuration. In this case, the dialer or listener should be created specifying 'autostart = FALSE'.
``` r
s <- socket(listen = "inproc://options", autostart = FALSE)

# no maximum message size
opt(s$listener[[1]], "recv-size-max")
#> [1] 0

# enforce maximum message size to protect against denial-of-service type attacks
opt(s$listener[[1]], "recv-size-max") <- 8192L

opt(s$listener[[1]], "recv-size-max")
#> [1] 8192

start(s$listener[[1]])
```

There is the special write-only option 'serial' for Sockets, which sets a serialization configuration returned by `serial_config()`. This registers custom functions to handle serialization and unserialization of reference objects, plugging into the 'refhook' system of native R serialization. This allows the transparent send and receive of such objects using mode 'serial' without the need for a separate 'marshalling' step. Once set, configurations apply to the Socket and all Contexts created from the Socket.

``` r
serial <- serial_config("obj_class", function(x) serialize(x, NULL), unserialize)
opt(s, "serial") <- serial

close(s)
```

Similarly `stat()` has been implemented as the interface to NNG's statistics framework.

This can be used on a Socket, Listener or Dialer to query useful statistics such as the total number of connection attempts, the current number of connections etc.

See the function documentation page for available statistics.

``` r
s <- socket(listen = "inproc://stat")

# no active connections (pipes)
stat(s, "pipes")
#> [1] 0

s1 <- socket(dial = "inproc://stat")

# one now that the dialer has connected
stat(s, "pipes")
#> [1] 1

close(s1)
close(s)
```

[« Back to ToC](#table-of-contents)