Scala 3 · functional runtime primitives · distributed systems

Build distributed programs as ordinary, composable values.

A small DSL surface and inspectable programs — the substrate that AI agents and humans can both reason about. Isolated processes, typed mailboxes, predictable scheduling, pluggable transports, and swappable distributed-systems algorithms. Describe the system first, then interpret it with the runtime backend you choose.

Programs are values

Handlers produce DslF[F, Unit]: composable, testable, replayable.

Pure actor semantics

One process, one mailbox, sequential handling, no shared mutable state.

Runtime agnostic

Use Cats Effect today, study ParIO, or supply your own backend.

Introduction

Why Parapet

Actor toolkits make distributed work easy to start, but the resulting programs are operational artifacts: they run, but they cannot be passed around, inspected, or composed like ordinary values. Parapet keeps the actor model and adds the missing piece — handlers return data describing what should happen, not callbacks performing it.

That single decision unlocks the rest: you can lift a handler into a test harness, interpret it under any effect type, intercept and trace every event, run the same program on different runtimes, or evolve from a single JVM to a network without rewriting your domain code.

Process isolation

Each process owns a mailbox, processes one message at a time, and never shares mutable state.

Effect polymorphism

Programs are generic in F[_]. Pick Cats Effect for production, ParIO for learning.

Composable DSL

Send, fork, race, suspend, offload — all return values that can be sequenced and combined.

First-class failure

Errors are Failure events. Supervision and recovery use the same model as normal handling.

Why now · 2026

AI writes code. Concurrent code is the part it gets wrong.

Modern LLMs are confident, fluent, and frequently wrong about ordering, races, and partial failure — the exact properties that distributed systems are made of. The response is not to write more imperative scaffolding for the agent to copy; it is to give the agent a substrate whose legal moves are few, named, and inspectable, so the things it generates are things you can review, replay, and prove.

Parapet is built around that constraint. The DSL has roughly a dozen operators. A handler returns a value, not a side effect. Two engineers — or one engineer and one agent — can read the same program and agree on what it does. See how AI-assisted teams use this →

Paths · For AI-assisted teams

Coding with AI agents

The bet is simple: an agent that can only invoke a small set of named operators generates fewer wrong programs than one let loose on raw threads, futures, and ad-hoc message buses. Parapet is that small set.

Constrained surface

About a dozen DSL operators (send, fork, race, offload, …). No bare threads, no shared mutable state. Less room to be creative in the wrong direction.

Reviewable output

A handler is a value. The PR diff shows the program, not the side effects. Easier to read than imperative IO.race(…).flatMap(…) blobs.

Failure is in scope

The interpreter can drop, delay, duplicate, or partition messages. Generated systems become testable without containers — exactly the loop an agent can run on its own.

On the roadmap: a parapet-agent-pack — system prompt, .cursorrules / AGENTS.md, and a curated set of reference processes (counter, gossip ring, sharded KV) — so any AI assistant working in a Parapet repo writes correct code from the first prompt.

Start with First process → and DSL cheatsheet →. Then jump to Errors are events → to see how recovery composes.

Paths · For Scala FP engineers

From Akka Typed & cats-effect

If you've reached for akka-typed for typed actors or for cats-effect alone for concurrency, Parapet sits between them: actor semantics with cats-effect's F[_] polymorphism and a DSL you can interpret differently for tests, tracing, or replay.

| Concern | Akka Typed | cats-effect alone | Parapet |
|---|---|---|---|
| Process / actor isolation | Yes | You build it | Yes (first-class) |
| Effect-polymorphic in F[_] | No | Yes | Yes |
| Programs as inspectable values | No | Partially | Yes (Free over FlowOp) |
| Pluggable interpreter (tracing, replay) | No | No | Yes |
| Typed mailboxes | Yes | | Typed process protocols and refs |
| Built-in distributed primitives | Yes (heavy) | | Yes (small, composable) |

Start with Composable DSL →, then The free encoding →, and finally Add a custom FlowOp to see how the substrate extends without forking the core.

Paths · For data & infra teams

From Flink, Akka, Kafka Connect

The big distributed substrates share a shape: processes / operators, supervision, transports, and a fixed bundle of guarantees baked in. Parapet keeps the shape but unbundles the guarantees — broadcast, replication, failure detection, and consensus are interchangeable connectors, not load-bearing parts of the runtime.

Swap the algorithm, not the app

Move from gossip to total-order broadcast, or from a heartbeat detector to phi-accrual, without touching domain code. Same operator surface, different guarantee.

Build up, not down

Express your system at the level of processes that exchange events. The runtime, transport, and replication strategy are decisions you can defer or revisit.

Smaller surface

No cluster controller, no operator graph runtime, no required deployment topology. The pieces are libraries; you assemble what you need.

Start with Transport traits → and Module map →, then read the Roadmap → to see where reliable channels, sharding, and dataflow fit.

Introduction

Install

Add parapet-core to model processes. Add a runtime integration — parapet-cats-effect for production, parapet-pario for the reference runtime — when you want to run an application.

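// Placeholder: set this to the Parapet release you want to use.
// (A bare `version` would refer to sbt's own project-version key.)
val parapetVersion = "<version>"

libraryDependencies ++= Seq(
  "io.parapet" %% "parapet-core"        % parapetVersion,
  "io.parapet" %% "parapet-cats-effect" % parapetVersion
)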

For snapshots, enable Sonatype Central snapshots:

resolvers += "central-snapshots" at
  "https://central.sonatype.com/repository/maven-snapshots/"

First process

A process declares its API as events and reacts to them in handle. The handler is generic in F[_], so the same process can be interpreted by any runtime that provides the required capabilities.

ProcessRef is the address. Keep wiring late-bound by passing refs rather than process instances.

import io.parapet.{Event, ProcessRef}
import io.parapet.core.Process
import io.parapet.core.Events.Start

class Printer[F[_]] extends Process[F, Printer.Print, Nothing]:
  import Printer.*
  import dsl.*

  override def handle: Receive =
    case Print(data) => eval(println(data))

object Printer:
  final case class Print(data: Any) extends Event

class Greeter[F[_]](printer: ProcessRef[Printer.Print]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start => Printer.Print("hello world") ~> printer

Run an app

Specialize at the application boundary. For production Scala FP applications, mix in CatsEffectParApp.

processes returns an effect that yields the seed population — anything else you need (channels, child processes) can be registered from inside Start.

import cats.effect.IO
import io.parapet.cats.CatsEffectParApp
import io.parapet.core.Process

object HelloApp extends CatsEffectParApp:
  def processes(args: Array[String]): IO[Seq[Process[IO, ?, ?]]] =
    IO.delay {
      val printer = new Printer[IO]
      val greeter = new Greeter[IO](printer.ref)
      Seq(printer, greeter)
    }

Learn

The F[_] algebra

F[_] is a type constructor: it takes a type and produces another one. F[Int] means "a description of a computation that, when interpreted, produces an Int". The shape of that description is left abstract on purpose, because different effect runtimes encode it differently.

Parapet code is written against F[_] + a small set of capability traits. That keeps domain logic decoupled from any one runtime, and turns the choice of effect system into an end-of-the-world decision instead of one that bleeds into every module.
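
To make the idea concrete, here is a minimal sketch of domain code written against F[_] plus capabilities and then interpreted two different ways. Console, Sequencer, Thunk, and Logged are hypothetical names invented for this illustration; they are not Parapet's capability traits.

```scala
// Hypothetical capabilities: the only things domain code may ask of F.
trait Console[F[_]]:
  def printLine(line: String): F[Unit]

trait Sequencer[F[_]]:
  def pure[A](a: A): F[A]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]

// Domain logic: generic in F, unaware of any concrete runtime.
def greet[F[_]](name: String)(using c: Console[F], s: Sequencer[F]): F[Int] =
  s.flatMap(c.printLine(s"hello, $name"))(_ => s.pure(name.length))

// Encoding 1: a deferred computation that runs only when invoked.
type Thunk[A] = () => A

given thunkSequencer: Sequencer[Thunk] with
  def pure[A](a: A): Thunk[A] = () => a
  def flatMap[A, B](fa: Thunk[A])(f: A => Thunk[B]): Thunk[B] = () => f(fa())()

given thunkConsole: Console[Thunk] with
  def printLine(line: String): Thunk[Unit] = () => println(line)

// Encoding 2: a pure value that records output instead of printing it.
type Logged[A] = (Vector[String], A)

given loggedSequencer: Sequencer[Logged] with
  def pure[A](a: A): Logged[A] = (Vector.empty, a)
  def flatMap[A, B](fa: Logged[A])(f: A => Logged[B]): Logged[B] =
    val (firstLog, a)  = fa
    val (secondLog, b) = f(a)
    (firstLog ++ secondLog, b)

given loggedConsole: Console[Logged] with
  def printLine(line: String): Logged[Unit] = (Vector(line), ())

@main def fAlgebraDemo(): Unit =
  val deferred: Thunk[Int] = greet[Thunk]("parapet") // nothing is printed yet
  println(deferred())                                // prints the greeting, then 7
  println(greet[Logged]("parapet"))                  // output captured as data
```

greet never mentions Thunk or Logged. The encoding is picked only at the call site, which is the sense in which the choice is deferred to the edge of the application.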

Effect & effect systems

An effect is anything that interacts with the outside world or changes observable state: printing, reading a file, sleeping, sending a network packet, mutating a counter. A pure function only computes a value from its inputs.

An effect system is the part of a runtime that lets you build, combine, and execute descriptions of effects without performing them eagerly. In Parapet this is the Effect[F] type class plus its companions:

trait Effect[F[_]]:
  def pure[A](value: A): F[A]
  def delay[A](thunk: => A): F[A]
  def suspend[A](thunk: => F[A]): F[A]
  def blocking[A](thunk: => A): F[A]
  def raiseError[A](error: Throwable): F[A]
  extension [A](fa: F[A])
    def flatMap[B](f: A => F[B]): F[B]
    def map[B](f: A => B): F[B]
    def handleErrorWith(f: Throwable => F[A]): F[A]

A process never calls Effect directly. It produces a DslF[F, Unit] program — the runtime's interpreter is the one that lifts each FlowOp into an F-effect.
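
As a rough illustration of "descriptions, not eager execution", the sketch below implements the trait shape above for a toy effect type. Lazy and bump are hypothetical names; the trait is copied locally only so the snippet compiles on its own, and a real backend instance would be considerably more involved (scheduling, cancellation, a dedicated blocking pool).

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Local copy of the trait shape shown above, so the sketch is self-contained.
trait Effect[F[_]]:
  def pure[A](value: A): F[A]
  def delay[A](thunk: => A): F[A]
  def suspend[A](thunk: => F[A]): F[A]
  def blocking[A](thunk: => A): F[A]
  def raiseError[A](error: Throwable): F[A]
  extension [A](fa: F[A])
    def flatMap[B](f: A => F[B]): F[B]
    def map[B](f: A => B): F[B]
    def handleErrorWith(f: Throwable => F[A]): F[A]

// Hypothetical toy effect: a description that runs only when `run()` is called.
final class Lazy[A](val run: () => A)

given Effect[Lazy] with
  def pure[A](value: A): Lazy[A] = Lazy(() => value)
  def delay[A](thunk: => A): Lazy[A] = Lazy(() => thunk)
  def suspend[A](thunk: => Lazy[A]): Lazy[A] = Lazy(() => thunk.run())
  def blocking[A](thunk: => A): Lazy[A] = delay(thunk) // no separate pool in this toy
  def raiseError[A](error: Throwable): Lazy[A] = Lazy(() => throw error)
  extension [A](fa: Lazy[A])
    def flatMap[B](f: A => Lazy[B]): Lazy[B] = Lazy(() => f(fa.run()).run())
    def map[B](f: A => B): Lazy[B] = Lazy(() => f(fa.run()))
    def handleErrorWith(f: Throwable => Lazy[A]): Lazy[A] =
      Lazy(() => try fa.run() catch case NonFatal(e) => f(e).run())

// Generic code: builds a description and performs nothing by itself.
def bump[F[_]](counter: AtomicInteger)(using F: Effect[F]): F[Int] =
  F.delay(counter.incrementAndGet()).flatMap(n => F.pure(n * 10))

@main def effectDemo(): Unit =
  val counter   = AtomicInteger(0)
  val described = bump[Lazy](counter) // counter is still 0 here
  println(counter.get)                // 0
  println(described.run())            // 10
  println(described.run())            // 20: the same description, run twice
```

Because the description is separate from its execution, running it twice performs the side effect twice; that is the property that lets an interpreter decide when, where, and how often something runs.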

Composable DSL

Inside a handler, you build a small program using DSL operators such as send, fork, delay, and race. These operators do not perform anything on their own — they return values describing what should happen, which the runtime later executes.

// A program: "send Ping to peer, wait 100ms, send Pong to peer"
val program: DslF[F, Unit] =
  Ping ~> peer ++ delay(100.millis) ++ Pong ~> peer

Because the program is a value, you can store it, log it, test it under a fake interpreter, return it from a function that takes parameters, or compose it with another program with ++. This is the headline benefit of an effect system: behavior becomes data.
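
The "behavior becomes data" point can be sketched without Parapet at all. The mini-DSL below is a hypothetical stand-in (Step, Program, send, delay, and describe are invented for the example and are far simpler than DslF): it shows a program being returned from a parameterised function, composed with ++, and inspected by a fake interpreter.

```scala
import scala.concurrent.duration.*

// Hypothetical mini-DSL: each step is plain data; nothing executes on construction.
enum Step:
  case Send(event: String, to: String)
  case Delay(d: FiniteDuration)

// A program is an ordered collection of steps; `++` is ordinary concatenation.
final case class Program(steps: Vector[Step]):
  def ++(that: Program): Program = Program(steps ++ that.steps)

def send(event: String, to: String): Program = Program(Vector(Step.Send(event, to)))
def delay(d: FiniteDuration): Program        = Program(Vector(Step.Delay(d)))

// A function with parameters that returns a program value.
def pingPong(peer: String): Program =
  send("Ping", peer) ++ delay(100.millis) ++ send("Pong", peer)

// A fake interpreter: renders each step instead of performing it.
def describe(program: Program): Vector[String] = program.steps.map {
  case Step.Send(event, to) => s"send $event to $to"
  case Step.Delay(d)        => s"wait ${d.toMillis}ms"
}

@main def programAsValueDemo(): Unit =
  val both = pingPong("a") ++ pingPong("b") // compose two programs into one
  describe(both).foreach(println)
```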

The free encoding

Under the hood, a DslF[F, A] is a Free[FlowOp[F, *], A]: an immutable tree of FlowOp nodes leading to a final result. The DslInterpreter walks that tree and turns each node into an F-effect that the chosen runtime can execute.
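
For readers who have not met the encoding before, the following is a minimal sketch of a free monad and an interpreter that walks it. Everything here is an assumption made for illustration: this Free is hand-rolled and not stack-safe, Run and interpret are invented names, and KvOp is a stand-in algebra rather than FlowOp. Parapet's own Free and DslInterpreter may differ in detail.

```scala
import scala.collection.mutable

// Hypothetical minimal free monad: a program is a tree of three node kinds.
sealed trait Free[Op[_], A]:
  def flatMap[B](f: A => Free[Op, B]): Free[Op, B] = Free.FlatMap(this, f)
  def map[B](f: A => B): Free[Op, B] = flatMap(a => Free.Pure(f(a)))

object Free:
  final case class Pure[Op[_], A](value: A) extends Free[Op, A]
  final case class Suspend[Op[_], A](op: Op[A]) extends Free[Op, A]
  final case class FlatMap[Op[_], X, A](source: Free[Op, X], next: X => Free[Op, A])
      extends Free[Op, A]

// An interpreter gives every operation a meaning (here: a plain result).
trait Run[Op[_]]:
  def apply[X](op: Op[X]): X

// Walks the tree node by node. Not stack-safe; fine for a small illustration.
def interpret[Op[_], A](program: Free[Op, A], run: Run[Op]): A =
  program match
    case Free.Pure(value)           => value
    case Free.Suspend(op)           => run(op)
    case Free.FlatMap(source, next) => interpret(next(interpret(source, run)), run)

// Stand-in algebra: a tiny key-value store.
enum KvOp[A]:
  case Put(key: String, value: Int) extends KvOp[Unit]
  case Get(key: String) extends KvOp[Option[Int]]

def put(key: String, value: Int): Free[KvOp, Unit] = Free.Suspend(KvOp.Put(key, value))
def get(key: String): Free[KvOp, Option[Int]]      = Free.Suspend(KvOp.Get(key))

// Building the program performs nothing; it only allocates the tree.
val program: Free[KvOp, Option[Int]] =
  for
    _ <- put("a", 1)
    _ <- put("a", 2)
    v <- get("a")
  yield v

// One possible meaning for the ops: a mutable in-memory map.
final class InMemory extends Run[KvOp]:
  val store = mutable.Map.empty[String, Int]
  def apply[X](op: KvOp[X]): X = op match
    case KvOp.Put(key, value) => store.update(key, value)
    case KvOp.Get(key)        => store.get(key)

@main def freeDemo(): Unit =
  println(interpret(program, InMemory())) // Some(2)
```

The same program value could be handed to a second Run implementation (for example one that only logs the ops) without changing the program itself.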

You do not need to know category theory to use Parapet. The encoding matters because it guarantees three properties:

Add a custom FlowOp

The DSL is open: you can mix the parapet algebra with your own and route the extra cases through a tailor-made interpreter. The pattern is three steps — declare the new op, expose a smart constructor, then teach the interpreter how to run it.

// 1. Declare the op as a data type — pure, no behavior yet.
import io.parapet.free.{Free, Inject}

sealed trait MetricOp[F[_], A]
final case class Counter[F[_]](name: String, delta: Long)
    extends MetricOp[F, Unit]
// Note: the type lambda is spelled out; a bare `MetricOp[F, _]` is a wildcard type in Scala 3.
final case class Timer[F[_], A](name: String, body: Free[[x] =>> MetricOp[F, x], A])
    extends MetricOp[F, A]

// 2. A smart constructor that lifts the op into your program type.
final class MetricOps[F[_], C[_]](using inject: Inject[[x] =>> MetricOp[F, x], C]):
  def increment(name: String, delta: Long = 1L): Free[C, Unit] =
    Free.inject[[x] =>> MetricOp[F, x], C, Unit](Counter[F](name, delta))

  def time[A](name: String)(body: Free[[x] =>> MetricOp[F, x], A]): Free[C, A] =
    Free.inject[[x] =>> MetricOp[F, x], C, A](Timer[F, A](name, body))

Programs that mix your algebra with the built-in one now type-check naturally. They are still pure values; the runtime decides what each node means.

Extend the interpreter

A custom interpreter is a FunctionK from your algebra to F. Provide one that handles the new nodes; everything else delegates to the default Parapet interpreter. Override ParApp.interpreter (or your runtime equivalent) to plug it in.

import io.parapet.core.DslInterpreter.Interpreter
import io.parapet.free.{FunctionK, ~>}

final class MetricInterpreter[F[_]](underlying: Interpreter[F])(using metrics: Metrics[F])
    extends Interpreter[F]:

  def interpret(sender: ProcessRef.Unknown, target: ProcessRef.Unknown, trace: ExecutionTrace) =
    new FunctionK[[x] =>> MetricOp[F, x], F]:
      def apply[A](op: MetricOp[F, A]): F[A] = op match
        case Counter(name, delta)  => metrics.add(name, delta)
        case Timer(name, body)     => metrics.time(name)(
          body.foldMap(interpret(sender, target, trace))
        )
  // ... delegate the two other interpret(...) overloads to `underlying`.

Custom interpreters are the right place for cross-cutting concerns that do not belong in domain handlers: structured logging, distributed tracing, metrics, audit, replay.
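
The "handle the new nodes, delegate everything else" shape can be shown in a deliberately simplified form. In the sketch below the ops are untyped and an interpreter is just a function Op => Unit, so there is no FunctionK involved; Op, Send, Counter, recording, and withMetrics are hypothetical names for the illustration, not Parapet API.

```scala
import scala.collection.mutable

sealed trait Op
final case class Send(event: String, to: String) extends Op    // stands in for a built-in op
final case class Counter(name: String, delta: Long) extends Op // the custom op

type Interpreter = Op => Unit

// Stand-in for the default interpreter: it only knows how to record deliveries.
def recording(delivered: mutable.ArrayBuffer[String]): Interpreter = {
  case Send(event, to) => delivered += s"$event->$to"
  case other           => sys.error(s"unsupported op: $other")
}

// Custom interpreter: handles Counter itself and delegates every other op.
def withMetrics(counters: mutable.Map[String, Long], underlying: Interpreter): Interpreter = {
  case Counter(name, delta) => counters.update(name, counters.getOrElse(name, 0L) + delta)
  case other                => underlying(other)
}

@main def metricsInterpreterDemo(): Unit =
  val delivered = mutable.ArrayBuffer.empty[String]
  val counters  = mutable.Map.empty[String, Long]
  val run       = withMetrics(counters, recording(delivered))

  List(Counter("sent", 1), Send("Ping", "peer"), Counter("sent", 2)).foreach(run)
  println(counters)  // the custom op was handled by the wrapper
  println(delivered) // the built-in op reached the underlying interpreter
```

The domain program (the list of ops) contains no metrics code of its own; the cross-cutting behaviour lives entirely in the wrapper.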

Core concepts

Process model

Isolated state

Each process owns its mailbox. The runtime serializes handler invocations per process.

Typed protocol

A process declares the domain events it accepts, and refs carry that protocol through wiring.

Supervision graph

register sets up parent / child relations. Stop cascades down.

Typed refs

Declare a process protocol with Process[F, In, Out], then pass around ProcessRef[In] to keep wiring explicit.

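import io.parapet.{Event, ProcessRef}
import io.parapet.core.Process
import io.parapet.core.Events.{Start, Stop}

final case class Save(key: String, value: String) extends Event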

class Store[F[_]] extends Process[F, Save, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Save(key, value) => eval(println(s"saved $key=$value"))
    case Stop             => eval(println("closing store"))

class Writer[F[_]](store: ProcessRef[Save]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start => Save("hello", "world") ~> store

Process lifecycle

Every process moves through the same four-stage lifecycle, driven by ordinary events the handler can observe and react to.

  1. Registration. The application returns a process from processes(args), or another process calls register(parent, child). The runtime allocates a mailbox and wires the supervision graph.
  2. Start. The runtime delivers Events.Start as the first event. Use it to send "ready" notifications, register children, open resources.
  3. Running. Events are delivered in mailbox order; handle is invoked sequentially. Failures inside the handler are converted into a Failure event for the sender.
  4. Stop. On graceful shutdown the runtime first stops every child, then delivers Events.Stop to the parent. Kill tears the process down without running handle.

class Server[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      eval(println("opening port 8080"))
    case Stop =>
      eval(println("draining and closing"))
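
The shutdown ordering in stage 4 (children first, then the parent) is a post-order walk of the supervision graph. A minimal sketch, assuming the graph is a plain tree; Node and stopOrder are hypothetical names and say nothing about how the runtime stores the graph internally:

```scala
// Hypothetical model of a supervision tree.
final case class Node(name: String, children: List[Node] = Nil)

// Post-order traversal: every descendant is listed before its parent.
def stopOrder(node: Node): List[String] =
  node.children.flatMap(stopOrder) :+ node.name

val pool = Node("pool", List(Node("worker-1"), Node("worker-2", List(Node("task")))))

@main def stopOrderDemo(): Unit =
  println(stopOrder(pool)) // List(worker-1, task, worker-2, pool)
```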

System events

System events live in io.parapet.core.Events. They appear in handlers just like domain events, so there are no special hooks to learn.

| Event | When it is delivered |
|---|---|
| Start | Once, immediately after registration. Use for setup and to send "ready" pings. |
| Stop | Once, during graceful shutdown. Children receive Stop before their parent. |
| Kill | Forces termination without invoking handle. The mailbox is discarded. |
| Failure(envelope, error) | Routed back to the original sender when the receiver's handler raised. The sender can recover, retry, or escalate; unhandled failures become dead letters. |
| DeadLetter(envelope, error) | Wraps an envelope that could not be delivered (unknown receiver, terminated process, unhandled Failure). Routed to the configured DeadLetterProcess. |

Switching behavior

A process can replace its active receive function with switch(newReceive). The swap is synchronous within the current program, so it composes naturally with state-machine flows.

class Door[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  private def closed: Receive =
    case Open  => eval(println("opening")) ++ switch(opened)
    case Knock => eval(println("knock knock"))

  private def opened: Receive =
    case Close => eval(println("closing")) ++ switch(closed)
    case Enter => eval(println("welcome"))

  override def handle: Receive = closed

Process combinators

Processes themselves compose: a ++ b (alias a.and(b)) creates a process that runs both handlers when both can handle the event; a.or(b) tries a first and falls back to b. The resulting process inherits the left-hand ref and name.

class Core[F[_]] extends Process[F, Ping.type, Pong.type]:
  import dsl.*

  override def handle: Receive =
    case Ping => reply(Pong)

val core    = new Core[F]
val metrics = Process[F](_ => { case Ping => eval(counter.incrementAndGet()) })

val instrumented = core ++ metrics  // same ref as `core`
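
One way to picture the two combinators is as operations on partial functions. The sketch below uses stand-in types (events are strings, a handler returns the list of actions it would emit) and makes one assumption that the text above does not settle: here `and` is defined only for events that both handlers accept. The names Receive, and, and or are local to the example.

```scala
// Stand-in handler type: event => the actions the handler would emit.
type Receive = PartialFunction[String, Vector[String]]

// `or`: try the left handler first, fall back to the right one.
def or(a: Receive, b: Receive): Receive = a.orElse(b)

// `and` (assumed semantics): run both handlers, left first, when both accept the event.
def and(a: Receive, b: Receive): Receive = {
  case event if a.isDefinedAt(event) && b.isDefinedAt(event) => a(event) ++ b(event)
}

val core: Receive     = { case "Ping" => Vector("reply Pong") }
val metrics: Receive  = { case "Ping" => Vector("count ping") }
val fallback: Receive = { case other => Vector(s"ignored $other") }

@main def combinatorDemo(): Unit =
  println(and(core, metrics)("Ping")) // both handlers contribute, in order
  println(or(core, fallback)("Stop")) // core does not match, fallback does
```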

Channel

Channel turns the asynchronous mailbox model into a strict one-call-at-a-time dialog. It is useful when a handler needs a direct response before continuing, without inventing correlation ids by hand.

val client: ProcessRef[Event] = ProcessRef("client")
val ch     = new Channel[F](client)

val flow =
  register(client, ch) ++
    ch.send(Request("ping"), server.ref).flatMap {
      case scala.util.Success(Response(data)) => eval(println(s"got $data"))
      case scala.util.Failure(err)            => eval(println(s"failed: $err"))
    }

A channel rejects new requests while a request is in flight, which keeps the send-then-await pattern safe without locks.

Errors are events

When a handler raises, the runtime routes a Failure(envelope, error) event back to the original sender. Senders that handle Failure can retry, escalate, or convert the error into a domain event; when the sender does not handle it, the runtime routes the failure to the dead-letter sink.

class Client[F[_]](server: ProcessRef[Request]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      Request("hello") ~> server
    case Failure(env, err) =>
      eval(println(s"server $server failed: ${err.getMessage}; retrying")) ++
        env.event ~> env.receiver
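
The routing rule (failure goes back to the sender, and falls through to dead letters when the sender does not handle it) can be sketched as a toy, synchronous dispatcher. ToyRuntime, Handler, and the local Envelope / Failure / DeadLetter case classes are stand-ins written for this example; they only mirror the names used in the text and are not the runtime's implementation.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Stand-ins that mirror the event names used in the text.
final case class Envelope(sender: String, event: Any, receiver: String)
final case class Failure(envelope: Envelope, error: Throwable)
final case class DeadLetter(envelope: Envelope, error: Throwable)

type Handler = PartialFunction[Any, Unit]

final class ToyRuntime(handlers: Map[String, Handler]):
  val deadLetters = mutable.ArrayBuffer.empty[DeadLetter]

  def deliver(env: Envelope): Unit =
    handlers.get(env.receiver) match
      case None =>
        deadLetters += DeadLetter(env, IllegalStateException(s"unknown receiver: ${env.receiver}"))
      case Some(handler) =>
        // Events the receiver does not match are ignored in this toy.
        try handler.applyOrElse(env.event, (_: Any) => ())
        catch case NonFatal(error) => routeFailure(Failure(env, error))

  // A raised error becomes an event for the sender; unhandled ones become dead letters.
  private def routeFailure(failure: Failure): Unit =
    handlers.get(failure.envelope.sender) match
      case Some(handler) if handler.isDefinedAt(failure) => handler(failure)
      case _ => deadLetters += DeadLetter(failure.envelope, failure.error)

@main def failureRoutingDemo(): Unit =
  val recovered = mutable.ArrayBuffer.empty[String]
  val server: Handler = { case "boom" => throw RuntimeException("server down") }
  val client: Handler = { case Failure(env, err) => recovered += s"${env.event}: ${err.getMessage}" }
  val mute: Handler   = { case "never" => () } // does not handle Failure

  val runtime = ToyRuntime(Map("server" -> server, "client" -> client, "mute" -> mute))
  runtime.deliver(Envelope("client", "boom", "server")) // client sees the Failure
  runtime.deliver(Envelope("mute", "boom", "server"))   // ends up as a dead letter
  println(recovered)
  println(runtime.deadLetters.map(_.envelope.sender))
```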

Dead-letter

A DeadLetter wraps an envelope the runtime could not deliver: the receiver does not exist, has been terminated, or raised a Failure that the sender did not handle. Dead letters are routed to a single well-known process whose ref is ProcessRef.DeadLetterRef.

The default implementation logs every dead letter with structured fields. Override ParApp.deadLetter to plug in your own — typical replacements push to a message bus, surface metrics, or persist the envelope for replay.

import io.parapet.core.processes.DeadLetterProcess

class AlertingDeadLetters[F[_]] extends DeadLetterProcess[F]:
  import dsl.*

  override val handle: Receive = {
    case DeadLetter(env, err) =>
      eval(metrics.deadLetters.increment()) ++
        alerts.publish(env, err)
  }

object MyApp extends CatsEffectParApp:
  override def deadLetter: IO[DeadLetterProcess[IO]] =
    IO.pure(new AlertingDeadLetters[IO])

  def processes(args: Array[String]): IO[Seq[Process[IO, ?, ?]]] = ...

Configuration

ParConfig tunes the runtime: default mailbox capacity, scheduler thread count, dev-mode logging, tracing, and the optional in-memory event log used for replay.

override def config =
  ParConfig.default
    .withProcessBufferSize(1024)
    .withWorkerCount(8)
    .enableTracing
    .enableEventLog

DSL reference

Cheatsheet

| Operator | Purpose |
|---|---|
| unit / pure(a) | Do nothing / lift a value. |
| flow { ... } | Defer construction (for recursion). |
| event ~> ref | Send an event. |
| forward(event, ref) | Send while preserving the original sender. |
| reply(event) | Reply to the sender of the current event. |
| eval { ... } | Lift a plain side-effecting computation. |
| suspend(fa) | Lift an existing F-effect. |
| delay(d) | Sleep for d without blocking a thread. |
| fork(flow) | Start a fiber; returns a join/cancel handle. |
| par(p1, p2, ...) | Fork each flow and return a list of fibers. |
| race(p1, p2) | Run both, return the winner, cancel the loser. |
| offload(flow) | Run in the background; gate next mailbox event. |
| register(parent, child) | Add a supervised child. |
| halt(ref) | Stop a process and its descendants. |
| handleError(body, recover) | DSL try / catch. |
| guarantee(body, finalizer) | DSL try / finally. |
| switch(receive) | Replace the active receive function. |

unit · pure · flow

unit is a program that does nothing — useful as a fold seed or to give a handler clause an explicit "do nothing" body. pure(a) lifts a value into a program. flow { ... } defers construction; use it for recursive flows so the recursion happens at interpretation time instead of construction time.

// Empty branches.
{
  case Start => unit
  case Stop  => unit
}

// Folding over a list.
processes
  .map(event ~> _)
  .foldLeft(unit)(_ ++ _)

// Safe recursion.
def times[F[_]](n: Int): DslF[F, Unit] = flow {
  if n == 0 then unit
  else eval(print(n)) ++ times(n - 1)
}
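
Why deferral matters for recursion can be seen in a toy program type. The sketch below is not DslF: Prog, Emit, Defer, and take are hypothetical, and `flow` here is a local function that merely imitates the idea of building the body lazily. Without the deferral, constructing countFrom would recurse forever before anything is interpreted.

```scala
// Hypothetical program type with an explicit "build me later" node.
sealed trait Prog
case object Done extends Prog
final case class Emit(value: Int, next: Prog) extends Prog
final class Defer(thunk: => Prog) extends Prog:
  lazy val force: Prog = thunk // constructed on first use, not at definition time

// Local imitation of `flow { ... }`: capture the body without evaluating it.
def flow(body: => Prog): Prog = Defer(body)

// An endless recursive program. Construction returns immediately because
// the recursive call sits inside the deferred body.
def countFrom(n: Int): Prog = flow(Emit(n, countFrom(n + 1)))

// A tiny interpreter that forces deferred nodes only as far as it needs to.
@annotation.tailrec
def take(prog: Prog, limit: Int, acc: List[Int] = Nil): List[Int] =
  prog match
    case _ if limit == 0   => acc.reverse
    case Done              => acc.reverse
    case Emit(value, next) => take(next, limit - 1, value :: acc)
    case deferred: Defer   => take(deferred.force, limit, acc)

@main def deferDemo(): Unit =
  println(take(countFrom(1), 3)) // List(1, 2, 3)
```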

send · ~>

Sends an event to one or more receivers. The current process is recorded as the sender. ~> is the symbolic alias for the single-receiver case. Events are constructed lazily, so the runtime can elide construction when delivery is impossible. The compiler checks that the receiver accepts the event type.

// Single receiver.
Ping ~> peer

// Batch sequence to one receiver.
Seq(Step1, Step2, Step3) ~> pipeline

// Fan-out, ordered enqueue across receivers.
send(Ping, processA, processB, processC)

forward

Forwards an event while preserving the sender of the message currently being handled. The standard building block for proxy processes.

class Server[F[_]] extends Process[F, Request, Response]:
  import dsl.*

  override def handle: Receive =
    case Request(body) => reply(Response(s"server/$body"))

val server = new Server[F]

val proxy = Process[F](_ => {
  case Request(body) =>
    forward(Request(s"proxy/$body"), server.ref)
})

// Replies from server go back to the original sender, not to proxy.

reply

reply sends an event back to the sender of the message currently being handled. The event is checked against the process's declared output protocol.

class Echo[F[_]] extends Process[F, Request, Response]:
  import dsl.*

  override def handle: Receive =
    case Request(data) => reply(Response(s"echo: $data"))

// Batch reply preserves order.
class Acker[F[_]] extends Process[F, Batch, Ack]:
  import dsl.*

  override def handle: Receive =
    case Batch(items) => reply(items.map(Ack(_)))

eval · suspend

eval lifts a plain side-effecting expression into the DSL. suspend lifts an existing F-effect. Reach for suspend only when you already hold an F[A]; eval covers everything else.

// Plain side effect.
eval(println("hello"))

// Lift an existing F-effect.
suspend(database.fetch(id))

// Capture a result.
for
  value <- eval(System.currentTimeMillis())
  _     <- eval(println(s"at $value"))
yield ()

delay

Suspends the steps that follow for the given duration d, without blocking a thread on runtimes that support async sleep. Inside par, only the first step is delayed; wrap each branch to delay every one individually.

// Delay once before the whole block.
delay(2.seconds) ++ Tick ~> clock

// Delay every parallel branch.
par(
  delay(1.second) ++ eval(print("a")),
  delay(2.seconds) ++ eval(print("b"))
)

fork · par

fork(flow) starts a fiber and returns its handle. par(p1, p2, ...) forks each program in parallel and gives back a list of handles. Use fiber.join to await a value and fiber.cancel to abort.

for
  fiber  <- fork(eval(computeExpensiveResult()))
  // ... other work ...
  result <- fiber.join
  _      <- eval(println(s"got $result"))
yield ()

// Run independent steps concurrently.
par(eval(print("1")), eval(print("2")), eval(print("3")))

race

Runs two flows concurrently and returns whichever wins; the loser is cancelled. The canonical pattern for timeouts and fastest-responder fan-out.

// Timeout pattern.
race(
  server.send(Request, _ => unit),
  delay(5.seconds) ++ eval(println("server timed out"))
)

// Fastest replica wins.
race(
  ch.send(Request, replicaA),
  ch.send(Request, replicaB)
)

offload

Runs flow on a background fiber so it does not stall scheduler workers. Subsequent steps in the same handler may proceed immediately, but the process will not accept its next mailbox event until all offloaded work has completed.

class IndexingProcess[F[_]](store: Store[F]) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Index(record) =>
      offload(suspend(store.write(record)))
    case Stop =>
      eval(println("done indexing"))

register · halt

register(parent, child) adds a child to the supervision graph; children receive Stop before their parent. halt(ref) stops a process and cascades to its descendants.

class Pool[F[_]] extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      register(ref,
        new Worker[F]("worker-1"),
        new Worker[F]("worker-2")
      )
    case Shutdown =>
      halt(ref)

handleError · guarantee

handleError is DSL try / catch; guarantee is DSL try / finally. Use them to keep recovery and cleanup inside the program value rather than the underlying effect.

handleError(
  eval(throw RuntimeException("boom")),
  err => eval(println(s"recovered from ${err.getMessage}"))
)

guarantee(
  eval(openFile()),
  eval(closeFile())
)

Network

Transport traits

parapet-net separates low-level byte transport from process-level events. Today it ships TCP request/reply over ZMQ and UDP-style datagrams over Aeron; the same process adapters can also wrap your own transport.

| Trait | Shape |
|---|---|
| ClientTransport[F] | Request/reply client: request(Message) => F[Either[TransportError, Message]]. |
| ServerTransport[F] | Polls RoutedMessages and replies with reply(routingId, message). |
| DatagramTransport[F] | UDP-style fan-out: publish(Message) / receiveBatch(limit). |

The adapter processes in io.parapet.net.processes lift those traits into the process world: ClientProcess, ServerProcess, and DatagramProcess. Application code sends typed events; transport details stay at the edge.
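
To give a feel for what "wrap your own transport" could involve, here is a sketch of an in-memory request/reply transport. The trait and data types are local stand-ins reconstructed from the shape listed in the table and from the Message.single / message.parts usage elsewhere on this page; the real definitions in parapet-net may differ, and TransportError's fields, Id, and LoopbackTransport are assumptions made for the example.

```scala
// Local stand-ins; not the parapet-net definitions.
final case class Message(parts: List[Array[Byte]])
object Message:
  def single(bytes: Array[Byte]): Message = Message(List(bytes))

final case class TransportError(reason: String)

trait ClientTransport[F[_]]:
  def request(message: Message): F[Either[TransportError, Message]]

// Simplest possible "effect" for the sketch: the value itself.
type Id[A] = A

// An in-memory transport that answers every request locally, with no sockets.
final class LoopbackTransport(respond: String => Either[String, String])
    extends ClientTransport[Id]:
  def request(message: Message): Either[TransportError, Message] =
    val text = new String(message.parts.head, "UTF-8")
    respond(text) match
      case Right(reply) => Right(Message.single(reply.getBytes("UTF-8")))
      case Left(reason) => Left(TransportError(reason))

@main def loopbackDemo(): Unit =
  val transport = LoopbackTransport(text =>
    if text == "ping" then Right("pong") else Left(s"unknown request: $text")
  )
  val reply = transport.request(Message.single("ping".getBytes("UTF-8")))
  println(reply.map(m => new String(m.parts.head, "UTF-8")))
```

A transport like this is mainly useful in tests: the process-level code stays the same while the byte-level edge is replaced by something deterministic.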

TCP echo server

ServerProcess wraps a ServerTransport. It receives routed messages from the transport and forwards them to a sink process as ServerProcess.Received. To answer, send ServerProcess.Reply back to the server process.

import io.parapet.ProcessRef
import io.parapet.core.Process
import io.parapet.net.processes.ServerProcess

class Echo[F[_]](
    server: ProcessRef[ServerProcess.Reply]
) extends Process[F, ServerProcess.Received | ServerProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case ServerProcess.Received(routingId, message) =>
      ServerProcess.Reply(routingId, message) ~> server
    case ServerProcess.Failed(error) =>
      eval(println(s"server failed: $error"))

val echoRef = ProcessRef[ServerProcess.Received | ServerProcess.Failed]("echo")
val server  = new ServerProcess[F](transport, sink = echoRef)
val echo    = new Echo[F](server.ref):
  override val ref = echoRef

TCP ping client

ClientProcess wraps a ClientTransport. Send ClientProcess.Request to it and it replies to the sender with either ClientProcess.Response or ClientProcess.Failed.

import io.parapet.ProcessRef
import io.parapet.core.Events.Start
import io.parapet.core.Process
import io.parapet.net.processes.ClientProcess
import io.parapet.net.transport.Message

class Caller[F[_]](
    client: ProcessRef[ClientProcess.Request]
) extends Process[F, ClientProcess.Response | ClientProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      ClientProcess.Request(Message.single("ping".getBytes("UTF-8"))) ~> client
    case ClientProcess.Response(message) =>
      eval(println(s"got ${new String(message.parts.head, "UTF-8")}"))
    case ClientProcess.Failed(error) =>
      eval(println(s"request failed: $error"))

ZMQ wiring

ZMQ is the current TCP backend. Create the low-level transport at the edge, then register ordinary Parapet processes around it.

import cats.effect.IO
import io.parapet.cats.CatsEffectParApp
import io.parapet.ProcessRef
import io.parapet.net.{Endpoint, TransportProtocol}
import io.parapet.net.processes.{ClientProcess, ServerProcess}
import io.parapet.net.transport.zmq.*

object ServerApp extends CatsEffectParApp:
  def processes(args: Array[String]) =
    val echoRef = ProcessRef[ServerProcess.Received | ServerProcess.Failed]("echo")
    val config = ZmqTcpServerConfig(
      bind = Endpoint(TransportProtocol.Tcp, "*", 5555)
    )

    ZmqTcpServer.make[IO](config).allocated.map { case (transport, _) =>
      val server = new ServerProcess[IO](transport, sink = echoRef)
      val echo = new Echo[IO](server.ref):
        override val ref = echoRef
      Seq(echo, server)
    }

object ClientApp extends CatsEffectParApp:
  def processes(args: Array[String]) =
    val config = ZmqTcpClientConfig(
      remote = Endpoint(TransportProtocol.Tcp, "server", 5555)
    )

    ZmqTcpClient.make[IO](config).allocated.map { case (transport, _) =>
      val client = new ClientProcess[IO](transport)
      val caller = new Caller[IO](client.ref)
      Seq(client, caller)
    }

In production code, keep the returned Resource alive for the application lifetime so sockets are closed cleanly on shutdown.

UDP with Aeron

AeronUdpTransport implements DatagramTransport. DatagramProcess publishes outbound datagrams and forwards received batches to a sink as DatagramProcess.Received.

import io.parapet.{Event, ProcessRef}
import io.parapet.core.Events.Start
import io.parapet.core.Process
import io.parapet.net.processes.DatagramProcess
import io.parapet.net.transport.Message
import io.parapet.net.transport.aeron.*

class Publisher[F[_]](
    datagram: ProcessRef[DatagramProcess.Publish]
) extends Process[F, Event, Nothing]:
  import dsl.*

  override def handle: Receive =
    case Start =>
      DatagramProcess.Publish(
        Message.single("hello".getBytes("UTF-8"))
      ) ~> datagram

class UdpSink[F[_]]
    extends Process[F, DatagramProcess.Received | DatagramProcess.Failed, Nothing]:
  import dsl.*

  override def handle: Receive =
    case DatagramProcess.Received(message) =>
      eval(println(s"udp: ${new String(message.parts.head, "UTF-8")}"))
    case DatagramProcess.Failed(error) =>
      eval(println(s"udp failed: $error"))

val config = AeronUdpConfig(
  channel = "aeron:udp?endpoint=localhost:40123",
  streamId = 1001,
  embeddedMediaDriver = true
)

for transport <- AeronUdpTransport.make[F](config).allocated.map(_._1)
yield
  val sink      = new UdpSink[F]
  val datagram  = new DatagramProcess[F](transport, sink.ref)
  val publisher = new Publisher[F](datagram.ref)
  Seq(datagram, sink, publisher)

Modules

Module map

| Module | Purpose |
|---|---|
| parapet-core | Process model, DSL, scheduler, capability contracts. |
| parapet-testkit | Shared conformance specs for runtime implementations. |
| parapet-cats-effect | Recommended production backend for Cats Effect IO. |
| parapet-pario | Small reference runtime for learning and tests. |
| parapet-protocol | Wire codecs and generated protocol models. |
| parapet-net | TCP / UDP transports and adapter processes. |

Runtime choice

Use parapet-cats-effect when shipping. Use parapet-pario when you want to read the runtime, write small examples, or validate a new backend.

Both implement the same capability contracts (Effect, Parallel, SchedulerRuntime), so handlers are portable across them with no code changes.

// Production
import cats.effect.IO
import io.parapet.cats.CatsEffectParApp
object App extends CatsEffectParApp:
  def processes(args: Array[String]) = IO.pure(Seq(...))

// Reference runtime
import io.parapet.ParIOApp
import io.parapet.core.Process
import io.parapet.effect.ParIO
object Sandbox extends ParIOApp:
  def processes(args: Array[String]): ParIO[Seq[Process[ParIO, ?, ?]]] =
    ParIO.pure(Seq(...))

Testing

Because handlers are values, you can step a program through a test interpreter and assert on the emitted events instead of standing up a real network. The shared conformance specs in parapet-testkit are the same suite each backend has to satisfy — they are also a useful checklist when writing your own scenarios.
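
A sketch of what "assert on the emitted events" can look like, including a deterministic fault policy. This is not the parapet-testkit API: Send, Fault, and runWithFaults are hypothetical names, and a program is reduced to a plain list of sends so the example stays self-contained.

```scala
// Hypothetical model: a program is the ordered list of sends it would emit.
final case class Send(event: String, to: String)

enum Fault:
  case Deliver, Drop, Duplicate

// Test interpreter: applies a deterministic fault policy to every send and
// returns what would actually reach the receivers, in order.
def runWithFaults(program: Vector[Send])(policy: Send => Fault): Vector[Send] =
  program.flatMap { send =>
    policy(send) match
      case Fault.Deliver   => Vector(send)
      case Fault.Drop      => Vector.empty
      case Fault.Duplicate => Vector(send, send)
  }

val replicate: Vector[Send] =
  Vector(Send("Write(k=1)", "replica-a"), Send("Write(k=1)", "replica-b"), Send("Ack", "client"))

@main def faultInjectionDemo(): Unit =
  // Scenario: every message to replica-b is lost, the ack is duplicated.
  val delivered = runWithFaults(replicate) {
    case Send(_, "replica-b") => Fault.Drop
    case Send("Ack", _)       => Fault.Duplicate
    case _                    => Fault.Deliver
  }
  delivered.foreach(println)
```

Because the policy is an ordinary function, the same scenario replays identically on every run, which is what makes assertions on the delivered sequence meaningful.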

Project

Roadmap

Parapet evolves in layers. Each layer is independently useful; later layers compose what earlier ones expose. The guiding principle: small composable behaviours, not a monolithic framework.

Layer 0 · Protocol primitives: Channel ✅. On deck: reliable send, failure detector, membership, causal context, ordered broadcast, fencing tokens.
Layer 1 · Replication & state: Raft 🟡 (snapshots / dynamic membership next). CRDT algebra, anti-entropy gossiper, pluggable replicated log.
Layer 2 · Process patterns: Sharding, cluster singleton, typed pub/sub, request router, saga orchestrator, rate limiter, durable timers.
Layer 3 · Dataflow: Operator graph, keyed state, watermarks, Chandy–Lamport checkpoint coordinator, exactly-once sinks.
Layer 4 · Testing & observability: Real event log, deterministic scheduler, fault injection, causal tracing, lin-check harness, replay tooling.
Layer 5 · Algorithm zoo: Reference implementations of Raft 🟡, Multi-Paxos, EPaxos, Zab, Chandy–Lamport, SWIM, HyParView / Plumtree, ABD register.

The full plan, including the next milestone (CRDT toolkit + anti-entropy gossiper, then sharding + cluster singleton) and anti-goals, lives in roadmap/ROADMAP.md.

Status

Parapet is actively evolving. The core primitives are in place; higher-level modules such as reliable channels, failure detectors, broadcast, gossip, CRDTs, sharding, and observability grow independently.