nanonext - Quick Reference

Core Concepts

nanonext provides bindings to NNG (Nanomsg Next Gen), a high-performance messaging library for building distributed systems.

This is a cheatsheet; refer to the package's other vignettes for detailed introductions to each topic.

Key Takeaways

1. Sockets and Connections

Create Sockets

library(nanonext)

# Functional interface
s <- socket("pair")
listen(s, "tcp://127.0.0.1:5555")
dial(s, "tcp://127.0.0.1:5555")

# Object-oriented interface
n <- nano("pair", listen = "tcp://127.0.0.1:5555")
n$dial("tcp://127.0.0.1:5556")

# Close when done
close(s)
n$close()

Protocols

Protocol Description Socket Types
Pair 1-to-1 bidirectional "pair"
Poly Polyamorous pair "poly"
Pipeline One-way data flow "push", "pull"
Req/Rep RPC pattern "req", "rep"
Pub/Sub Broadcast/subscribe "pub", "sub"
Survey Query all peers "surveyor", "respondent"
Bus Many-to-many mesh "bus"

Transports

URL Scheme Description
inproc://name In-process (fastest, same process)
ipc:///path Inter-process (Unix socket / named pipe)
tcp://host:port TCP/IP network
ws://host:port/path WebSocket
wss://host:port/path WebSocket over TLS
tls+tcp://host:port TLS encrypted TCP

2. Send and Receive

Synchronous

# Send R object (serialized)
send(s, data.frame(a = 1, b = 2))

# Receive R object
recv(s)

# Send raw bytes (for cross-language exchange)
send(s, c(1.1, 2.2, 3.3), mode = "raw")

# Receive as specific type
recv(s, mode = "double")
recv(s, mode = "character")
recv(s, mode = "raw")

Receive Modes

Mode Description
"serial" / 1 R serialization (default)
"character" / 2 Coerce to character
"complex" / 3 Coerce to complex
"double" / 4 Coerce to double
"integer" / 5 Coerce to integer
"logical" / 6 Coerce to logical
"numeric" / 7 Coerce to numeric
"raw" / 8 Raw bytes
"string" / 9 Fast option for length-1 character

3. Async I/O

Basic Async

# Async send - returns immediately
res <- send_aio(s, data)
res$result          # 0 = success, error code otherwise

# Async receive - returns immediately
msg <- recv_aio(s)
msg$data            # Value when resolved, 'unresolved' NA otherwise

# Check if resolved
unresolved(msg)     # TRUE while pending

# Wait for resolution
call_aio(msg)       # Blocks, returns Aio object
collect_aio(msg)    # Blocks, returns value directly
msg[]               # Blocks (user-interruptible), returns value

Non-blocking Patterns

# Poll while doing other work
while (unresolved(msg)) {
  # do other tasks
}
result <- msg$data

# Multiple async operations
msg1 <- recv_aio(s1)
msg2 <- recv_aio(s2)
# Both run concurrently

4. Condition Variables

Basics

# Create condition variable
cv <- cv()

# Check/signal
cv_value(cv)        # Get counter value
cv_signal(cv)       # Increment counter
cv_reset(cv)        # Reset to zero

# Wait (blocks until counter > 0, then decrements)
wait(cv)

# Wait with timeout (ms), returns FALSE on timeout
until(cv, 1000)

Pipe Notifications

# Signal on connection/disconnection
pipe_notify(socket, cv = cv, add = TRUE, remove = TRUE)

# Distinguish message vs disconnect with flag
pipe_notify(socket, cv = cv, remove = TRUE, flag = TRUE)
r <- recv_aio(socket, cv = cv)
wait(cv) || stop("disconnected")  # FALSE = pipe event

Async with CV

cv <- cv()
msg <- recv_aio(s, cv = cv)
wait(cv)            # Wake on receive completion
msg$data

5. Request/Reply (RPC)

Server

rep <- socket("rep", listen = "tcp://127.0.0.1:5555")
ctx <- context(rep)

# reply() blocks, waiting for request
reply(ctx, execute = my_function, send_mode = "raw")

close(rep)

Client

req <- socket("req", dial = "tcp://127.0.0.1:5555")
ctx <- context(req)

# request() returns immediately
aio <- request(ctx, data = args, recv_mode = "double")

# Do other work while server processes...

# Get result when needed
result <- aio[]

close(req)

6. Pub/Sub

