framer 1.0.1
Incremental framing for Stream<List<int>> with hard progress guarantees, spill/drain-safe codecs, and a fast ring buffer.
framer #
Fast, incremental framing for byte streams with hard progress guarantees and built-in DoS guards.
The problem everyone has (and hates) #
You receive bytes in arbitrary chunks.
They never align with your message boundaries.
One onData callback can contain half a message,
17 full messages, 2.3 messages, or a malicious 4 GB "message".
The standard approach is to use a BytesBuilder, scan for delimiters, track partial headers, handle edge cases, and hope the result doesn't allocate quadratically. If you've written this before, you know it's never fun. It usually looks like this:
import 'dart:typed_data';

final buffer = BytesBuilder();
await for (final chunk in stream) {
  buffer.add(chunk);
  var bytes = buffer.toBytes(); // ← O(n) copy
  var i = bytes.indexOf(0x0A); // ← O(n) scan
  while (i != -1) {
    handle(bytes.sublist(0, i)); // ← another copy
    bytes = bytes.sublist(i + 1); // ← yet another copy
    i = bytes.indexOf(0x0A);
  }
  buffer.clear();
  buffer.add(bytes); // keep remainder
}
Works in tests.
Melts in production with 1-byte chunks.
OOMs when someone sends a huge length prefix.
Breaks backpressure if you forget to forward pause.
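To put a rough number on "melts with 1-byte chunks", the sketch below replays the naive loop above and counts the bytes copied by toBytes() and sublist(). naiveBytesCopied is a hypothetical helper written only for this illustration. It uses only dart:typed_data and is not part of framer.

```dart
import 'dart:typed_data';

/// Replays the naive BytesBuilder loop over [chunks] and returns the total
/// number of bytes copied by toBytes() and sublist(). Illustration only.
int naiveBytesCopied(List<List<int>> chunks, {int delimiter = 0x0A}) {
  final buffer = BytesBuilder();
  var copied = 0;
  for (final chunk in chunks) {
    buffer.add(chunk);
    var bytes = buffer.toBytes(); // re-copies everything buffered so far
    copied += bytes.length;
    var i = bytes.indexOf(delimiter);
    while (i != -1) {
      final frame = bytes.sublist(0, i); // copy of the frame
      copied += frame.length;
      bytes = bytes.sublist(i + 1); // copy of the remainder
      copied += bytes.length;
      i = bytes.indexOf(delimiter);
    }
    buffer.clear();
    buffer.add(bytes);
  }
  return copied;
}
```

If you feed it one 1,000-byte line one byte at a time, it reports 502,501 copied bytes. The same line in a single chunk costs 2,001. The cost grows with the square of the line length because every toBytes() call copies the whole pending buffer again.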
The one-line fix #
await for (final frame in stream.decodeFrames(DelimiterCodec(0x0A))) {
  handle(frame);
}
framer does it right, once and for all:
- Fixed-memory ring buffer → no allocation storms
- Zero-copy views whenever possible → minimal GC pressure
- Full backpressure support → respects pause/resume/cancel
- Safe by default → length prefixes are bounds-checked before allocation
- Built-in DoS guards → maxFrameSize, maxLineLength, etc.
- Typed exceptions → MalformedFrameException, FrameTooLargeException, CodecStalledError
- Works everywhere → Socket, File, WebSocket, any Stream<List<int>>
Quick start #
dart pub add framer
Or manually:
# pubspec.yaml
dependencies:
  # See pub.dev for the latest.
  framer: ^1.0.0
import 'package:framer/framer.dart';
That gives you the core API and every built-in codec with safe-by-default semantics (frames are owned copies, not tied to internal buffers).
Newline-delimited messages (NDJSON, line protocols) #
await for (final msg in socket.decodeFrames(DelimiterCodec(0x0A))) {
  // msg is an owned Uint8List: safe to store, pass around, and mutate.
}
SSE (text/event-stream) #
// `byteStream` can be a socket, file, HTTP response body, etc.
await for (final event in byteStream.decodeFrames(SseCodec())) {
  print('${event.type}: ${event.data}');
}
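For reference, this is roughly the wire format SseCodec consumes. An event is a block of "field: value" lines ended by a blank line. Multiple data lines are joined with a newline, and the type defaults to "message". The sketch below parses one block that has already been delimited, following those rules from the HTML Living Standard. SimpleSseEvent and parseSseBlock are hypothetical names. The sketch also leaves out id/retry handling and does not mirror framer's event type.

```dart
/// Minimal SSE event (hypothetical type; framer's own event type may differ).
class SimpleSseEvent {
  const SimpleSseEvent(this.type, this.data);
  final String type;
  final String data;
}

/// Parses one event block (the text before the terminating blank line).
/// Returns null when the block has no data field, so nothing is dispatched.
SimpleSseEvent? parseSseBlock(String block) {
  var type = '';
  final data = StringBuffer();
  var sawData = false;
  for (final line in block.split(RegExp(r'\r\n|\r|\n'))) {
    // Skip blank lines and comments (lines starting with ':').
    if (line.isEmpty || line.startsWith(':')) continue;
    final colon = line.indexOf(':');
    final field = colon == -1 ? line : line.substring(0, colon);
    var value = colon == -1 ? '' : line.substring(colon + 1);
    // A single leading space after the colon is not part of the value.
    if (value.startsWith(' ')) value = value.substring(1);
    if (field == 'event') {
      type = value;
    } else if (field == 'data') {
      if (sawData) data.write('\n'); // multiple data lines join with \n
      data.write(value);
      sawData = true;
    }
    // id, retry and unknown fields are ignored in this sketch.
  }
  if (!sawData) return null;
  return SimpleSseEvent(type.isEmpty ? 'message' : type, data.toString());
}
```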
Content-Length (LSP / stdio JSON-RPC) #
import 'dart:convert';
import 'dart:io';
import 'package:framer/framer.dart';
await for (final body in stdin.decodeFrames(ContentLengthCodec())) {
  print(utf8.decode(body));
}
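ContentLengthCodec expects each body to be preceded by a header block such as "Content-Length: 123" and a blank line (\r\n\r\n). In the LSP base protocol the length counts bytes of the UTF-8 body, not characters. That is an easy mistake to make when producing test input. Here is a minimal encoder sketch. encodeContentLengthFrame is a hypothetical helper, not a framer API.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Frames [body] as `Content-Length: N\r\n\r\n<body>`, where N is the
/// UTF-8 byte length of the body (hypothetical helper).
Uint8List encodeContentLengthFrame(String body) {
  final bodyBytes = utf8.encode(body);
  final header = ascii.encode('Content-Length: ${bodyBytes.length}\r\n\r\n');
  return Uint8List.fromList([...header, ...bodyBytes]);
}
```

For the body "é" (including the quotes) it writes Content-Length: 4, because é takes two bytes in UTF-8.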
LSP JSON-RPC (framing + JSON parse) #
import 'dart:io';
import 'package:framer/framer.dart';
await for (final obj in stdin.decodeFrames(LspJsonRpcCodec())) {
  print(obj); // typically a Map<String, Object?>
}
gRPC transport framing #
await for (final msg in byteStream.decodeFrames(GrpcMessageCodec())) {
  final payload = msg.payload; // Uint8List
  final compressed = msg.compressed;
}
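The gRPC over HTTP/2 protocol defines the 5-byte header that GrpcMessageCodec parses: a 1-byte compressed flag, then the payload length as a 4-byte big-endian unsigned integer. The sketch below encodes one message and decodes it from a complete buffer. Both function names are hypothetical. The sketch only shows the layout plus an up-front length check, without framer's incremental spill/drain handling.

```dart
import 'dart:typed_data';

/// Encodes [payload] with the gRPC length-prefixed message header.
Uint8List encodeGrpcMessage(List<int> payload, {bool compressed = false}) {
  final out = Uint8List(5 + payload.length);
  out[0] = compressed ? 1 : 0; // compressed flag
  ByteData.sublistView(out).setUint32(1, payload.length, Endian.big);
  out.setRange(5, out.length, payload);
  return out;
}

/// Decodes one message from the start of [buf], or returns null if the
/// header or payload is still incomplete. The payload is a view, not a copy.
({bool compressed, Uint8List payload})? decodeGrpcMessage(
  Uint8List buf, {
  int maxLength = 1 << 20,
}) {
  if (buf.length < 5) return null; // header incomplete
  final flag = buf[0];
  if (flag > 1) throw FormatException('invalid compressed flag: $flag');
  final length = ByteData.sublistView(buf).getUint32(1, Endian.big);
  // Reject oversized messages before waiting for (or allocating) the payload.
  if (length > maxLength) throw FormatException('message too large: $length');
  if (buf.length < 5 + length) return null; // payload incomplete
  return (
    compressed: flag == 1,
    payload: Uint8List.sublistView(buf, 5, 5 + length),
  );
}
```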
WebSocket frame parsing (RFC 6455 framing layer) #
This parses raw WebSocket frames (not the handshake / HTTP upgrade).
final codec = WebSocketFrameCodec(requireMask: false);
await for (final frame in byteStream.decodeFrames(codec)) {
  print(frame);
}
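For orientation, here is the RFC 6455 frame layout the codec works with:

- The first byte holds the FIN bit and the opcode.
- The second byte holds a MASK bit and a 7-bit length. The values 126 and 127 switch to 16-bit and 64-bit extended lengths.
- An optional 4-byte masking key follows.
- The payload comes last. When masked, it is XOR-ed with the key.

The sketch below parses one complete frame. WsFrame and parseWsFrame are hypothetical names. It ignores RSV bits, control-frame rules and fragmentation.

```dart
import 'dart:typed_data';

/// One decoded frame (hypothetical type for this sketch).
class WsFrame {
  WsFrame(this.fin, this.opcode, this.payload);
  final bool fin;
  final int opcode; // e.g. 0x1 text, 0x2 binary, 0x8 close
  final Uint8List payload;
}

/// Parses one frame from the start of [buf], or returns null if [buf] does
/// not yet contain the whole frame.
WsFrame? parseWsFrame(Uint8List buf, {int maxPayload = 1 << 20}) {
  if (buf.length < 2) return null;
  final view = ByteData.sublistView(buf);
  final fin = (buf[0] & 0x80) != 0;
  final opcode = buf[0] & 0x0F;
  final masked = (buf[1] & 0x80) != 0;
  var length = buf[1] & 0x7F;
  var offset = 2;
  if (length == 126) {
    if (buf.length < 4) return null;
    length = view.getUint16(2); // 16-bit extended length (big-endian)
    offset = 4;
  } else if (length == 127) {
    if (buf.length < 10) return null;
    // 64-bit extended length; this sketch rejects anything above 32 bits.
    if (view.getUint32(2) != 0) throw FormatException('payload too large');
    length = view.getUint32(6);
    offset = 10;
  }
  // Bounds-check the declared length before waiting for the payload.
  if (length > maxPayload) throw FormatException('payload too large: $length');
  final keyOffset = offset;
  if (masked) offset += 4;
  if (buf.length < offset + length) return null;
  final payload = buf.sublist(offset, offset + length); // owned copy
  if (masked) {
    for (var i = 0; i < length; i++) {
      payload[i] ^= buf[keyOffset + i % 4]; // unmask with the 4-byte key
    }
  }
  return WsFrame(fin, opcode, payload);
}
```

RFC 6455 section 5.7 gives two single-frame "Hello" examples, one unmasked and one masked. Given either, the sketch returns a final text frame (opcode 1) whose payload is Hello.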
StreamTransformer API (idiomatic pipelines) #
final lines = byteStream.transform(
  FramerTransformer(() => DelimiterCodec(0x0A)),
);
More examples in example/:
- example/sse.dart
- example/lsp_content_length.dart
- example/lsp_json_rpc.dart
- example/grpc_messages.dart
- example/websocket_frames.dart
Built-in codecs #
| Codec | Protocol | Notes |
|---|---|---|
| DelimiterCodec | Newline, NUL, pipe, etc. | Single-byte delimiter |
| DelimiterSequenceCodec | \r\n\r\n, multipart boundaries | Multi-byte delimiter |
| CrlfLineCodec | HTTP-ish headers | Strict \r\n or lenient |
| LengthPrefixedCodec | Binary protocols | u32be and varint32 |
| ContentLengthCodec | LSP / stdio JSON-RPC | Header block + body |
| LspJsonRpcCodec | LSP | Framing + JSON decode |
| GrpcMessageCodec | gRPC transport framing | 5-byte header + payload |
| SseCodec | Server-Sent Events | text/event-stream |
| WebSocketFrameCodec | WebSocket (RFC 6455) | Frame-level only |
These are framing codecs only — they parse the wire format but don't implement full protocol state machines (handshakes, compression negotiation, etc.).
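LengthPrefixedCodec supports u32be and varint32 prefixes. The sketch below assumes that varint32 means the Protocol Buffers-style base-128 varint: 7 value bits per byte, low group first, and the high bit set on every byte except the last. Check the API docs to confirm the exact encoding. A 32-bit value never needs more than 5 bytes, so a decoder can reject a prefix that is still continuing after 5 bytes instead of reading indefinitely. Both function names are hypothetical.

```dart
import 'dart:typed_data';

/// Encodes a non-negative 32-bit [value] as a base-128 varint.
Uint8List encodeVarint32(int value) {
  if (value < 0 || value > 0xFFFFFFFF) {
    throw RangeError.range(value, 0, 0xFFFFFFFF, 'value');
  }
  final out = <int>[];
  var v = value;
  while (v >= 0x80) {
    out.add((v & 0x7F) | 0x80); // low 7 bits, continuation bit set
    v >>= 7;
  }
  out.add(v); // last byte, continuation bit clear
  return Uint8List.fromList(out);
}

/// Decodes a varint32 from the start of [buf]. Returns (value, bytesRead),
/// or null if more bytes are needed. Rejects prefixes longer than 5 bytes.
(int, int)? decodeVarint32(List<int> buf) {
  var result = 0;
  for (var i = 0; i < buf.length && i < 5; i++) {
    final b = buf[i];
    result |= (b & 0x7F) << (7 * i);
    if ((b & 0x80) == 0) {
      if (result > 0xFFFFFFFF) throw FormatException('varint32 overflow');
      return (result, i + 1);
    }
  }
  // Five continuation bytes in a row can never be a valid varint32.
  if (buf.length >= 5) throw FormatException('varint32 longer than 5 bytes');
  return null; // need more bytes
}
```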
Codec composition #
Build "bytes → parsed objects" pipelines with FrameCodecX.map:
import 'dart:convert';
import 'package:framer/framer.dart';
final jsonLines = DelimiterCodec(0x0A)
    .map((b) => utf8.decode(b))
    .map((s) => json.decode(s));
One important rule: don’t reuse codec instances #
Codecs are stateful, so sharing one codec instance across multiple streams or subscriptions is a subtle bug. Create a fresh codec instance per stream. For a reusable pipeline, pass a codec factory (as FramerTransformer does) instead of a shared instance.
// ✅ Right:
await for (final msg in socket.decodeFrames(ContentLengthCodec())) {
  // New codec instance created for this stream.
}
// ❌ Wrong:
final codec = ContentLengthCodec();
await for (final msg in socket.decodeFrames(codec)) {}
await for (final msg in otherSocket.decodeFrames(codec)) {}
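A factory fixes this because the transformer can then build fresh state every time a stream is listened to. The sketch below shows that pattern with the standard StreamTransformer.fromBind. FixedSizeDecoder and fixedSizeFrames are hypothetical stand-ins, not framer's FramerTransformer implementation.

```dart
import 'dart:async';

/// Hypothetical stateful decoder: cuts bytes into fixed-size frames and
/// carries leftover bytes over to the next chunk.
class FixedSizeDecoder {
  FixedSizeDecoder(this.size);
  final int size;
  final _pending = <int>[];

  List<List<int>> add(List<int> chunk) {
    _pending.addAll(chunk);
    final frames = <List<int>>[];
    while (_pending.length >= size) {
      frames.add(_pending.sublist(0, size));
      _pending.removeRange(0, size);
    }
    return frames;
  }
}

/// Calls [create] inside the async* body, which runs when the output stream
/// is listened to, so every subscription gets its own decoder.
StreamTransformer<List<int>, List<int>> fixedSizeFrames(
    FixedSizeDecoder Function() create) {
  return StreamTransformer<List<int>, List<int>>.fromBind((source) async* {
    final decoder = create();
    await for (final chunk in source) {
      for (final frame in decoder.add(chunk)) {
        yield frame;
      }
    }
  });
}
```

With a single shared decoder, leftover bytes from one stream could end up glued onto another stream's frames. With the factory, each stream keeps its own leftovers.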
How it works (if you care) #
Stream<List<int>> → RingBuffer → FrameCodec → Stream<T>
                        ↑
                    power-of-two, bitmasked indexing
                    zero-copy spans (O(1) peek, O(n) bulk with ≤2 copies)
The driver fills the buffer, calls tryDecode, and handles the choreography: progress detection (can't stall on full buffer), EOF finalization, and error propagation.
Frames larger than ring capacity? Codecs spill to an internal accumulator and drain the ring to make room. It works without you thinking about it.
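The two ring-buffer tricks named in the diagram are standard. With a power-of-two capacity, pos & (capacity - 1) replaces a modulo. A read or write that wraps past the physical end becomes at most two contiguous copies. The sketch below shows both. SimpleRing and its methods are hypothetical and much simpler than framer's RingBuffer (no spans, no spill).

```dart
import 'dart:typed_data';

/// Fixed-capacity byte ring (hypothetical sketch, not framer's RingBuffer).
class SimpleRing {
  SimpleRing(int capacity)
      : _data = Uint8List(_checkPowerOfTwo(capacity)),
        _mask = capacity - 1;

  final Uint8List _data;
  final int _mask; // for non-negative pos: pos & _mask == pos % capacity
  int _head = 0; // absolute read position
  int _tail = 0; // absolute write position

  static int _checkPowerOfTwo(int n) {
    if (n <= 0 || (n & (n - 1)) != 0) {
      throw ArgumentError.value(n, 'capacity', 'must be a power of two');
    }
    return n;
  }

  int get length => _tail - _head;
  int get free => _data.length - length;

  /// Copies as much of [src] as fits, using at most two setRange calls.
  int write(List<int> src) {
    final n = src.length < free ? src.length : free;
    final start = _tail & _mask;
    final first = n < _data.length - start ? n : _data.length - start;
    _data.setRange(start, start + first, src); // up to the physical end
    _data.setRange(0, n - first, src, first); // wrapped remainder
    _tail += n;
    return n;
  }

  /// O(1) peek at the byte [offset] positions past the read head.
  int peek(int offset) {
    if (offset < 0 || offset >= length) {
      throw RangeError.range(offset, 0, length - 1, 'offset');
    }
    return _data[(_head + offset) & _mask];
  }

  /// Removes and returns [n] bytes, using at most two setRange copies.
  Uint8List read(int n) {
    if (n < 0 || n > length) throw RangeError.range(n, 0, length, 'n');
    final out = Uint8List(n);
    final start = _head & _mask;
    final first = n < _data.length - start ? n : _data.length - start;
    out.setRange(0, first, _data, start);
    out.setRange(first, n, _data);
    _head += n;
    return out;
  }
}
```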
Pick your speed tier #
Not everyone needs the same tradeoff. Choose the API that matches your workload:
| API | Mechanism | Use when |
|---|---|---|
| stream.decodeFrames(codec) | async* + yield | Simple, idiomatic |
| stream.decodeFramesPush(codec) | Push-based Stream | Chatty sources, many small chunks |
| stream.decodeFramesInto(codec, onFrame) | Callback, no output stream | Maximum throughput |
| stream.decodeFramesIntoPooled(codec, onFrame) | Callback, pooled buffers | Reduce GC churn (package:framer/extra.dart) |
| stream.decodeFramesIntoBorrowed(codec, onFrame) | Callback, zero-copy views | Extreme throughput, sync only (package:framer/unsafe.dart) |
| Framer<T> | Fully synchronous | Already have push-based bytes |
Note: pooled/borrowed APIs take a BorrowedBytesCodec (a codec that can produce
FrameBorrowedViews). Built-ins like DelimiterCodec and LengthPrefixedCodec
support this.
decodeFrames — the easy one #
await for (final frame in stream.decodeFrames(DelimiterCodec(0x0A))) {
  handle(frame); // owned Uint8List, safe to store
}
Pull-based async* driver. Lowest cognitive overhead. The right choice for most code.
decodeFramesInto — the fast one #
await stream.decodeFramesInto(DelimiterCodec(0x0A), (frame) {
  handle(frame); // still an owned copy
});
Callback-based. Skips async*/yield and output-stream allocation. Same correctness guarantees.
decodeFramesIntoBorrowed — the fastest/dangerous one #
import 'package:framer/unsafe.dart';
await byteStream.decodeFramesIntoBorrowed(DelimiterCodec(0x0A), (frame) {
  // frame may point directly into the ring buffer.
  // Valid ONLY inside this callback. Do NOT retain it.
  // Need to keep bytes? Copy inside the callback:
  final copy = frame.toOwnedBytes();
});
Zero-copy. Frame bytes might reference internal buffers. Use only if you process synchronously and never retain.
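The "never retain" rule comes from how views work in Dart. A view created with Uint8List.sublistView shares memory with its backing buffer, so once that buffer is reused the view silently shows different bytes. The sketch below simulates an internal buffer being overwritten. It uses only dart:typed_data, and simulateReuse is a hypothetical helper that stands in for, rather than reproduces, framer's borrowed frames.

```dart
import 'dart:typed_data';

/// Returns what a retained view and a retained copy contain after the
/// backing buffer is reused.
(String, String) simulateReuse() {
  final internal = Uint8List.fromList('hello'.codeUnits); // stand-in buffer
  final view = Uint8List.sublistView(internal, 0, 5); // zero-copy, like a borrowed frame
  final copy = Uint8List.fromList(view); // owned copy, like toOwnedBytes()
  // Later, the "driver" refills the same memory with the next chunk.
  internal.setAll(0, 'WORLD'.codeUnits);
  return (String.fromCharCodes(view), String.fromCharCodes(copy));
}
```

The view now reads WORLD while the copy still reads hello. Any bytes you need after the callback returns must therefore be copied inside it.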
decodeFramesIntoPooled — the compromise #
import 'package:framer/extra.dart';
await byteStream.decodeFramesIntoPooled(DelimiterCodec(0x0A), (lease) {
  try {
    handle(lease.bytesView); // safe while not released
  } finally {
    lease.release(); // return buffer to pool
  }
});
Owned bytes (safe to retain), but uses a pool to reduce GC churn. Good for long-running servers.
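The lease/release discipline is what makes pooling safe. The sketch below is a minimal pool that assumes fixed-size buffers and single-isolate use. BytePool and Lease are hypothetical names; bytesView and release() only echo the example above. framer's pool may size, cap and track buffers differently.

```dart
import 'dart:typed_data';

/// A pooled buffer that must be handed back exactly once.
class Lease {
  Lease._(this._pool, this._buffer, this.length);
  final BytePool _pool;
  Uint8List? _buffer;
  final int length;

  /// View of the leased bytes; throws if used after release().
  Uint8List get bytesView {
    final b = _buffer;
    if (b == null) throw StateError('lease already released');
    return Uint8List.sublistView(b, 0, length);
  }

  void release() {
    final b = _buffer;
    if (b == null) throw StateError('double release');
    _buffer = null;
    _pool._free.add(b); // buffer becomes available for the next lease
  }
}

/// Hands out fixed-size buffers and reuses returned ones.
class BytePool {
  BytePool(this.bufferSize);
  final int bufferSize;
  final List<Uint8List> _free = [];
  int allocations = 0;

  Lease lease(List<int> bytes) {
    if (bytes.length > bufferSize) {
      throw ArgumentError('frame too large for pool');
    }
    final buf = _free.isNotEmpty ? _free.removeLast() : _allocate();
    buf.setRange(0, bytes.length, bytes);
    return Lease._(this, buf, bytes.length);
  }

  Uint8List _allocate() {
    allocations++;
    return Uint8List(bufferSize);
  }
}
```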
Safety by default #
- Size limits everywhere. Every codec has a maxFrameSize/maxBodySize/maxMessageSize. Default: 1 MiB.
- Stall detection. If a codec returns null without consuming bytes from a full buffer, the driver throws CodecStalledError instead of looping forever.
- Typed exceptions. MalformedFrameException for bad input, FrameTooLargeException for size violations, CodecStalledError for bugs. All implement marker interfaces (FramerException, FramerError) and extend FormatException/StateError for backwards compatibility.
- No global state. Codecs are stateful per-instance. You can also use FramerTransformer variants to create fresh codecs per subscription, so you never accidentally share state.
- DoS-resistant. Ring buffer capacity is fixed. Accumulators are bounded. Header line counts, header byte totals, and varint lengths are all guarded.
Security / DoS knobs #
Every codec has sensible defaults. Override when you know better:
| Parameter | Purpose | Default |
|---|---|---|
| capacity | Ring buffer size (performance knob) | 64 KiB |
| maxFrameSize | Max decoded frame length | 1 MiB |
| maxLineLength | Max line length (delimiter codecs) | 1 MiB |
| maxEventSize | Max SSE event size | 1 MiB |
| maxBodySize | Max Content-Length body | 1 MiB |
| maxHeaderLines | Max header count | 64 |
| maxHeaderBytes | Max total header block bytes | 64 KiB |
A malicious peer can't trick you into allocating unbounded memory.
The driver enforces these limits before allocating large buffers, so memory use stays bounded.
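Checking before allocating means validating the declared length as soon as the header is readable, not after the body has been buffered. The sketch below does this for a u32be length prefix. readU32beFrameLength is hypothetical, and FormatException stands in for framer's FrameTooLargeException, which is a FormatException according to the exception hierarchy.

```dart
import 'dart:typed_data';

/// Returns the payload length announced by a 4-byte big-endian prefix at the
/// start of [buf], or null if the prefix is incomplete. Throws before any
/// payload buffer is allocated when the length exceeds [maxFrameSize].
int? readU32beFrameLength(Uint8List buf, {int maxFrameSize = 1 << 20}) {
  if (buf.length < 4) return null;
  final length = ByteData.sublistView(buf).getUint32(0, Endian.big);
  if (length > maxFrameSize) {
    throw FormatException('frame of $length bytes exceeds limit $maxFrameSize');
  }
  return length;
}
```

A header claiming 0xFFFFFFFF bytes (about 4 GiB, the "malicious 4 GB message" from the introduction) is rejected after reading only those 4 bytes.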
Exception hierarchy #
All exceptions implement marker interfaces for easy catch patterns:
| Exception | Meaning | Extends / implements |
|---|---|---|
| MalformedFrameException | Invalid/corrupt input | FormatException, FramerException |
| FrameTooLargeException | Policy limit exceeded | FormatException, FramerException |
| CodecStalledError | Codec bug (no progress) | StateError, FramerError |
try {
  await for (final frame in stream.decodeFrames(codec)) { /* ... */ }
} on FrameTooLargeException {
  // Policy violation: disconnect.
} on MalformedFrameException {
  // Bad input: log and close.
} on FramerException {
  // Any framer-specific issue.
}
Correctness guarantees #
- Progress guarantee: after a codec returns null, the driver fetches more bytes instead of retrying the same buffer.
- EOF behavior: trailing bytes at EOF throw a FormatException by default. Mix in FinalizableCodec for protocols where EOF is a valid terminator, or pass allowTrailingBytesAtEof: true to silently discard leftovers.
- Owned frames (default): freshly allocated, safe to store and mutate.
- Pooled frames (opt-in): owned but backed by a reuse pool (package:framer/extra.dart).
- Borrowed frames (opt-in, unsafe): may reference internal buffers and are only valid during the callback (package:framer/unsafe.dart).
Writing a custom codec #
Implement FrameCodec<T>:
final class MyCodec implements FrameCodec<MyFrame> {
  bool _inProgress = false;

  @override
  bool get hasPending => _inProgress;

  @override
  MyFrame? tryDecode(RingBuffer rb) {
    // 1. Parse header; return null if not enough bytes yet.
    // 2. Validate limits (throw FrameTooLargeException).
    // 3. Read payload bytes, spilling to internal buffer if ring is full.
    // 4. Return the complete frame.
  }
}
The contract #
- Return a frame when bytes are complete (consume exactly those bytes).
- Return null when incomplete. You may still consume bytes into internal state.
- Never stall: if you return null and the ring is full, you must have consumed/drained something, or the driver throws CodecStalledError.
- Throw typed exceptions: MalformedFrameException for bad input, FrameTooLargeException for policy violations.
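A toy driver makes the stall rule concrete. After each fill it calls tryDecode until it gets null. If null comes back while the buffer is full and nothing was consumed, more input can never help, so the driver throws instead of spinning. ToyCodec, NewlineToyCodec and decodeAll are hypothetical simplifications, not framer's FrameCodec or RingBuffer API. They use a growable List<int> instead of a ring, StateError stands in for CodecStalledError, and EOF handling is omitted.

```dart
import 'dart:math';

/// Hypothetical codec interface: inspects [buf] and may remove bytes from it.
abstract interface class ToyCodec<T> {
  T? tryDecode(List<int> buf);
}

/// Splits on 0x0A; consumes exactly the frame bytes plus the delimiter.
class NewlineToyCodec implements ToyCodec<List<int>> {
  @override
  List<int>? tryDecode(List<int> buf) {
    final i = buf.indexOf(0x0A);
    if (i == -1) return null; // incomplete: consume nothing
    final frame = buf.sublist(0, i);
    buf.removeRange(0, i + 1);
    return frame;
  }
}

/// Feeds [chunks] through [codec] using a buffer of at most [capacity] bytes.
List<T> decodeAll<T>(ToyCodec<T> codec, List<List<int>> chunks, int capacity) {
  final buf = <int>[];
  final out = <T>[];
  for (final chunk in chunks) {
    var offset = 0;
    while (offset < chunk.length) {
      // Fill as much of the chunk as currently fits.
      final n = min(chunk.length - offset, capacity - buf.length);
      buf.addAll(chunk.sublist(offset, offset + n));
      offset += n;
      // Decode until the codec asks for more bytes.
      while (true) {
        final before = buf.length;
        final frame = codec.tryDecode(buf);
        if (frame != null) {
          out.add(frame);
          continue;
        }
        if (buf.length == capacity && buf.length == before) {
          throw StateError('codec stalled: null on a full buffer');
        }
        break; // buffer has room again, so fetch more input
      }
    }
  }
  return out;
}
```

NewlineToyCodec would itself trip this check on a line longer than capacity, because it never consumes partial lines. framer's built-in codecs handle exactly that case by spilling to an internal accumulator and draining the ring.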
EOF finalization (opt-in) #
If your protocol treats EOF as a legitimate terminator (e.g., JSON Lines without a trailing newline), mix in FinalizableCodec:
final class JsonLinesCodec
    with FinalizableCodec<String>
    implements FrameCodec<String> {
  // ... tryDecode, hasPending ...

  @override
  String? tryFinalize(RingBuffer rb) {
    // Flush any buffered trailing line at EOF.
  }
}
Codecs that don't need it simply don't mix it in — no boilerplate.
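Here is the idea behind tryFinalize in miniature. A line splitter carries an unterminated tail between chunks. At EOF it either emits that tail as a final record, because EOF is a valid terminator, or reports it as trailing bytes. JsonLinesSplitter, finish and eofIsTerminator are hypothetical names. In framer the equivalent choice is made by mixing in FinalizableCodec or not.

```dart
import 'dart:convert';

/// Splits UTF-8 bytes into lines, keeping an unterminated tail between chunks.
class JsonLinesSplitter {
  JsonLinesSplitter({this.eofIsTerminator = true});
  final bool eofIsTerminator;
  final _tail = <int>[];

  /// Returns every line completed by [chunk] (including carried-over bytes).
  List<String> add(List<int> chunk) {
    final lines = <String>[];
    for (final b in chunk) {
      if (b == 0x0A) {
        lines.add(utf8.decode(_tail));
        _tail.clear();
      } else {
        _tail.add(b);
      }
    }
    return lines;
  }

  /// Called once at EOF: flushes the tail, or rejects it as trailing bytes.
  String? finish() {
    if (_tail.isEmpty) return null;
    if (!eofIsTerminator) {
      throw FormatException('${_tail.length} trailing bytes at EOF');
    }
    final last = utf8.decode(_tail);
    _tail.clear();
    return last;
  }
}
```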
Testing with CodecTestHarness #
test/codec_test_harness.dart is a test-only helper you can copy into your own test suite. It fuzzes codecs with adversarial chunking:
final harness = CodecTestHarness<MyFrame>(
  codec: MyCodec(),
  capacity: 16, // small capacity to stress spill/drain
);
final frames = await harness.decodeFromBytes(wireBytes, chunkMax: 3, seed: 42);
expect(frames, hasLength(expectedCount));
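You may want the same adversarial chunking outside the harness, for example to drive a real decodeFrames call. The core idea is a seeded random split. This sketch is an assumption about what chunkMax and seed do, not a copy of codec_test_harness.dart. It cuts the input into chunks of between 1 and chunkMax bytes, reproducibly for a given seed. randomChunks is a hypothetical helper.

```dart
import 'dart:math';

/// Splits [bytes] into consecutive chunks of between 1 and [chunkMax] bytes.
/// The same [seed] always yields the same split, so failures are reproducible.
List<List<int>> randomChunks(List<int> bytes,
    {required int chunkMax, int seed = 0}) {
  if (chunkMax < 1) throw ArgumentError.value(chunkMax, 'chunkMax');
  final rng = Random(seed);
  final chunks = <List<int>>[];
  var offset = 0;
  while (offset < bytes.length) {
    final size = min(1 + rng.nextInt(chunkMax), bytes.length - offset);
    chunks.add(bytes.sublist(offset, offset + size));
    offset += size;
  }
  return chunks;
}
```

Wrapping the result in Stream.fromIterable gives a Stream<List<int>> that can be passed to any of the decode APIs.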
Performance #
Not benchmarketing, just honest numbers. M1 MacBook Pro, 20k frames with 64-byte payloads (delimiter-separated and u32be length-prefixed):
dart run tool/bench.dart --frames=20000 --payload=64 --chunkMax=8192 --baseline --minMs=300
Delimiter (into) 642.4 MiB/s
Delimiter (borrowed into) 742.6 MiB/s
Delimiter (BytesBuilder) 82.9 MiB/s ← the "normal" way
U32BE (into) 1980.0 MiB/s
U32BE (borrowed into) 2174.2 MiB/s
The BytesBuilder baseline is the code everyone writes by hand. framer is ~8x faster on delimiter framing — and it's correct.
Honest note: decodeFrames (the async* API) won't beat a tight synchronous loop for millions of tiny frames, because yield overhead dominates. Its win is correctness and ergonomics. When you need raw speed, use decodeFramesInto, Framer, or the core ring-buffer path.
Imports #
// Everything you need for safe, idiomatic framing:
import 'package:framer/framer.dart';
// Pooled buffers (safe, explicit reuse):
import 'package:framer/extra.dart';
// Zero-copy borrowed views (unsafe, read the docs):
import 'package:framer/unsafe.dart';
The split is intentional. package:framer/framer.dart never exposes borrowed or pooling APIs, so you can't adopt them by accident.
License #
MIT — see LICENSE.