Usage¶
A tour of Kuu’s features, ordered by how you’d reach for them.
Defining the App¶
from kuu import Kuu
from kuu.brokers.redis import RedisBroker
from kuu.results.redis import RedisResults
app = Kuu(
    broker=RedisBroker(url="redis://localhost:6379/0"),
    default_queue="default",
    results=RedisResults(url="redis://localhost:6379/0"),
)
Kuu owns the broker, result storage (results), middleware, the task
registry, event bus, and scheduler.
Tasks¶
Declaring¶
@app.task
async def charge(user_id: int, amount_cents: int) -> dict:
    return {"ok": True, "charged": amount_cents}
The parametrized form adds more options:
@app.task(queue="payments", max_attempts=3, timeout=10.0)
async def charge(user_id: int, amount_cents: int) -> dict: ...
Sync Functions¶
Sync functions work too. Set blocking=True to run them in a worker thread:
@app.task(blocking=True)
def render_pdf(invoice_id: int) -> bytes: ...
Combining blocking=True with an async def raises TypeError. Async
functions never need it.
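The rule above can be sketched with the standard library alone. `run_task` is a hypothetical helper, not part of Kuu, and it assumes blocking functions are offloaded with `asyncio.to_thread`; the real worker may use a different mechanism.

```python
import asyncio
import inspect


async def run_task(fn, *args, blocking=False, **kwargs):
    """Hypothetical dispatcher illustrating the blocking=True rule."""
    if inspect.iscoroutinefunction(fn):
        if blocking:
            # Mirrors the documented behavior: async def + blocking=True is an error.
            raise TypeError("blocking=True is only valid for sync functions")
        return await fn(*args, **kwargs)
    if blocking:
        # Run the sync function in a worker thread so the event loop stays free.
        return await asyncio.to_thread(fn, *args, **kwargs)
    # Assumption: a sync function without blocking=True is called inline.
    return fn(*args, **kwargs)


def render(invoice_id: int) -> bytes:
    return f"invoice-{invoice_id}".encode()


async def double(x: int) -> int:
    return x * 2
```

Calling `run_task(render, 7, blocking=True)` keeps the loop responsive while the function runs in a thread, whereas `run_task(double, 4, blocking=True)` fails fast with `TypeError`.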
Calling¶
Three ways:
# 1) in-process direct call (no broker hop)
result = await charge(user_id=1, amount_cents=500)
# 2) enqueue and forget
handle = await charge.q(user_id=1, amount_cents=500)
# 3) enqueue and wait for the result
result = await handle.result(timeout=30)
charge.q(...) returns a TaskHandle[dict] typed off the
function’s return annotation, so result keeps the right type without
casts.
handle.result() polls the result backend every 0.2 s by default. Pass
poll= to change the interval:
result = await handle.result(timeout=30, poll=0.5)
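To make the timeout and poll parameters concrete, here is a rough sketch of a polling loop of this shape. `wait_for_result` and `make_fetch` are hypothetical names, and treating `None` as "no result yet" is a simplifying assumption; this is not Kuu's implementation.

```python
import asyncio
import time


async def wait_for_result(fetch, *, timeout: float, poll: float = 0.2):
    """Poll `fetch()` until it returns a non-None value or `timeout` elapses.

    `fetch` is a hypothetical async callable standing in for a backend lookup.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = await fetch()
        if value is not None:
            return value
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("result not available before timeout")
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll, remaining))


def make_fetch(ready_after: int, value):
    """Build a fake backend that returns `value` on the `ready_after`-th call."""
    calls = {"n": 0}

    async def fetch():
        calls["n"] += 1
        return value if calls["n"] >= ready_after else None

    return fetch
```

A smaller poll interval lowers latency at the cost of more backend round trips.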
Delayed Enqueueing¶
Pass not_before to enqueue_by_name to schedule future delivery.
The broker holds the message until that UTC datetime, then delivers it:
from datetime import datetime, timedelta, timezone
from kuu.message import Payload
# not_before must be a timezone-aware UTC datetime
run_at = datetime.now(timezone.utc) + timedelta(minutes=10)
await app.enqueue_by_name(
    "myapp.tasks:charge",
    Payload(args=(1, 500)),
    not_before=run_at,
)
For Redis, delayed messages live in a sorted set and are pumped into the live stream when due. For the NATS and in-memory brokers, an in-process loop handles promotion.
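The "hold until due, then promote" pattern can be illustrated with a toy queue. This is a sketch only: `DelayedQueue` is a hypothetical class, and a heap stands in for the Redis sorted set.

```python
import heapq
from collections import deque
from datetime import datetime, timedelta, timezone


class DelayedQueue:
    """Toy stand-in for the 'sorted set + pump' pattern; not Kuu's code."""

    def __init__(self):
        self._delayed = []    # min-heap of (due_timestamp, seq, message)
        self._live = deque()  # messages ready for delivery
        self._seq = 0         # tie-breaker so messages are never compared

    def enqueue(self, message, not_before=None):
        if not_before is None:
            self._live.append(message)
            return
        heapq.heappush(self._delayed, (not_before.timestamp(), self._seq, message))
        self._seq += 1

    def pump(self, now):
        """Move every message whose due time has passed into the live queue."""
        moved = 0
        while self._delayed and self._delayed[0][0] <= now.timestamp():
            _, _, message = heapq.heappop(self._delayed)
            self._live.append(message)
            moved += 1
        return moved

    def live(self):
        return list(self._live)
```

A broker would call something like `pump` on a timer; messages without `not_before` skip the delayed structure entirely.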
Idempotency¶
Pass idempotency_key in headers to deduplicate against the result backend:
await app.enqueue_by_name(
    "myapp.tasks:charge",
    Payload(args=(1, 500)),
    headers={"idempotency_key": "charge:order-42"},
)
If a result already exists at that key, the worker acks the message and skips execution.
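The deduplication check amounts to a lookup before execution. The sketch below uses a plain dict as the result store and a hypothetical `process` function; the message shape is an assumption made for illustration.

```python
import asyncio


async def process(message: dict, results: dict, handler) -> str:
    """Hypothetical worker step: skip execution when a result already exists."""
    key = message.get("headers", {}).get("idempotency_key") or message["id"]
    if key in results:
        return "skipped"  # ack without running the task again
    results[key] = await handler(*message["args"])
    return "executed"


calls = []


async def charge(user_id: int, amount_cents: int) -> dict:
    calls.append((user_id, amount_cents))
    return {"ok": True, "charged": amount_cents}


async def demo():
    results = {}
    headers = {"idempotency_key": "charge:order-42"}
    first = {"id": "m1", "args": (1, 500), "headers": headers}
    duplicate = {"id": "m2", "args": (1, 500), "headers": headers}
    return (
        await process(first, results, charge),
        await process(duplicate, results, charge),
    )
```

Two messages with different ids but the same key result in a single charge.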
Error Control¶
Tasks signal their own outcome by raising one of two sentinel exceptions. Any other uncaught exception is treated as a failure.
Retry¶
Raise RetryErr to re-queue the message, either immediately or after a
delay:
from kuu import RetryErr
@app.task
async def sync_inventory(sku: str) -> None:
    if not upstream_available():
        raise RetryErr(delay=30.0)  # retry in 30 s
delay is in seconds. Omit it to let RetryMiddleware compute the
backoff. The worker increments message.attempt on every retry; once
max_attempts is reached the task is declared dead regardless of how
RetryErr was raised.
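The interaction between retries and max_attempts can be sketched as a loop. `RetryErr` here is a local stand-in so the example runs without Kuu, `run_with_retries` is hypothetical, and counting attempts from 1 is an assumption.

```python
class RetryErr(Exception):
    """Local stand-in for kuu.RetryErr so the sketch runs without Kuu installed."""

    def __init__(self, delay=None):
        super().__init__("retry requested")
        self.delay = delay


def run_with_retries(task, max_attempts):
    """Call `task(attempt)` until it succeeds or `max_attempts` is reached.

    Returns (status, value, delays), where `delays` records each requested
    retry delay; a real worker would wait instead of looping immediately.
    """
    delays = []
    attempt = 0
    while True:
        attempt += 1  # assumption: attempts are counted from 1
        try:
            return "ok", task(attempt), delays
        except RetryErr as err:
            if attempt >= max_attempts:
                return "dead", None, delays
            delays.append(err.delay)


def flaky(attempt):
    if attempt < 3:
        raise RetryErr(delay=30.0)
    return "synced"


def always_busy(attempt):
    raise RetryErr()  # no delay given: a backoff policy would pick one
```

With `max_attempts=3`, `always_busy` is declared dead after the third attempt even though every failure was an explicit retry request.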
Reject¶
Raise RejectErr to discard the message without retrying. Pass
requeue=True to put it back on the queue:
from kuu import RejectErr
@app.task
async def process_webhook(body: dict) -> None:
    if body.get("version") != 2:
        raise RejectErr("unsupported webhook version")  # drop silently
Brokers¶
Three brokers are bundled, all implementing the same Broker Protocol.
For advanced connection management (cluster, sentinel, connection pooling),
brokers and result backends accept a pre-built Transport via their
transport= parameter.
Redis Streams¶
from kuu.brokers.redis import RedisBroker
broker = RedisBroker(
    url="redis://localhost:6379/0",
    group="payments",
    consumer="payments-1",
)
Live messages flow through Streams; scheduled (not_before) messages live
in a sorted set and are pumped into the live stream when due. Stale pending
entries from dead consumers are reclaimed automatically.
Redis Cluster and Sentinel¶
The convenience url= argument targets a single node. For cluster or
sentinel deployments, build a RedisTransport directly and pass it via
transport=:
from kuu import ClusterConfig, SentinelConfig, RedisTransport
from kuu.brokers.redis import RedisBroker
from kuu.results.redis import RedisResults
# Redis Cluster
broker = RedisBroker(
    transport=RedisTransport(ClusterConfig(
        url="redis://node1:6379",
        read_from_replicas=True,
    ))
)
# Redis Sentinel
broker = RedisBroker(
    transport=RedisTransport(SentinelConfig(
        hosts=(("sentinel1", 26379), ("sentinel2", 26379)),
        service_name="mymaster",
    ))
)
RedisResults accepts the same transport= parameter, so a single
RedisTransport can back both the broker and the result store.
transport = RedisTransport(ClusterConfig(url="redis://node1:6379"))
broker = RedisBroker(transport=transport)
results = RedisResults(transport=transport)
In cluster mode the broker hash-tags stream and sorted-set keys with
{queue} so all keys for a given queue land on the same slot.
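Why hash tags keep keys together can be checked by computing the slot directly. The function below follows the Redis Cluster rule (CRC16 of the text between the first `{` and the next `}`, modulo 16384); the key names are hypothetical, and the broker's real key layout may differ.

```python
import binascii


def key_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for `key`, honoring {hash tags}."""
    data = key.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag counts; "{}" falls back to hashing the whole key.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # Redis Cluster uses CRC16/XMODEM, which binascii.crc_hqx computes with init 0.
    return binascii.crc_hqx(data, 0) % 16384


# Hypothetical key names for one queue; only the {payments} tag is hashed.
stream_key = "kuu:{payments}:stream"
delayed_key = "kuu:{payments}:delayed"
```

Because only the tag is hashed, both keys map to the same slot as the bare string `payments`, so multi-key operations on one queue never cross nodes.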
NATS JetStream¶
from kuu.brokers.nats import NatsBroker
broker = NatsBroker(
    servers="nats://localhost:4222",
    stream="kuu",
)
In-Memory¶
For tests and single-process setups:
from kuu.brokers.memory import MemoryBroker
broker = MemoryBroker(buffer=1024)
Result Backend¶
Results are optional. When set, task return values persist under the message
id (or idempotency_key if present) and TaskHandle.result() fetches
them.
from kuu.results.redis import RedisResults
results = RedisResults(
    url="redis://localhost:6379/0",
    prefix="kuu:r:",
    ttl=86400,          # entries expire after a day; None means no expiry
    replay=True,        # short-circuit on cache hit
    store_errors=True,  # persist terminal failures so callers can observe them
)
replay=True enables idempotency_key-based deduplication: the worker
checks the backend before running and skips execution if a result already
exists.
Middleware¶
Three middleware classes are bundled. They implement the same Protocol as user middleware:
from kuu.middleware import LoggingMiddleware, RetryMiddleware, TimeoutMiddleware
app = Kuu(
    broker=...,
    middleware=[
        LoggingMiddleware(),
        RetryMiddleware(base=0.5, cap=60.0, jitter=0.2),
        TimeoutMiddleware(seconds=30.0),
    ],
)
LoggingMiddleware: one line on start, one on success/failure with duration.
RetryMiddleware: catches RetryErr and schedules the next attempt with exponential backoff plus jitter. Pass retry_on to also auto-retry on specific exception types:
RetryMiddleware(base=0.5, cap=60.0, jitter=0.2, retry_on=(httpx.HTTPError,))
Backoff: min(cap, base * 2 ** attempt), with jitter applied as a fractional multiplier in [-jitter, +jitter].
TimeoutMiddleware: the per-task Task.timeout wins; this seconds value is the default.
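The backoff formula can be written out as a small function. `backoff_delay` is a hypothetical name, and applying jitter after the cap (so a delay can exceed `cap` by up to the jitter fraction) is an assumption about ordering that the formula alone does not settle.

```python
import random


def backoff_delay(attempt, base=0.5, cap=60.0, jitter=0.2, rng=None):
    """Exponential backoff with a cap and fractional jitter."""
    rng = rng or random
    delay = min(cap, base * 2 ** attempt)
    # Jitter as a fractional multiplier drawn uniformly from [-jitter, +jitter].
    return delay * (1.0 + rng.uniform(-jitter, jitter))


# Nominal delays with jitter disabled, for attempts 0 through 8.
nominal = [backoff_delay(n, jitter=0.0) for n in range(9)]
```

With the defaults above, the nominal delay doubles from 0.5 s and is clamped to 60 s from the seventh attempt onward.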
Custom Middleware¶
class HeaderInjector:
    async def __call__(self, ctx, call_next):
        if ctx.phase == "enqueue":
            ctx.message.headers["x-tenant"] = current_tenant()
        return await call_next()
ctx.phase is "enqueue" for the publish path and "process" for
the worker path. Most middleware filters on phase.
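How a list of such callables might compose into a single call is sketched below. `Ctx` and `build_chain` are hypothetical and model only the `(ctx, call_next)` calling convention described above, not Kuu's internals.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Ctx:
    """Hypothetical context object; only `phase` and `headers` are modeled."""
    phase: str
    headers: dict = field(default_factory=dict)


def build_chain(middleware, ctx, terminal):
    """Wrap `terminal` so the first middleware in the list runs outermost."""
    call_next = terminal
    for mw in reversed(middleware):
        # Bind mw and call_next as defaults to avoid late-binding closures.
        async def step(mw=mw, nxt=call_next):
            return await mw(ctx, nxt)
        call_next = step
    return call_next


async def inject_tenant(ctx, call_next):
    if ctx.phase == "enqueue":
        ctx.headers["x-tenant"] = "acme"
    return await call_next()


async def record_phase(ctx, call_next):
    ctx.headers.setdefault("trace", []).append(ctx.phase)
    return await call_next()


async def run(phase):
    ctx = Ctx(phase=phase)

    async def terminal():
        return "done"

    result = await build_chain([inject_tenant, record_phase], ctx, terminal)()
    return result, ctx.headers
```

The same chain serves both paths; `inject_tenant` simply does nothing when the phase is `"process"`.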
Scheduler¶
Jobs are declared via app.every() or app.sched() decorators, or
programmatically through the schedule object. The scheduler loop runs in
the orchestrator process when [scheduler] enable = true.
Interval jobs¶
from datetime import timedelta
@app.task
async def refresh_balance() -> None: ...
# Decorator style
@app.every(timedelta(minutes=5))
async def refresh() -> None: ...
# Programmatic style
app.schedule.add_every(timedelta(minutes=5), "myapp.tasks:refresh")
Optional kwargs: id, queue, headers, max_attempts.
Schedule-based jobs (cron-like)¶
Schedule jobs use composable schedule objects instead of cron expressions:
from datetime import time
from kuu.scheduler.schedule import at, every, on, on_day, in_month, between, Mon, Tue, Wed, Thu, Fri
# Every weekday at 09:00
@app.sched(on(Mon, Tue, Wed, Thu, Fri) & at(time(9, 0)))
async def morning_report() -> None: ...
# Every 4 hours, anchored from midnight
@app.sched(every(hours=4, starting=time(0, 0)))
async def periodic_sync() -> None: ...
# 1st and 15th of each month at 03:00
@app.sched(on_day(1, 15) & at(time(3, 0)))
async def billing() -> None: ...
# Between 09:00 and 17:00, every 30 minutes
@app.sched(every(minutes=30) & between(time(9), time(17)))
async def office_hours_job() -> None: ...
Schedule objects compose with & (AND - all must match) and | (OR -
any matches):
| Schedule | Purpose |
|---|---|
| at | Daily at wall-clock time |
| every | Fixed interval within each day |
| on | Restrict to specific weekdays (Mon-Sun) |
| on_day | Restrict to days of month (1-31) |
| in_month | Restrict to specific months (Jan-Dec) |
| between | Restrict to time window within each day |
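The & and | composition can be illustrated with plain predicates over datetimes. The names below (`Rule`, `weekdays`, `at_time`, `day_of_month`) are deliberately different from Kuu's and are hypothetical; this shows the composition idea, not the library's schedule classes.

```python
from datetime import datetime, time


class Rule:
    """Toy predicate over datetimes that composes with & and |."""

    def __init__(self, matches):
        self.matches = matches

    def __and__(self, other):
        return Rule(lambda dt: self.matches(dt) and other.matches(dt))

    def __or__(self, other):
        return Rule(lambda dt: self.matches(dt) or other.matches(dt))


def weekdays(*days):
    # Monday is 0, matching datetime.weekday().
    return Rule(lambda dt: dt.weekday() in days)


def at_time(t):
    return Rule(lambda dt: (dt.hour, dt.minute) == (t.hour, t.minute))


def day_of_month(*days):
    return Rule(lambda dt: dt.day in days)


MON, TUE, WED, THU, FRI = range(5)
weekday_morning = weekdays(MON, TUE, WED, THU, FRI) & at_time(time(9, 0))
billing_or_morning = weekday_morning | (day_of_month(1, 15) & at_time(time(3, 0)))
```

Each operator returns a new rule, so arbitrarily nested combinations stay a single object that a scheduler can test against a candidate time.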
Programmatic registration:
from datetime import time
from kuu.scheduler.schedule import at, on, Mon, Wed, Fri
app.schedule.add_schedule(at(time(9, 0)) & on(Mon, Wed, Fri), "myapp.tasks:report")
Serializers¶
Brokers and result backends use a Serializer to marshal messages. The
default is JSONSerializer, backed by msgspec.json. Custom per-type
encoders/decoders can be registered via kuu.marshal.Marshal.
Msgpack¶
Faster and more compact than JSON for binary payloads.
from kuu.serializers import MsgpackSerializer
from kuu.brokers.redis import RedisBroker
from kuu.results.redis import RedisResults
broker = RedisBroker(url=..., serializer=MsgpackSerializer())
results = RedisResults(url=..., serializer=MsgpackSerializer())
Pickle¶
Supports arbitrary Python objects but executes arbitrary code on deserialization. Only use in trusted environments.
Set KUU_ALLOW_PICKLE=yes in the environment to suppress the per-call
SecurityWarning:
from kuu.serializers import PickleSerializer
broker = RedisBroker(url=..., serializer=PickleSerializer())
Custom Serializer¶
Implement the Serializer protocol:
from kuu import Serializer
class MySerializer:
    def marshal(self, data) -> bytes: ...
    def unmarshal(self, data: bytes, into=None): ...
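As one possible concrete implementation of those two methods, the sketch below compresses JSON with zlib. `CompressedJSONSerializer` is a hypothetical class, and it ignores the `into` argument because its exact semantics are not described here.

```python
import json
import zlib


class CompressedJSONSerializer:
    """Example serializer: JSON-encoded, then zlib-compressed."""

    def marshal(self, data) -> bytes:
        raw = json.dumps(data, separators=(",", ":")).encode("utf-8")
        return zlib.compress(raw)

    def unmarshal(self, data: bytes, into=None):
        # Assumption: `into` is an optional target type; this sketch ignores it
        # and always returns plain JSON-compatible Python values.
        return json.loads(zlib.decompress(data).decode("utf-8"))


serializer = CompressedJSONSerializer()
payload = {"args": [1, 500], "kwargs": {"note": "x" * 100}}
blob = serializer.marshal(payload)
```

Since the protocol is structural, the class needs no base class; it only has to provide `marshal` and `unmarshal` with compatible signatures.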
Marshal / Type Coercion¶
The global kuu.marshal.marshal singleton extends msgspec’s
encode/decode hooks with custom type support. Register encoders and
decoders once; the broker, result backend, and persistence layer all
pick them up automatically.
from datetime import datetime, timezone
from kuu.marshal import marshal
# encode datetime → ISO 8601 string
marshal.register(
    datetime,
    enc=lambda dt: dt.astimezone(timezone.utc).isoformat(),
    dec=lambda typ, s: datetime.fromisoformat(s),
)
# encode Path → string
from pathlib import Path
marshal.register(Path, enc=str, dec=lambda typ, s: Path(s))
Call marshal.unregister(type) to remove a codec.
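To see what the datetime codec above does to a value, the following sketch runs the same enc/dec pair through a tiny dict-based registry. The `codecs` dict and the `register`, `unregister`, and `roundtrip` helpers are hypothetical stand-ins, not the `kuu.marshal` API.

```python
from datetime import datetime, timedelta, timezone

codecs = {}  # hypothetical stand-in for the registry: type -> (enc, dec)


def register(typ, enc, dec):
    codecs[typ] = (enc, dec)


def unregister(typ):
    codecs.pop(typ, None)


def roundtrip(value):
    """Encode, then decode `value` with the codec registered for its exact type."""
    enc, dec = codecs[type(value)]
    wire = enc(value)
    return wire, dec(type(value), wire)


register(
    datetime,
    enc=lambda dt: dt.astimezone(timezone.utc).isoformat(),
    dec=lambda typ, s: datetime.fromisoformat(s),
)

# A datetime at UTC+2 is normalized to UTC on the wire.
local = datetime(2024, 5, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
wire, decoded = roundtrip(local)
```

The decoded value denotes the same instant as the original but carries a UTC offset, so code that depends on the original zone should store it separately.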
Persistence¶
Run and log history is stored automatically when [persistence] enable =
true (the default). Two backends ship:
SQLite - zero-config, single-file (sqlite:///./kuu.db)
PostgreSQL - requires the postgres extra (postgres://user:pass@host/db)
[default.persistence]
enable = true
dsn = "postgres://user:pass@localhost:5432/kuu"
schema = "kuu"
runs_table = "kuu_runs"
logs_table = "kuu_run_logs"
keep_days = 7
max_runs = 100_000
log_level = "INFO"
capture_args = true
The dashboard exposes run history under each registered task. Runs older
than keep_days are purged periodically. max_runs caps total stored
runs.
PostgreSQL Result Backend¶
When using PostgreSQL as the broker transport, you can also store task
results in PostgreSQL via kuu.results.postgres.PostgresResults.
Requires the postgres extra.
from kuu import Kuu
from kuu.transports.postgres import PostgresTransport, PostgresConfig
from kuu.brokers.redis import RedisBroker  # or any broker
from kuu.results.postgres import PostgresResults
transport = PostgresTransport(PostgresConfig(
    dsn="postgres://user:pass@localhost:5432/kuu",
))
results = PostgresResults(transport=transport)
app = Kuu(broker=RedisBroker(url=...), results=results)
PostgresResults uses LISTEN/NOTIFY for instant result delivery
instead of polling, and falls back to polling if the notification
channel drops.
Observability / Control Plane¶
When kuu start runs without --preset, it launches a control
plane that forks one supervisor per preset. Each supervisor emits
structured state snapshots (workers, queues, jobs) and task lifecycle
events.
WebSocket Uplink¶
A leaf supervisor can stream events to a remote dashboard collector via WebSocket:
export KUU_DASHBOARD_URL="ws://dashboard.example.com/collect"
export KUU_DASHBOARD_TOKEN="<auth-token>"
uv run kuu start -p prod
Or pass it directly:
uv run kuu start -p prod --uplink ws://dashboard.example.com/collect
Test-time inspection¶
The kuu.observability module exposes the protocol types for building
custom tooling:
from kuu.observability import (
    Envelope, Event, State, Hello, Bye,
    WorkerSnapshot, QueueSnapshot, JobSnapshot, TaskInfo,
    envelope_to_bytes, envelope_from_bytes,
)
Prometheus Metrics¶
Two integration points: a worker-side emitter and client-side middleware.
Both wire onto app.events.
Worker Metrics¶
When [metrics] enable = true, the orchestrator runs an aggregator HTTP
server on metrics.port and tells each worker subprocess to write to a
shared multiprocess directory.
[metrics]
enable = true
host = "0.0.0.0"
port = 9191
Hit http://host:9191/metrics.
Client-Side¶
from kuu.prometheus import ClientMetrics
app = Kuu(broker=..., middleware=[ClientMetrics()])
This records client_enqueued_total, client_enqueue_errors_total and
client_enqueue_duration_seconds from every task.q(...) call, on the
same registry the worker side uses.
To expose them from your own ASGI app:
from fastapi import FastAPI
from kuu.prometheus import asgi_app
api = FastAPI()
api.mount("/metrics", asgi_app())
Dashboard¶
Requires the dashboard extra. Shows registered tasks, scheduled jobs,
broker depth and live worker count.
[dashboard]
enable = true
host = "0.0.0.0"
port = 8181
path = "/dashboard"
The orchestrator imports the Kuu app via config.app, mounts the dashboard
under path and serves it via uvicorn alongside the worker pool.
Events¶
Every Kuu instance carries an app.events object with named signals.
Connect a handler to observe lifecycle events:
import sentry_sdk
@app.events.task_succeeded.connect
async def on_success(msg, elapsed: float) -> None:
    print(f"{msg.task} finished in {elapsed:.3f}s")
@app.events.task_dead.connect
def on_dead(msg) -> None:
    sentry_sdk.capture_message(f"task {msg.task} exhausted retries")
Handlers can be sync or async; exceptions are logged and swallowed so a bad handler never brings down the worker.
Handler signatures for the two signals used above:
| Signal | Handler arguments |
|---|---|
| task_succeeded | msg, elapsed: float |
| task_dead | msg |
Disconnect a handler with app.events.<signal>.disconnect(handler).
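The connect/disconnect contract, including mixed sync and async handlers and swallowed exceptions, can be sketched in a few lines. `Signal` below is a hypothetical toy class written for illustration; it is not the class behind `app.events`.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("signal-sketch")


class Signal:
    """Toy signal: sync or async handlers, errors logged and swallowed."""

    def __init__(self):
        self._handlers = []

    def connect(self, handler):
        self._handlers.append(handler)
        return handler  # returning the handler lets connect work as a decorator

    def disconnect(self, handler):
        self._handlers.remove(handler)

    async def emit(self, *args):
        for handler in list(self._handlers):
            try:
                outcome = handler(*args)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception:
                log.exception("handler %r failed", handler)


task_succeeded = Signal()
seen = []


@task_succeeded.connect
async def on_success(task, elapsed):
    seen.append((task, elapsed))


@task_succeeded.connect
def broken(task, elapsed):
    raise RuntimeError("boom")


@task_succeeded.connect
def also_runs(task, elapsed):
    seen.append("sync")
```

Emitting the signal runs all three handlers in order; the failure in `broken` is logged and `also_runs` still executes.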
Watch / Hot Reload¶
[watch]
enable = true
root = "."
respect_gitignore = true
exclude = [".git/**"]
reload_delay = 0.25
reload_debounce = 0.5
When enabled, the orchestrator watches root and reloads the worker pool
on settled change batches. .gitignore patterns and exclude globs
filter out noise (caches, build artifacts, IDE temp files).
The scheduler shares the orchestrator lifetime, so a reload restarts
in-memory every(...) jobs at now + interval. Schedule-based (cron-like)
jobs re-anchor to the next match and are unaffected.
CLI¶
Two commands:
uv run kuu start
uv run kuu start -c ./path/to/kuunfig.toml
uv run kuu start -o concurrency=128 -o dashboard.enable=true
uv run kuu info
-o dotted.path=value is repeatable. Values parse as JSON when valid
(true, 42, ["a","b"]); otherwise they stay as strings.
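The "JSON when valid, otherwise string" rule for -o values can be sketched as follows. `parse_override` and `apply_overrides` are hypothetical helpers operating on a plain nested dict; they are not the CLI's actual code.

```python
import json


def parse_override(spec: str):
    """Split 'dotted.path=value' and decode the value as JSON when possible."""
    path, sep, raw = spec.partition("=")
    if not sep or not path:
        raise ValueError(f"expected dotted.path=value, got {spec!r}")
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        value = raw  # not valid JSON, keep the raw string
    return path.split("."), value


def apply_overrides(config: dict, specs):
    """Apply each override to a nested dict, creating tables as needed."""
    for spec in specs:
        keys, value = parse_override(spec)
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config
```

Under this rule `dashboard.enable=true` becomes a boolean, while `dashboard.host=0.0.0.0` stays a string because it is not valid JSON.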
See Config for the full TOML schema.
Testing¶
MemoryBroker plus a programmatic Worker is the usual setup:
import anyio
from kuu import Kuu
from kuu.brokers.memory import MemoryBroker
from kuu.config import Settings
from kuu.worker import Worker
async def test_charge_runs():
    app = Kuu(broker=MemoryBroker())
    @app.task
    async def charge(x: int) -> int:
        return x * 2
    handle = await charge.q(5)
    config = Settings.model_construct(queues=["default"], concurrency=4)
    worker = Worker(config, app=app)
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker.run)
        # ...assert behavior, then cancel the worker
        tg.cancel_scope.cancel()
Worker(config, app=app) skips the dotted-spec import so you can pass an
inline-built Kuu instance.