pub <- socket("pub", listen = "inproc://pubsub")
sub <- socket("sub", dial = "inproc://pubsub")

# Subscribe to topic (prefix matching)
subscribe(sub, topic = "news")
subscribe(sub, topic = NULL)      # All topics

# Unsubscribe
unsubscribe(sub, topic = "news")

# Publish (topic is message prefix)
send(pub, c("news", "headline"), mode = "raw")

# Receive (includes topic)
recv(sub, mode = "character")

close(pub)
close(sub)

7. Surveyor/Respondent

sur <- socket("surveyor", listen = "inproc://survey")
res1 <- socket("respondent", dial = "inproc://survey")
res2 <- socket("respondent", dial = "inproc://survey")

# Set survey timeout (ms)
survey_time(sur, 500)

# Broadcast survey
send(sur, "ping")

# Collect responses (async)
aio1 <- recv_aio(sur)
aio2 <- recv_aio(sur)

# Respondents reply
recv(res1)
send(res1, "pong1")

# Late/missing responses time out (errorValue 5)
msleep(500)
aio2$data           # errorValue if no response

close(sur)
close(res1)
close(res2)

8. TLS Secure Connections

Self-signed Certificates

# Generate certificate (cn must match URL host exactly)
cert <- write_cert(cn = "127.0.0.1")

# Create TLS configs
server_tls <- tls_config(server = cert$server)
client_tls <- tls_config(client = cert$client)

# Use with tls+tcp:// or wss://
s1 <- socket(listen = "tls+tcp://127.0.0.1:5555", tls = server_tls)
s2 <- socket(dial = "tls+tcp://127.0.0.1:5555", tls = client_tls)

CA Certificates

# Client with CA cert file
client_tls <- tls_config(client = "/path/to/ca-cert.pem")

# Server with cert + key
server_tls <- tls_config(server = c("/path/to/cert.pem", "/path/to/key.pem"))

9. Options and Statistics

Get/Set Options

# Delayed start for configuration
s <- socket(listen = "tcp://127.0.0.1:5555", autostart = FALSE)

# Get option
opt(s$listener[[1]], "recv-size-max")

# Set option
opt(s$listener[[1]], "recv-size-max") <- 8192L

# Start after configuration
start(s$listener[[1]])

Common Options

Option Description
"recv-size-max" Max message size (0 = unlimited)
"send-timeout" Send timeout (ms)
"recv-timeout" Receive timeout (ms)
"reconnect-time-min" Min reconnect interval (ms)
"reconnect-time-max" Max reconnect interval (ms)
"req:resend-time" Request retry interval (ms)
"sub:prefnew" Prefer newer messages

Custom Serialization

# Register custom serializer for a class
serial <- serial_config(
  "class_name",
  function(x) serialize(x, NULL),  # serialize
  unserialize                      # unserialize
)
opt(socket, "serial") <- serial

Statistics

stat(socket, "pipes")      # Active connections
stat(listener, "accept")   # Connection attempts
stat(dialer, "reject")     # Rejected connections

10. Contexts

Contexts enable concurrent operations on a single socket (for req/rep, surveyor/respondent).

s <- socket("req", dial = "tcp://127.0.0.1:5555")

# Create independent contexts
ctx1 <- context(s)
ctx2 <- context(s)

# Concurrent requests
aio1 <- request(ctx1, data1)
aio2 <- request(ctx2, data2)

# Close contexts (or they close with socket)
close(ctx1)
close(ctx2)
close(s)

11. Cross-language Exchange

R to Python (NumPy)

# R: send raw doubles
n <- nano("pair", dial = "ipc:///tmp/nanonext")
n$send(c(1.1, 2.2, 3.3), mode = "raw")
result <- n$recv(mode = "double")

# Python: receive as NumPy array
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext")
array = np.frombuffer(socket.recv())
socket.send(array.tobytes())

12. Error Handling

# Errors return as 'errorValue' class
result <- recv(s, block = FALSE)

# Check for errors
is_error_value(result)

# Error codes
# 5  = Timed out
# 6  = Connection refused
# 8  = Try again (non-blocking, no message)

# Get error message
nng_error(5)        # "Timed out"

13. Utilities

# Sleep (uninterruptible, ms)
msleep(100)

# Random bytes
random(8)                          # 8 random bytes as hex string
random(8, convert = FALSE)         # As raw vector

# Parse URL
parse_url("tcp://127.0.0.1:5555")