DslF[F, Unit]: composable, testable, replayable.
Scala 3 · functional runtime primitives · distributed systems
Build distributed programs as ordinary, composable values.
A small DSL surface and inspectable programs — the substrate that AI agents and humans can both reason about. Isolated processes, typed mailboxes, predictable scheduling, pluggable transports, and swappable distributed-systems algorithms. Describe the system first, then interpret it with the runtime backend you choose.
ParIO, or supply your own backend.
Introduction
Why Parapet
Actor toolkits make distributed work easy to start, but the resulting programs are operational artifacts: they run, but they cannot be passed around, inspected, or composed like ordinary values. Parapet keeps the actor model and adds the missing piece — handlers return data describing what should happen, not callbacks performing it.
That single decision unlocks the rest: you can lift a handler into a test harness, interpret it under any effect type, intercept and trace every event, run the same program on different runtimes, or evolve from a single JVM to a network without rewriting your domain code.
Process isolation
Each process owns a mailbox, processes one message at a time, and never shares mutable state.
Effect polymorphism
Programs are generic in F[_]. Pick Cats Effect for production, ParIO for learning.
Composable DSL
Send, fork, race, suspend, offload — all return values that can be sequenced and combined.
First-class failure
Errors are Failure events. Supervision and recovery use the same model as normal handling.
Why now · 2026
AI writes code. Concurrent code is the part it gets wrong.
Modern LLMs are confident, fluent, and frequently wrong about ordering, races, and partial failure — the exact properties that distributed systems are made of. The response is not to write more imperative scaffolding for the agent to copy; it is to give the agent a substrate whose legal moves are few, named, and inspectable, so the things it generates are things you can review, replay, and prove.
Parapet is built around that constraint. The DSL has roughly a dozen operators. A handler returns a value, not a side effect. Two engineers — or one engineer and one agent — can read the same program and agree on what it does. See how AI-assisted teams use this →
Paths · For AI-assisted teams
Coding with AI agents
The bet is simple: an agent that can only invoke a small set of named operators generates fewer wrong programs than one let loose on raw threads, futures, and ad-hoc message buses. Parapet is that small set.
Constrained surface
About a dozen DSL operators (send, fork, race, offload, …). No bare threads, no shared mutable state. Less room to be creative in the wrong direction.
Reviewable output
A handler is a value. The PR diff shows the program, not the side effects. Easier to read than imperative IO.race(…).flatMap(…) blobs.
Failure is in scope
The interpreter can drop, delay, duplicate, or partition messages. Generated systems become testable without containers — exactly the loop an agent can run on its own.
On the roadmap: a parapet-agent-pack containing a system prompt, .cursorrules / AGENTS.md, and a curated set of reference processes (counter, gossip ring, sharded KV), so that any AI assistant working in a Parapet repo writes correct code from the first prompt.
Start with First process → and DSL cheatsheet →. Then jump to Errors are events → to see how recovery composes.
Paths · For Scala FP engineers
From Akka Typed & cats-effect
If you've reached for akka-typed for typed actors or for
cats-effect alone for concurrency, Parapet sits between them: actor
semantics with cats-effect's F[_] polymorphism and a DSL you can interpret
differently for tests, tracing, or replay.
| Concern | Akka Typed | cats-effect alone | Parapet |
|---|---|---|---|
| Process / actor isolation | Yes | You build it | Yes (first-class) |
| Effect-polymorphic in F[_] | No | Yes | Yes |
| Programs as inspectable values | No | Partially | Yes (Free over FlowOp) |
| Pluggable interpreter (tracing, replay) | No | No | Yes |
| Typed mailboxes | Yes | — | Typed process protocols and refs |
| Built-in distributed primitives | Yes (heavy) | — | Yes (small, composable) |
Start with Composable DSL →, then
The free encoding →, and finally
Add a custom FlowOp → to see how the substrate
extends without forking the core.
Paths · For data & infra teams
From Flink, Akka, Kafka Connect
The big distributed substrates share a shape: processes / operators, supervision, transports, and a fixed bundle of guarantees baked in. Parapet keeps the shape but unbundles the guarantees — broadcast, replication, failure detection, and consensus are interchangeable connectors, not load-bearing parts of the runtime.
Swap the algorithm, not the app
Move from gossip to total-order broadcast, or from a heartbeat detector to phi-accrual, without touching domain code. Same operator surface, different guarantee.
Build up, not down
Express your system at the level of processes that exchange events. The runtime, transport, and replication strategy are decisions you can defer or revisit.
Smaller surface
No cluster controller, no operator graph runtime, no required deployment topology. The pieces are libraries; you assemble what you need.
Start with Transport traits → and Module map →, then read the Roadmap → to see where reliable channels, sharding, and dataflow fit.
Introduction
Install
Add parapet-core to model processes. Add a runtime integration —
parapet-cats-effect for production, parapet-pario for the
reference runtime — when you want to run an application.
libraryDependencies ++= Seq(
  "io.parapet" %% "parapet-core" % version,
  "io.parapet" %% "parapet-cats-effect" % version
)
For snapshots, enable Sonatype Central snapshots:
resolvers += "central-snapshots" at
  "https://central.sonatype.com/repository/maven-snapshots/"
Introduction
First process
A process declares its API as events and reacts to them in handle. The
handler is generic in F[_], so the same process can be interpreted by any
runtime that provides the required capabilities.
ProcessRef is the address. Keep wiring late-bound by passing refs rather
than process instances.
import io.parapet.{Event, ProcessRef}
import io.parapet.core.Process
import io.parapet.core.Events.Start
class Printer[F[_]] extends Process[F, Printer.Print, Nothing]:
  import Printer.*
  import dsl.*

  override def handle: Receive =
    case Print(data) => eval(println(data))

object Printer:
  final case class Print(data: Any) extends Event

class Greeter[F[_]](printer: ProcessRef[Printer.Print]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start => Printer.Print("hello world") ~> printer
Introduction
Run an app
Specialize at the application boundary. For production Scala FP applications, mix in
CatsEffectParApp.
processes returns an effect that yields the seed population — anything else
you need (channels, child processes) can be registered from inside Start.
import cats.effect.IO
import io.parapet.cats.CatsEffectParApp
import io.parapet.core.Process
object HelloApp extends CatsEffectParApp:
  def processes(args: Array[String]): IO[Seq[Process[IO, ?, ?]]] =
    IO.delay {
      val printer = new Printer[IO]
      val greeter = new Greeter[IO](printer.ref)
      Seq(printer, greeter)
    }
Learn
The F[_] algebra
F[_] is a type constructor: it takes a type and produces another
one. F[Int] means "a description of a computation that, when interpreted,
produces an Int". The shape of that description is left abstract on purpose
— different effect runtimes encode it differently:
- cats.effect.IO[A]: Cats Effect's asynchronous, cancellable runtime.
- io.parapet.effect.ParIO[A]: Parapet's reference runtime.
- Your own monad, as long as you can provide the required type-class instances.
Parapet code is written against F[_] + a small set of capability traits.
That keeps domain logic decoupled from any one runtime, and turns the choice of effect
system into an end-of-the-world decision instead of one that bleeds into every module.
Learn
Effect & effect systems
An effect is anything that interacts with the outside world or changes observable state: printing, reading a file, sleeping, sending a network packet, mutating a counter. A pure function only computes a value from its inputs.
An effect system is the part of a runtime that lets you build, combine,
and execute descriptions of effects without performing them eagerly. In Parapet this is
the Effect[F] type-class plus its companions:
trait Effect[F[_]]:
  def pure[A](value: A): F[A]
  def delay[A](thunk: => A): F[A]
  def suspend[A](thunk: => F[A]): F[A]
  def blocking[A](thunk: => A): F[A]
  def raiseError[A](error: Throwable): F[A]

  extension [A](fa: F[A])
    def flatMap[B](f: A => F[B]): F[B]
    def map[B](f: A => B): F[B]
    def handleErrorWith(f: Throwable => F[A]): F[A]
A process never calls Effect directly. It produces a DslF[F, Unit]
program — the runtime's interpreter is the one that lifts each FlowOp into
an F-effect.
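As a rough illustration of what satisfying such a type class involves, the sketch below re-declares a trimmed copy of the trait (only pure, delay, flatMap, and map) and supplies an instance for a hypothetical Thunk type, which is just a zero-argument function. Thunk, the given instance, and sumOfTwo are illustration-only names and are not part of Parapet.

```scala
// Trimmed, standalone copy of the type class; not the library's definition.
trait Effect[F[_]]:
  def pure[A](value: A): F[A]
  def delay[A](thunk: => A): F[A]

  extension [A](fa: F[A])
    def flatMap[B](f: A => F[B]): F[B]
    def map[B](f: A => B): F[B]

// Hypothetical effect type: a computation is a function that runs when called.
type Thunk[A] = () => A

given Effect[Thunk] with
  def pure[A](value: A): Thunk[A] = () => value
  def delay[A](thunk: => A): Thunk[A] = () => thunk

  extension [A](fa: Thunk[A])
    def flatMap[B](f: A => Thunk[B]): Thunk[B] = () => f(fa())()
    def map[B](f: A => B): Thunk[B] = () => f(fa())

// Written once against the abstract F; building the value runs nothing.
def sumOfTwo[F[_]](readA: => Int, readB: => Int)(using E: Effect[F]): F[Int] =
  for
    a <- E.delay(readA)
    b <- E.delay(readB)
  yield a + b
```

Calling sumOfTwo[Thunk](...) only builds a function; the two reads happen when that function is invoked, which is the "describe first, interpret later" split the section describes.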
Learn
Composable DSL
Inside a handler, you build a small program using DSL operators such as
send, fork, delay, and race. These
operators do not perform anything on their own — they return values describing what
should happen, which the runtime later executes.
// A program: "send Ping to peer, wait 100ms, send Pong to peer"
val program: DslF[F, Unit] =
  Ping ~> peer ++ delay(100.millis) ++ Pong ~> peer
Because the program is a value, you can store it, log it, test it under a fake
interpreter, return it from a function that takes parameters, or compose it with another
program with ++. This is the headline benefit of an effect system: behavior
becomes data.
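To make "behavior becomes data" concrete, here is a minimal standalone model. It does not use Parapet's DslF; Step, its ++ method, and record are hypothetical names chosen for the sketch. The point is only that sequencing builds a larger description, and that a fake interpreter can turn the description into assertions.

```scala
import scala.concurrent.duration.*

// Toy stand-in for a DSL program: a description, not an action.
enum Step:
  case Send(event: String, to: String)
  case Delay(d: FiniteDuration)
  case Both(first: Step, second: Step)

  // Sequencing just builds a bigger description.
  def ++(next: Step): Step = Step.Both(this, next)

// A fake interpreter for tests: records what would happen instead of doing it.
def record(program: Step): List[String] = program match
  case Step.Send(event, to) => List(s"send $event to $to")
  case Step.Delay(d)        => List(s"sleep ${d.toMillis}ms")
  case Step.Both(a, b)      => record(a) ++ record(b)

val pingPong: Step =
  Step.Send("Ping", "peer") ++ Step.Delay(100.millis) ++ Step.Send("Pong", "peer")
```

Under this model, record(pingPong) yields the three steps in order without sending or sleeping; a production interpreter would walk the same value and perform them.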
Learn
The free encoding
Under the hood, a DslF[F, A] is a Free[FlowOp[F, *], A] — an
immutable tree of FlowOp nodes leading to a final result. The
DslInterpreter walks that tree and turns each node into an F-
effect that the chosen runtime can execute.
You do not need to know category theory to use Parapet. The encoding matters because it guarantees three properties:
- No accidental side effects at construction — building a program does not run it.
- Substitution works — equal programs produce equal effects.
- Inspection is possible — interpreters can rewrite, trace, or reorder.
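The properties above can be seen in a deliberately small free-monad sketch. It is not Parapet's Free or FlowOp: the Op algebra, the Free cases, and the run/step interpreter are assumptions made for illustration, and the interpreter is not stack-safe.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical two-operation algebra; each case fixes its own result type.
enum Op[A]:
  case Send(event: String, to: String) extends Op[Unit]
  case Ask(question: String) extends Op[String]

// Minimal free-monad shape: a tree of operations ending in a result.
enum Free[A]:
  case Pure(value: A)
  case Suspend(op: Op[A])
  case FlatMap[X, A](source: Free[X], next: X => Free[A]) extends Free[A]

  def flatMap[B](f: A => Free[B]): Free[B] = Free.FlatMap(this, f)
  def map[B](f: A => B): Free[B] = flatMap(a => Free.Pure(f(a)))

def send(event: String, to: String): Free[Unit] = Free.Suspend(Op.Send(event, to))
def ask(question: String): Free[String] = Free.Suspend(Op.Ask(question))

// The interpreter is the only place where operations acquire meaning.
// This one answers Ask from a map and traces every node it visits.
def step[A](op: Op[A], answers: Map[String, String], trace: ListBuffer[String]): A =
  op match
    case Op.Send(event, to) =>
      trace += s"send $event to $to"
      ()
    case Op.Ask(question) =>
      trace += s"ask $question"
      answers.getOrElse(question, "unknown")

def run[A](program: Free[A], answers: Map[String, String], trace: ListBuffer[String]): A =
  program match
    case Free.Pure(value)           => value
    case Free.Suspend(op)           => step(op, answers, trace)
    case Free.FlatMap(source, next) => run(next(run(source, answers, trace)), answers, trace)

// Building this value performs no Ask and no Send.
val greet: Free[String] =
  for
    name <- ask("name")
    _    <- send(s"hello $name", "printer")
  yield name
```

Running greet with a different answers map or a different step function changes what the program does without touching its definition, which is the inspection property in miniature.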
Learn
Add a custom FlowOp
The DSL is open: you can mix the Parapet algebra with your own and route the extra cases through a tailor-made interpreter. The pattern has three steps: declare the new op, expose a smart constructor, then teach the interpreter how to run it.
// 1. Declare the op as a data type — pure, no behavior yet.
import io.parapet.free.{Free, Inject}
sealed trait MetricOp[F[_], A]

final case class Counter[F[_]](name: String, delta: Long)
    extends MetricOp[F, Unit]

final case class Timer[F[_], A](name: String, body: Free[MetricOp[F, _], A])
    extends MetricOp[F, A]

// 2. A smart constructor that lifts the op into your program type.
final class MetricOps[F[_], C[_]](using inject: Inject[[x] =>> MetricOp[F, x], C]):
  def increment(name: String, delta: Long = 1L): Free[C, Unit] =
    Free.inject[[x] =>> MetricOp[F, x], C, Unit](Counter[F](name, delta))

  def time[A](name: String)(body: Free[MetricOp[F, _], A]): Free[C, A] =
    Free.inject[[x] =>> MetricOp[F, x], C, A](Timer[F, A](name, body))
Programs that mix your algebra with the built-in one now type-check naturally. They are still pure values; the runtime decides what each node means.
Learn
Extend the interpreter
A custom interpreter is a FunctionK from your algebra to F.
Provide one that handles the new nodes; everything else delegates to the default Parapet
interpreter. Override ParApp.interpreter (or your runtime equivalent) to
plug it in.
import io.parapet.core.DslInterpreter.Interpreter
import io.parapet.free.{FunctionK, ~>}
final class MetricInterpreter[F[_]](underlying: Interpreter[F])(using metrics: Metrics[F])
    extends Interpreter[F]:
  def interpret(sender: ProcessRef.Unknown, target: ProcessRef.Unknown, trace: ExecutionTrace) =
    new FunctionK[[x] =>> MetricOp[F, x], F]:
      def apply[A](op: MetricOp[F, A]): F[A] = op match
        case Counter(name, delta) => metrics.add(name, delta)
        case Timer(name, body) => metrics.time(name)(
          body.foldMap(interpret(sender, target, trace))
        )
  // ... delegate the two other interpret(...) overloads to `underlying`.
Custom interpreters are the right place for cross-cutting concerns that do not belong in domain handlers: structured logging, distributed tracing, metrics, audit, replay.
Core concepts
Process model
Isolated state
Each process owns its mailbox. The runtime serializes handler invocations per process.
Typed protocol
A process declares the domain events it accepts, and refs carry that protocol through wiring.
Supervision graph
register sets up parent / child relations. Stop cascades down.
Core concepts
Typed refs
Declare a process protocol with Process[F, In, Out], then pass around
ProcessRef[In] to keep wiring explicit.
final case class Save(key: String, value: String) extends Event

class Store[F[_]] extends Process[F, Save, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Save(key, value) => eval(println(s"saved $key=$value"))
    case Stop => eval(println("closing store"))

class Writer[F[_]](store: ProcessRef[Save]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start => Save("hello", "world") ~> store
Core concepts
Process lifecycle
Every process moves through the same four-stage lifecycle, driven by ordinary events the handler can observe and react to.
- Registration. The application returns a process from processes(args), or another process calls register(parent, child). The runtime allocates a mailbox and wires the supervision graph.
- Start. The runtime delivers Events.Start as the first event. Use it to send "ready" notifications, register children, open resources.
- Running. Events are delivered in mailbox order; handle is invoked sequentially. Failures inside the handler are converted into a Failure event for the sender.
- Stop. On graceful shutdown the runtime first stops every child, then delivers Events.Stop to the parent. Kill tears the process down without running handle.
class Server[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      eval(println("opening port 8080"))
    case Stop =>
      eval(println("draining and closing"))
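The Stop rule in the lifecycle (children first, then the parent) amounts to a post-order walk of the supervision graph. The sketch below models that ordering with a plain Map; the graph, the process names, and stopOrder are hypothetical, and the sibling order shown (registration order) is an assumption rather than a documented guarantee.

```scala
// Hypothetical supervision graph: parent name -> child names, in registration order.
val children: Map[String, List[String]] = Map(
  "app"  -> List("pool", "store"),
  "pool" -> List("worker-1", "worker-2")
)

// Post-order walk: every descendant appears before its parent.
def stopOrder(root: String, graph: Map[String, List[String]]): List[String] =
  graph.getOrElse(root, Nil).flatMap(child => stopOrder(child, graph)) :+ root
```

For this graph, stopOrder("app", children) lists both workers before pool, and pool and store before app.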
Core concepts
System events
System events live in io.parapet.core.Events. They appear in handlers just
like domain events, so there are no special hooks to learn.
| Event | When it is delivered |
|---|---|
| Start | Once, immediately after registration. Use for setup and to send "ready" pings. |
| Stop | Once, during graceful shutdown. Children receive Stop before their parent. |
| Kill | Forces termination without invoking handle. The mailbox is discarded. |
| Failure(envelope, error) | Routed back to the original sender when the receiver's handler raised. The sender can recover, retry, or escalate; unhandled failures become dead letters. |
| DeadLetter(envelope, error) | Wraps an envelope that could not be delivered (unknown receiver, terminated process, unhandled Failure). Routed to the configured DeadLetterProcess. |
Core concepts
Switching behavior
A process can replace its active receive function with switch(newReceive).
The swap is synchronous within the current program, so it composes naturally with
state-machine flows.
class Door[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  private def closed: Receive =
    case Open => eval(println("opening")) ++ switch(opened)
    case Knock => eval(println("knock knock"))

  private def opened: Receive =
    case Close => eval(println("closing")) ++ switch(closed)
    case Enter => eval(println("welcome"))

  override def handle: Receive = closed
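The same state machine can be modelled without the runtime, which may help when reasoning about which events each state accepts. This is a standalone sketch: DoorModel, its String events, and the "unhandled" fallback are assumptions for illustration and say nothing about how Parapet treats unmatched events.

```scala
// Standalone model of behavior switching; Receive here is a plain PartialFunction.
final class DoorModel:
  private type Receive = PartialFunction[String, String]

  private def closed: Receive = {
    case "Open" =>
      current = opened // plays the role of switch(opened)
      "opening"
    case "Knock" => "knock knock"
  }

  private def opened: Receive = {
    case "Close" =>
      current = closed // plays the role of switch(closed)
      "closing"
    case "Enter" => "welcome"
  }

  private var current: Receive = closed

  // Events the active behavior does not define are reported rather than thrown.
  def handle(event: String): String =
    current.applyOrElse(event, (e: String) => s"unhandled $e")
```

Sending "Enter" before "Open" falls through to the fallback, while the same event after "Open" is handled by the opened behavior.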
Core concepts
Process combinators
Processes themselves compose: a ++ b (alias a.and(b)) creates a
process that runs both handlers when both can handle the event; a.or(b)
tries a first and falls back to b. The resulting process
inherits the left-hand ref and name.
class Core[F[_]] extends Process[F, Ping.type, Pong.type]:
  override def handle: Receive =
    case Ping => reply(Pong)

val core = new Core[F]
val metrics = Process[F](_ => { case Ping => eval(counter.incrementAndGet()) })
val instrumented = core ++ metrics // same ref as `core`
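The two combinators can be approximated on bare partial functions. The sketch below is a model of the described semantics only (run both when both accept; otherwise fall back left to right); Handler, and, or, and the sample handlers are hypothetical and ignore refs and names entirely.

```scala
// Hypothetical handler shape: an event name in, a list of described actions out.
type Handler = PartialFunction[String, List[String]]

extension (a: Handler)
  // Runs both handlers, left first, only when both accept the event.
  def and(b: Handler): Handler = {
    case event if a.isDefinedAt(event) && b.isDefinedAt(event) => a(event) ++ b(event)
  }

  // Tries the left handler first and falls back to the right one.
  def or(b: Handler): Handler = a.orElse(b)

val coreHandler: Handler = {
  case "Ping" => List("reply Pong")
}

val metricsHandler: Handler = {
  case "Ping"  => List("count ping")
  case "Stats" => List("report")
}
```

With these definitions, coreHandler.and(metricsHandler) accepts only "Ping", whereas coreHandler.or(metricsHandler) also accepts "Stats" through the fallback.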
Core concepts
Channel
Channel turns the asynchronous mailbox model into a strict one-call-at-a-time
dialog. It is useful when a handler needs a direct response before continuing, without
inventing correlation ids by hand.
val client: ProcessRef[Event] = ProcessRef("client")
val ch = new Channel[F](client)

val flow =
  register(client, ch) ++
    ch.send(Request("ping"), server.ref).flatMap {
      case scala.util.Success(Response(data)) => eval(println(s"got $data"))
      case scala.util.Failure(err) => eval(println(s"failed: $err"))
    }
A channel rejects new requests while a request is in flight, which keeps the send-then-await pattern safe without locks.
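The in-flight rule can be pictured as a one-slot gate. The class below is a standalone model, not Parapet's Channel: OneAtATime, its send and complete methods, and the error string are all assumptions made for the sketch.

```scala
// Standalone model of the "one request in flight" rule.
final class OneAtATime[Req, Res]:
  private var pending: Option[Req] = None

  // Accepts a request only when nothing is outstanding.
  def send(request: Req): Either[String, Unit] =
    pending match
      case Some(_) => Left("request already in flight")
      case None =>
        pending = Some(request)
        Right(())

  // Completing the outstanding request frees the slot for the next one.
  def complete(response: Res): Option[(Req, Res)] =
    val done = pending.map(req => (req, response))
    pending = None
    done
```

Because a process handles one message at a time, a gate like this needs no lock: the second send is rejected until the first response has been paired with its request.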
Core concepts
Errors are events
When a handler raises, the runtime routes a Failure(envelope, error) event
back to the original sender. A sender that handles Failure can retry,
escalate, or convert the error into a domain event; if the sender does not handle it,
the failure is routed to the dead-letter sink.
class Client[F[_]](server: ProcessRef[Request]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      Request("hello") ~> server
    case Failure(env, err) =>
      eval(println(s"server $server failed: ${err.getMessage}; retrying")) ++
        env.event ~> env.receiver
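The routing rule can be simulated with a single queue. This is a synchronous toy model, not the Parapet scheduler: Envelope, Failure, Behavior, and deliverAll are local stand-ins defined for the sketch, and "dead letter" here simply means "no process accepted the event".

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

final case class Envelope(sender: String, event: Any, receiver: String)
final case class Failure(envelope: Envelope, error: Throwable)

// A behavior returns the envelopes it wants sent; it may also throw.
type Behavior = PartialFunction[Any, List[Envelope]]

// Delivers queued envelopes one at a time and returns the events that ended up as dead letters.
def deliverAll(processes: Map[String, Behavior], initial: Envelope): List[Any] =
  val queue = mutable.Queue(initial)
  val deadLetters = mutable.ListBuffer.empty[Any]
  while queue.nonEmpty do
    val env = queue.dequeue()
    processes.get(env.receiver) match
      case Some(handle) if handle.isDefinedAt(env.event) =>
        try queue ++= handle(env.event)
        catch
          case NonFatal(error) =>
            // The failure travels back to the sender as an ordinary event.
            queue += Envelope(env.receiver, Failure(env, error), env.sender)
      case _ =>
        deadLetters += env.event
  deadLetters.toList

val failingServer: Behavior = {
  case "hello" => throw new IllegalStateException("boom")
}

val seen = mutable.ListBuffer.empty[String]
val recoveringClient: Behavior = {
  case Failure(env, err) =>
    seen += s"${env.receiver} failed: ${err.getMessage}"
    Nil
}
```

When the client handles Failure, nothing reaches the dead-letter list; removing the client from the map leaves the Failure undeliverable, so it is returned as a dead letter instead.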
Core concepts
Dead-letter
A DeadLetter wraps an envelope the runtime could not deliver — the receiver
does not exist, has been terminated, or returned a Failure the sender did not
handle. Dead letters are routed to a single well-known process whose ref is
ProcessRef.DeadLetterRef.
The default implementation logs every dead letter with structured fields. Override
ParApp.deadLetter to plug in your own — typical replacements push to a
message bus, surface metrics, or persist the envelope for replay.
import io.parapet.core.processes.DeadLetterProcess
class AlertingDeadLetters[F[_]] extends DeadLetterProcess[F]:
  import dsl.*

  override val handle: Receive = {
    case DeadLetter(env, err) =>
      eval(metrics.deadLetters.increment()) ++
        alerts.publish(env, err)
  }

object MyApp extends CatsEffectParApp:
  override def deadLetter: IO[DeadLetterProcess[IO]] =
    IO.pure(new AlertingDeadLetters[IO])

  def processes(args: Array[String]): IO[Seq[Process[IO, ?, ?]]] = ...
Core concepts
Configuration
ParConfig tunes the runtime: default mailbox capacity, scheduler thread
count, dev-mode logging, tracing, and the optional in-memory event log used for
replay.
override def config =
  ParConfig.default
    .withProcessBufferSize(1024)
    .withWorkerCount(8)
    .enableTracing
    .enableEventLog
DSL reference
Cheatsheet
| Operator | Purpose |
|---|---|
| unit / pure(a) | Do nothing / lift a value. |
| flow { ... } | Defer construction (for recursion). |
| event ~> ref | Send an event. |
| forward(event, ref) | Send while preserving the original sender. |
| reply(event) | Reply to the sender of the current event. |
| eval { ... } | Lift a plain side-effecting computation. |
| suspend(fa) | Lift an existing F-effect. |
| delay(d) | Sleep for d without blocking a thread. |
| fork(flow) | Start a fiber; returns a join/cancel handle. |
| par(p1, p2, ...) | Fork each flow and return a list of fibers. |
| race(p1, p2) | Run both, return the winner, cancel the loser. |
| offload(flow) | Run in the background; gate next mailbox event. |
| register(parent, child) | Add a supervised child. |
| halt(ref) | Stop a process and its descendants. |
| handleError(body, recover) | DSL try / catch. |
| guarantee(body, finalizer) | DSL try / finally. |
| switch(receive) | Replace the active receive function. |
DSL reference
unit · pure · flow
unit is a program that does nothing — useful as a fold seed or to give a
handler clause an explicit "do nothing" body. pure(a) lifts a value into a
program. flow { ... } defers construction; use it for recursive flows so
the recursion happens at interpretation time instead of construction time.
// Empty branches.
{
  case Start => unit
  case Stop => unit
}

// Folding over a list.
processes
  .map(event ~> _)
  .foldLeft(unit)(_ ++ _)

// Safe recursion.
def times[F[_]](n: Int): DslF[F, Unit] = flow {
  if n == 0 then unit
  else eval(print(n)) ++ times(n - 1)
}
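Why deferring construction matters for recursion can be shown on a toy program type. The sketch below is not Parapet's flow: Prog, its cases, this local flow helper, and the stack-based run loop are assumptions for illustration. Each recursive call is wrapped in a Defer node, so building countdown(n) creates one node and the rest is expanded only as the interpreter reaches it.

```scala
// Toy program type with an explicit "build me later" node.
enum Prog:
  case Done
  case Emit(text: String)
  case Seq2(first: Prog, second: Prog)
  case Defer(build: () => Prog)

// Local stand-in for a deferring constructor: the body is not evaluated here.
def flow(body: => Prog): Prog = Prog.Defer(() => body)

def countdown(n: Int): Prog = flow {
  if n == 0 then Prog.Done
  else Prog.Seq2(Prog.Emit(n.toString), countdown(n - 1))
}

// Iterative interpreter: expands deferred nodes one at a time using an explicit stack.
def run(program: Prog): Vector[String] =
  val out = Vector.newBuilder[String]
  var stack = List(program)
  while stack.nonEmpty do
    val head = stack.head
    stack = stack.tail
    head match
      case Prog.Done         => ()
      case Prog.Emit(text)   => out += text
      case Prog.Seq2(a, b)   => stack = a :: b :: stack
      case Prog.Defer(build) => stack = build() :: stack
  out.result()
```

Without the flow wrapper, countdown(n) would build all n levels eagerly at construction time; with it, a large n costs nothing until the program is interpreted.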
DSL reference
send · ~>
Sends an event to one or more receivers. The current process is recorded as the sender.
~> is the symbolic alias for the single-receiver case. Events are
constructed lazily, so the runtime can elide construction when delivery is impossible.
The compiler checks that the receiver accepts the event type.
// Single receiver.
Ping ~> peer
// Batch sequence to one receiver.
Seq(Step1, Step2, Step3) ~> pipeline
// Fan-out, ordered enqueue across receivers.
send(Ping, processA, processB, processC)
DSL reference
forward
Forwards an event while preserving the sender of the message currently being handled. The standard building block for proxy processes.
class Server[F[_]] extends Process[F, Request, Response]:
  override def handle: Receive =
    case Request(body) => reply(Response(s"server/$body"))

val server = new Server[F]
val proxy = Process[F](_ => {
  case Request(body) =>
    forward(Request(s"proxy/$body"), server.ref)
})
// Replies from server go back to the original sender, not to proxy.
DSL reference
reply
reply sends an event back to the sender of the message currently being handled.
The event is checked against the process's declared output protocol.
class Echo[F[_]] extends Process[F, Request, Response]:
  override def handle: Receive =
    case Request(data) => reply(Response(s"echo: $data"))

// Batch reply preserves order.
class Acker[F[_]] extends Process[F, Batch, Ack]:
  override def handle: Receive =
    case Batch(items) => reply(items.map(Ack(_)))
DSL reference
eval · suspend
eval lifts a plain side-effecting expression into the DSL.
suspend lifts an existing F-effect. Reach for
suspend only when you already hold an F[A]; eval
covers everything else.
// Plain side effect.
eval(println("hello"))
// Lift an existing F-effect.
suspend(database.fetch(id))
// Capture a result.
for
  value <- eval(System.currentTimeMillis())
  _ <- eval(println(s"at $value"))
yield ()
DSL reference
delay
Suspends subsequent steps for duration without blocking a thread on
runtimes that support async sleep. Inside par, only the first step is
delayed; wrap each branch to delay every one individually.
// Delay once before the whole block.
delay(2.seconds) ++ Tick ~> clock
// Delay every parallel branch.
par(
  delay(1.second) ++ eval(print("a")),
  delay(2.seconds) ++ eval(print("b"))
)
DSL reference
fork · par
fork(flow) starts a fiber and returns its handle. par(p1, p2, ...)
forks each program in parallel and gives back a list of handles. Use
fiber.join to await a value and fiber.cancel to abort.
for
  fiber <- fork(eval(computeExpensiveResult()))
  // ... other work ...
  result <- fiber.join
  _ <- eval(println(s"got $result"))
yield ()

// Run independent steps concurrently.
par(eval(print("1")), eval(print("2")), eval(print("3")))
DSL reference
race
Runs two flows concurrently and returns whichever wins; the loser is cancelled. The canonical pattern for timeouts and fastest-responder fan-out.
// Timeout pattern.
race(
  server.send(Request, _ => unit),
  delay(5.seconds) ++ eval(println("server timed out"))
)

// Fastest replica wins.
race(
  ch.send(Request, replicaA),
  ch.send(Request, replicaB)
)
DSL reference
offload
Runs flow on a background fiber so it does not stall scheduler workers.
Subsequent steps in the same handler may proceed immediately, but the process will not
accept its next mailbox event until all offloaded work has completed.
class IndexingProcess[F[_]](store: Store[F]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Index(record) =>
      offload(suspend(store.write(record)))
    case Stop =>
      eval(println("done indexing"))
DSL reference
register · halt
register(parent, child) adds a child to the supervision graph; children
receive Stop before their parent. halt(ref) stops a process
and cascades to its descendants.
class Pool[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      register(ref,
        new Worker[F]("worker-1"),
        new Worker[F]("worker-2")
      )
    case Shutdown =>
      halt(ref)
DSL reference
handleError · guarantee
handleError is DSL try / catch; guarantee is DSL
try / finally. Use them to keep recovery and cleanup inside the program
value rather than the underlying effect.
handleError(
  eval(throw RuntimeException("boom")),
  err => eval(println(s"recovered from ${err.getMessage}"))
)

guarantee(
  eval(openFile()),
  eval(closeFile())
)
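The try / catch and try / finally analogy can be checked on a toy program type. In the sketch below, Task is a hypothetical stand-in (a zero-argument function), and these handleError and guarantee functions are local models with the same argument order as the DSL operators, not the library's implementations.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical program type: a deferred computation, run by calling it.
type Task[A] = () => A

// Model of DSL try / catch: recover is consulted only if body throws.
def handleError[A](body: Task[A], recover: Throwable => Task[A]): Task[A] =
  () =>
    try body()
    catch case NonFatal(error) => recover(error)()

// Model of DSL try / finally: finalizer runs whether or not body throws.
def guarantee[A](body: Task[A], finalizer: Task[Unit]): Task[A] =
  () =>
    try body()
    finally finalizer()

val log = ListBuffer.empty[String]

// Nesting the two keeps cleanup and recovery inside the program value.
val program: Task[Unit] =
  handleError[Unit](
    guarantee[Unit](
      () => { log += "open"; throw new RuntimeException("boom") },
      () => { log += "close"; () }
    ),
    error => () => { log += s"recovered from ${error.getMessage}"; () }
  )
```

Nothing is logged until program() is called; at that point the finalizer runs before the recovery branch, because the exception leaves guarantee before handleError sees it.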
Network
Transport traits
parapet-net separates low-level byte transport from process-level events.
Today it ships TCP request/reply over ZMQ and UDP-style datagrams over Aeron; the same
process adapters can also wrap your own transport.
| Trait | Shape |
|---|---|
| ClientTransport[F] | Request/reply client: request(Message) => F[Either[TransportError, Message]]. |
| ServerTransport[F] | Polls RoutedMessages and replies with reply(routingId, message). |
| DatagramTransport[F] | UDP-style fan-out: publish(Message) / receiveBatch(limit). |
The adapter processes in io.parapet.net.processes lift those traits into the
process world: ClientProcess, ServerProcess, and
DatagramProcess. Application code sends typed events; transport details stay
at the edge.
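For a sense of what wrapping your own transport might involve, the sketch below implements the request shape from the table against an in-memory function. Everything in it is a local stand-in: Message, TransportError, and ClientTransport are redefined here with guessed fields (only request's shape comes from the table), and Id and InMemoryClient are hypothetical names. The real traits in parapet-net may carry more members.

```scala
// Local stand-ins mirroring the shapes named in the table; not the parapet-net types.
final case class Message(parts: List[Array[Byte]])
object Message:
  def single(bytes: Array[Byte]): Message = Message(List(bytes))

final case class TransportError(reason: String)

trait ClientTransport[F[_]]:
  def request(message: Message): F[Either[TransportError, Message]]

// Identity effect: the result is available immediately, which is enough for a test double.
type Id[A] = A

// In-memory client: the "server" is a function that may or may not produce a reply.
final class InMemoryClient(server: Message => Option[Message]) extends ClientTransport[Id]:
  def request(message: Message): Either[TransportError, Message] =
    server(message).toRight(TransportError("no reply"))
```

A double like this lets process-level code be exercised against Either results without opening a socket; swapping in a network-backed implementation would not change the calling code's shape.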
Network
TCP echo server
ServerProcess wraps a ServerTransport. It receives routed
messages from the transport and forwards them to a sink process as
ServerProcess.Received. To answer, send ServerProcess.Reply
back to the server process.
import io.parapet.ProcessRef
import io.parapet.core.Process
import io.parapet.net.processes.ServerProcess
class Echo[F[_]](
    server: ProcessRef[ServerProcess.Reply]
) extends Process[F, ServerProcess.Received | ServerProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case ServerProcess.Received(routingId, message) =>
      ServerProcess.Reply(routingId, message) ~> server
    case ServerProcess.Failed(error) =>
      eval(println(s"server failed: $error"))

val echoRef = ProcessRef[ServerProcess.Received | ServerProcess.Failed]("echo")
val server = new ServerProcess[F](transport, sink = echoRef)
val echo = new Echo[F](server.ref):
  override val ref = echoRef
Network
TCP ping client
ClientProcess wraps a ClientTransport. Send
ClientProcess.Request to it and it replies to the sender with either
ClientProcess.Response or ClientProcess.Failed.
import io.parapet.ProcessRef
import io.parapet.core.Events.Start
import io.parapet.core.Process
import io.parapet.net.processes.ClientProcess
import io.parapet.net.transport.Message
class Caller[F[_]](
    client: ProcessRef[ClientProcess.Request]
) extends Process[F, ClientProcess.Response | ClientProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      ClientProcess.Request(Message.single("ping".getBytes("UTF-8"))) ~> client
    case ClientProcess.Response(message) =>
      eval(println(s"got ${new String(message.parts.head, "UTF-8")}"))
    case ClientProcess.Failed(error) =>
      eval(println(s"request failed: $error"))
Network
ZMQ wiring
ZMQ is the current TCP backend. Create the low-level transport at the edge, then register ordinary Parapet processes around it.
import cats.effect.IO
import io.parapet.cats.CatsEffectParApp
import io.parapet.ProcessRef
import io.parapet.net.{Endpoint, TransportProtocol}
import io.parapet.net.processes.{ClientProcess, ServerProcess}
import io.parapet.net.transport.zmq.*
object ServerApp extends CatsEffectParApp:
  def processes(args: Array[String]) =
    val echoRef = ProcessRef[ServerProcess.Received | ServerProcess.Failed]("echo")
    val config = ZmqTcpServerConfig(
      bind = Endpoint(TransportProtocol.Tcp, "*", 5555)
    )
    ZmqTcpServer.make[IO](config).allocated.map { case (transport, _) =>
      val server = new ServerProcess[IO](transport, sink = echoRef)
      val echo = new Echo[IO](server.ref):
        override val ref = echoRef
      Seq(echo, server)
    }

object ClientApp extends CatsEffectParApp:
  def processes(args: Array[String]) =
    val config = ZmqTcpClientConfig(
      remote = Endpoint(TransportProtocol.Tcp, "server", 5555)
    )
    ZmqTcpClient.make[IO](config).allocated.map { case (transport, _) =>
      val client = new ClientProcess[IO](transport)
      val caller = new Caller[IO](client.ref)
      Seq(client, caller)
    }
In production code, keep the returned Resource alive for the application
lifetime so sockets are closed cleanly on shutdown.
Network
UDP with Aeron
AeronUdpTransport implements DatagramTransport.
DatagramProcess publishes outbound datagrams and forwards received batches
to a sink as DatagramProcess.Received.
import io.parapet.{Event, ProcessRef}
import io.parapet.core.Events.Start
import io.parapet.core.Process
import io.parapet.net.processes.DatagramProcess
import io.parapet.net.transport.Message
import io.parapet.net.transport.aeron.*
class Publisher[F[_]](
    datagram: ProcessRef[DatagramProcess.Publish]
) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      DatagramProcess.Publish(
        Message.single("hello".getBytes("UTF-8"))
      ) ~> datagram

class UdpSink[F[_]]
    extends Process[F, DatagramProcess.Received | DatagramProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case DatagramProcess.Received(message) =>
      eval(println(s"udp: ${new String(message.parts.head, "UTF-8")}"))
    case DatagramProcess.Failed(error) =>
      eval(println(s"udp failed: $error"))

val config = AeronUdpConfig(
  channel = "aeron:udp?endpoint=localhost:40123",
  streamId = 1001,
  embeddedMediaDriver = true
)

for transport <- AeronUdpTransport.make[F](config).allocated.map(_._1)
yield
  val sink = new UdpSink[F]
  val datagram = new DatagramProcess[F](transport, sink.ref)
  val publisher = new Publisher[F](datagram.ref)
  Seq(datagram, sink, publisher)
Modules
Module map
Modules
Runtime choice
Use parapet-cats-effect when shipping. Use parapet-pario when
you want to read the runtime, write small examples, or validate a new backend.
Both implement the same capability contracts (Effect,
Parallel, SchedulerRuntime), so handlers are portable across
them with no code changes.
// Production
import io.parapet.cats.CatsEffectParApp
object App extends CatsEffectParApp:
  def processes(args: Array[String]) = IO.pure(Seq(...))

// Reference runtime
import io.parapet.ParIOApp
import io.parapet.effect.ParIO

object Sandbox extends ParIOApp:
  def processes(args: Array[String]): ParIO[Seq[Process[ParIO, ?, ?]]] =
    ParIO.pure(Seq(...))
Modules
Testing
Because handlers are values, you can step a program through a test interpreter and
assert on the emitted events instead of standing up a real network. The shared
conformance specs in parapet-testkit are the same suite each backend has to
satisfy — they are also a useful checklist when writing your own scenarios.
Project
Roadmap
Parapet evolves in layers. Each layer is independently useful; later layers compose what earlier ones expose. The guiding principle: small composable behaviours, not a monolithic framework.
The full plan, including the next milestone (CRDT toolkit + anti-entropy gossiper, then
sharding + cluster singleton) and anti-goals, lives in
roadmap/ROADMAP.md.
Project
Status
Parapet is actively evolving. The core primitives are in place; higher-level modules such as reliable channels, failure detectors, broadcast, gossip, CRDTs, sharding, and observability grow independently.