framer 1.0.1

Incremental framing for Stream<List<int>> with hard progress guarantees, spill/drain-safe codecs, and a fast ring buffer.

framer #

Fast, incremental framing for byte streams with hard progress guarantees and built-in DoS guards.

The problem everyone has (and hates) #

You receive bytes in arbitrary chunks.
They never align with your message boundaries.

One onData callback can contain half a message, 17 full messages, 2.3 messages, or a malicious 4 GB "message".

The standard approach is to accumulate chunks in a BytesBuilder, scan for delimiters, track partial headers, handle edge cases, and hope the result doesn't allocate quadratically. If you've written this before, you know it's never fun. It usually looks like this:

final buffer = BytesBuilder();
await for (final chunk in stream) {
  buffer.add(chunk);
  var bytes = buffer.toBytes();           // ← O(n) copy
  var i = bytes.indexOf(0x0A);            // ← O(n) scan
  while (i != -1) {
    handle(bytes.sublist(0, i));          // ← another copy
    bytes = bytes.sublist(i + 1);         // ← yet another copy
    i = bytes.indexOf(0x0A);
  }
  buffer.clear();
  buffer.add(bytes); // keep remainder
}

Works in tests.
Melts in production with 1-byte chunks.
OOMs when someone sends a huge length prefix.
Breaks backpressure if you forget to forward pause.

The one-line fix #

await for (final frame in stream.decodeFrames(DelimiterCodec(0x0A))) {
  handle(frame);
}

framer does it right, once and for all:

  • Fixed-memory ring buffer → no allocation storms
  • Zero-copy views whenever possible → minimal GC pressure
  • Full backpressure support → respects pause/resume/cancel
  • Safe by default → length prefixes are bounds-checked before allocation
  • Built-in DoS guards → maxFrameSize, maxLineLength, etc.
  • Typed exceptions → MalformedFrameException, FrameTooLargeException, CodecStalledError
  • Works everywhere → Socket, File, WebSocket, any Stream<List<int>>
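To make the "bounds-checked before allocation" point concrete, here is a minimal sketch of the idea in plain Dart. It does not use framer and is not the library's implementation; `readCheckedLength` and its `maxFrameSize` parameter are hypothetical names chosen for illustration. The point is only the ordering: validate the declared length first, allocate afterwards.

```dart
import 'dart:typed_data';

/// Hypothetical helper (not part of framer): reads a u32 big-endian length
/// prefix and validates it against a limit before any payload allocation.
/// Returns null when fewer than 4 header bytes are available.
int? readCheckedLength(Uint8List header, {int maxFrameSize = 1 << 20}) {
  if (header.length < 4) return null; // incomplete header: wait for more bytes
  final length = ByteData.sublistView(header).getUint32(0, Endian.big);
  if (length > maxFrameSize) {
    // Reject first, so no buffer of `length` bytes is ever created.
    throw FormatException('frame of $length bytes exceeds $maxFrameSize');
  }
  return length;
}

void main() {
  print(readCheckedLength(Uint8List.fromList([0, 0, 0, 5]))); // 5
  try {
    // A declared length of 4294967295 bytes is rejected without allocating.
    readCheckedLength(Uint8List.fromList([0xFF, 0xFF, 0xFF, 0xFF]));
  } on FormatException catch (e) {
    print('rejected: ${e.message}');
  }
}
```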

Quick start #

dart pub add framer

Or manually:

# pubspec.yaml
dependencies:
  # See pub.dev for the latest.
  framer: ^1.0.0

Then import it:

import 'package:framer/framer.dart';

That gives you the core API and every built-in codec with safe-by-default semantics (frames are owned copies, not tied to internal buffers).

Newline-delimited messages (NDJSON, line protocols) #

await for (final msg in socket.decodeFrames(DelimiterCodec(0x0A))) {
  // msg is an owned Uint8List — safe to store, pass around, mutate.
}

SSE (text/event-stream) #

// `byteStream` can be a socket, file, HTTP response body, etc.
await for (final event in byteStream.decodeFrames(SseCodec())) {
  print('${event.type}: ${event.data}');
}

Content-Length (LSP / stdio JSON-RPC) #

import 'dart:convert';
import 'dart:io';

import 'package:framer/framer.dart';

await for (final body in stdin.decodeFrames(ContentLengthCodec())) {
  print(utf8.decode(body));
}

LSP JSON-RPC (framing + JSON parse) #

import 'dart:io';

import 'package:framer/framer.dart';

await for (final obj in stdin.decodeFrames(LspJsonRpcCodec())) {
  print(obj); // typically a Map<String, Object?>
}

gRPC transport framing #

await for (final msg in byteStream.decodeFrames(GrpcMessageCodec())) {
  final payload = msg.payload; // Uint8List
  final compressed = msg.compressed;
}
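For readers unfamiliar with the 5-byte gRPC message prefix, the sketch below parses it by hand: one compressed-flag byte followed by a 4-byte big-endian message length. This is a standalone illustration in plain Dart, not GrpcMessageCodec's code; `GrpcPrefix` and `parseGrpcPrefix` are hypothetical names.

```dart
import 'dart:typed_data';

/// Illustrative holder for a parsed gRPC message prefix (hypothetical type).
class GrpcPrefix {
  final bool compressed;
  final int length;
  const GrpcPrefix(this.compressed, this.length);
}

/// Parses the 5-byte prefix; returns null if fewer than 5 bytes are present.
GrpcPrefix? parseGrpcPrefix(Uint8List bytes) {
  if (bytes.length < 5) return null;
  final flag = bytes[0];
  if (flag > 1) throw FormatException('invalid compressed flag: $flag');
  // Bytes 1..4 hold the payload length, big-endian.
  final length = ByteData.sublistView(bytes).getUint32(1, Endian.big);
  return GrpcPrefix(flag == 1, length);
}

void main() {
  final p = parseGrpcPrefix(Uint8List.fromList([1, 0, 0, 1, 0, 0xAA]))!;
  print('${p.compressed} ${p.length}'); // true 256
}
```

A real codec would also check the length against its size limit before reading the payload.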

WebSocket frame parsing (RFC 6455 framing layer) #

This parses raw WebSocket frames (not the handshake / HTTP upgrade).

final codec = WebSocketFrameCodec(requireMask: false);
await for (final frame in byteStream.decodeFrames(codec)) {
  print(frame);
}

StreamTransformer API (idiomatic pipelines) #

final lines = byteStream.transform(
  FramerTransformer(() => DelimiterCodec(0x0A)),
);

More examples in example/.

Built-in codecs #

| Codec | Protocol | Notes |
|---|---|---|
| DelimiterCodec | Newline, NUL, pipe, etc. | Single-byte delimiter |
| DelimiterSequenceCodec | \r\n\r\n, multipart boundaries | Multi-byte delimiter |
| CrlfLineCodec | HTTP-ish headers | Strict \r\n or lenient |
| LengthPrefixedCodec | Binary protocols | u32be and varint32 |
| ContentLengthCodec | LSP / stdio JSON-RPC | Header block + body |
| LspJsonRpcCodec | LSP | Framing + JSON decode |
| GrpcMessageCodec | gRPC transport framing | 5-byte header + payload |
| SseCodec | Server-Sent Events | text/event-stream |
| WebSocketFrameCodec | WebSocket (RFC 6455) | Frame-level only |

These are framing codecs only — they parse the wire format but don't implement full protocol state machines (handshakes, compression negotiation, etc.).

Codec composition #

Build "bytes → parsed objects" pipelines with FrameCodecX.map:

import 'dart:convert';
import 'package:framer/framer.dart';

final jsonLines = DelimiterCodec(0x0A)
    .map((b) => utf8.decode(b))
    .map((s) => json.decode(s));

One important rule: don’t reuse codec instances #

Codecs are stateful, so sharing one codec across multiple streams or subscriptions is a subtle bug. Create a fresh codec instance per stream. For a reusable pipeline, use FramerTransformer with a codec factory so each subscription gets its own instance.

// ✅ Right:
await for (final msg in socket.decodeFrames(ContentLengthCodec())) {
  // New codec instance created for this stream.
}

// ❌ Wrong:
final codec = ContentLengthCodec();
await for (final msg in socket.decodeFrames(codec)) {}
await for (final msg in otherSocket.decodeFrames(codec)) {}
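Why sharing is a bug is easiest to see with a toy decoder. The sketch below is plain Dart and does not use framer; `ToyLineDecoder` is a hypothetical stand-in that, like a real codec, keeps a partial frame between calls. When two sources feed the same instance, one source's leftover bytes end up in the other's frame.

```dart
import 'dart:convert';

/// Toy stateful decoder (hypothetical, not a framer codec): buffers a partial
/// line between calls, much like a real codec holds a partial frame.
class ToyLineDecoder {
  final List<int> _partial = [];

  /// Returns every line completed by this chunk.
  List<String> add(List<int> chunk) {
    final lines = <String>[];
    for (final byte in chunk) {
      if (byte == 0x0A) {
        lines.add(utf8.decode(_partial));
        _partial.clear();
      } else {
        _partial.add(byte);
      }
    }
    return lines;
  }
}

void main() {
  // Shared instance: source A delivers half a line, then source B a full one.
  final shared = ToyLineDecoder();
  shared.add(utf8.encode('hel'));
  print(shared.add(utf8.encode('world\n'))); // [helworld]

  // One decoder per source keeps the partial state separate.
  final a = ToyLineDecoder();
  final b = ToyLineDecoder();
  a.add(utf8.encode('hel'));
  print(b.add(utf8.encode('world\n'))); // [world]
  print(a.add(utf8.encode('lo\n'))); // [hello]
}
```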

How it works (if you care) #

Stream<List<int>> → RingBuffer → FrameCodec → Stream<T>
                      ↑
            power-of-two, bitmasked indexing
            zero-copy spans (O(1) peek, O(n) bulk with ≤2 copies)

The driver fills the buffer, calls tryDecode, and handles the choreography: progress detection (can't stall on full buffer), EOF finalization, and error propagation.

Frames larger than ring capacity? Codecs spill to an internal accumulator and drain the ring to make room. It works without you thinking about it.
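The "power-of-two, bitmasked indexing" note in the diagram can be sketched in a few lines. This is a minimal illustration in plain Dart, not framer's RingBuffer; `TinyRing` and its methods are hypothetical. With a power-of-two capacity, `index & (capacity - 1)` wraps around the backing array without a modulo or a branch.

```dart
import 'dart:typed_data';

/// Minimal ring buffer sketch (hypothetical; framer's RingBuffer may differ).
class TinyRing {
  final Uint8List _buf;
  final int _mask;
  int _head = 0; // read position, as an ever-increasing counter
  int _tail = 0; // write position, as an ever-increasing counter

  TinyRing(int capacity)
      : assert(capacity > 0 && (capacity & (capacity - 1)) == 0,
            'capacity must be a power of two'),
        _buf = Uint8List(capacity),
        _mask = capacity - 1;

  int get length => _tail - _head;
  int get free => _buf.length - length;

  /// Writes as many bytes as fit and returns how many were written.
  int write(List<int> bytes) {
    final n = bytes.length < free ? bytes.length : free;
    for (var i = 0; i < n; i++) {
      _buf[(_tail + i) & _mask] = bytes[i]; // wrap with a mask, not %
    }
    _tail += n;
    return n;
  }

  /// O(1) peek at the byte `offset` positions past the read position.
  int peek(int offset) {
    if (offset < 0 || offset >= length) {
      throw RangeError.range(offset, 0, length - 1, 'offset');
    }
    return _buf[(_head + offset) & _mask];
  }

  /// Drops `n` bytes from the front.
  void consume(int n) {
    RangeError.checkValueInInterval(n, 0, length, 'n');
    _head += n;
  }
}

void main() {
  final ring = TinyRing(4);
  ring.write([1, 2, 3]);
  ring.consume(2);
  ring.write([4, 5, 6]); // wraps past the end of the backing array
  print([for (var i = 0; i < ring.length; i++) ring.peek(i)]); // [3, 4, 5, 6]
}
```

Because a readable region can straddle the end of the array, a bulk read needs at most two contiguous copies, which is where the "≤2 copies" figure in the diagram comes from.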


Pick your speed tier #

Not everyone needs the same tradeoff. Choose the API that matches your workload:

| API | Overhead | Use when |
|---|---|---|
| stream.decodeFrames(codec) | async* + yield | Simple, idiomatic |
| stream.decodeFramesPush(codec) | Push-based Stream | Chatty sources, many small chunks |
| stream.decodeFramesInto(codec, onFrame) | Callback, no output stream | Maximum throughput |
| stream.decodeFramesIntoPooled(codec, onFrame) | Callback, pooled buffers | Reduce GC churn (package:framer/extra.dart) |
| stream.decodeFramesIntoBorrowed(codec, onFrame) | Callback, zero-copy views | Extreme throughput, sync only (package:framer/unsafe.dart) |
| Framer<T> | Fully synchronous | Already have push-based bytes |

Note: pooled/borrowed APIs take a BorrowedBytesCodec (a codec that can produce FrameBorrowedViews). Built-ins like DelimiterCodec and LengthPrefixedCodec support this.

decodeFrames — the easy one #

await for (final frame in stream.decodeFrames(DelimiterCodec(0x0A))) {
  handle(frame); // owned Uint8List, safe to store
}

Pull-based async* driver. Lowest cognitive overhead. The right choice for most code.

decodeFramesInto — the fast one #

await stream.decodeFramesInto(DelimiterCodec(0x0A), (frame) {
  handle(frame); // still an owned copy
});

Callback-based. Skips async*/yield and output-stream allocation. Same correctness guarantees.

decodeFramesIntoBorrowed — the fastest/dangerous one #

import 'package:framer/unsafe.dart';

await byteStream.decodeFramesIntoBorrowed(DelimiterCodec(0x0A), (frame) {
  // frame may point directly into the ring buffer.
  // Valid ONLY inside this callback. Do NOT retain it.
  // Need to keep bytes? Copy inside the callback:
  final copy = frame.toOwnedBytes();
});

Zero-copy. Frame bytes might reference internal buffers. Use only if you process synchronously and never retain.
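The hazard is ordinary typed-data aliasing, which the following sketch reproduces with nothing but dart:typed_data. It does not use framer; `aliasingDemo` is a hypothetical function, and the "internal buffer" is just a local Uint8List standing in for the ring.

```dart
import 'dart:typed_data';

/// Returns [borrowedView, ownedCopy] after the backing buffer is overwritten.
List<Uint8List> aliasingDemo() {
  // Stand-in for an internal buffer holding "hi\n" plus one spare byte.
  final internal = Uint8List.fromList([104, 105, 10, 0]);

  // A view shares memory with the buffer: no bytes are copied.
  final borrowed = Uint8List.sublistView(internal, 0, 2);
  // A copy has its own storage.
  final owned = Uint8List.fromList(borrowed);

  // Simulate the buffer being reused for the next chunk.
  internal[0] = 120;
  return [borrowed, owned];
}

void main() {
  final [borrowed, owned] = aliasingDemo();
  print(borrowed); // [120, 105]: the retained view changed underneath us
  print(owned); // [104, 105]
}
```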

decodeFramesIntoPooled — the compromise #

import 'package:framer/extra.dart';

await byteStream.decodeFramesIntoPooled(DelimiterCodec(0x0A), (lease) {
  try {
    handle(lease.bytesView); // safe while not released
  } finally {
    lease.release(); // return buffer to pool
  }
});

Owned bytes backed by a reuse pool: safe to use until you call release(), after which the buffer may be handed out again. Good for long-running servers where GC churn matters.

Safety by default #

  • Size limits everywhere. Every codec has a maxFrameSize / maxBodySize / maxMessageSize. Default: 1 MiB.
  • Stall detection. If a codec returns null without consuming bytes from a full buffer, the driver throws CodecStalledError instead of looping forever.
  • Typed exceptions. MalformedFrameException for bad input, FrameTooLargeException for size violations, CodecStalledError for bugs. All implement marker interfaces (FramerException, FramerError) and extend FormatException / StateError for backwards compatibility.
  • No global state. Codecs are stateful per-instance. You can use FramerTransformer variants to create fresh codecs per subscription too, so you never accidentally share state.
  • DoS-resistant. Ring buffer capacity is fixed. Accumulators are bounded. Header line counts, header byte totals, varint lengths — all guarded.
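As one example of what a guarded varint length can look like, here is a sketch of a bounded varint32 reader in plain Dart. It assumes the common LEB128-style encoding (7 payload bits per byte, high bit as continuation); whether framer's varint32 mode uses exactly this layout is an assumption, and `Varint` and `tryReadVarint32` are hypothetical names.

```dart
/// Result of a varint decode attempt (hypothetical type for this sketch).
class Varint {
  final int value;
  final int bytesRead;
  const Varint(this.value, this.bytesRead);
}

/// Decodes an unsigned LEB128-style varint32.
/// Returns null if the input ends mid-varint. Throws if the encoding runs
/// past 5 bytes or overflows 32 bits, so a peer cannot keep the decoder
/// waiting forever by sending continuation bytes.
Varint? tryReadVarint32(List<int> bytes) {
  var value = 0;
  for (var i = 0; i < bytes.length && i < 5; i++) {
    final b = bytes[i];
    value |= (b & 0x7F) << (7 * i);
    if ((b & 0x80) == 0) {
      if (value > 0xFFFFFFFF) {
        throw const FormatException('varint32 overflow');
      }
      return Varint(value, i + 1);
    }
  }
  if (bytes.length >= 5) {
    throw const FormatException('varint32 longer than 5 bytes');
  }
  return null; // incomplete: wait for more bytes
}

void main() {
  final v = tryReadVarint32([0xAC, 0x02])!;
  print('${v.value} in ${v.bytesRead} bytes'); // 300 in 2 bytes
  print(tryReadVarint32([0x80])); // null
}
```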

Security / DoS knobs #

Every codec has sensible defaults. Override when you know better:

| Parameter | Purpose | Default |
|---|---|---|
| capacity | Ring buffer size (performance knob) | 64 KB |
| maxFrameSize | Max decoded frame length | 1 MB |
| maxLineLength | Max line length (delimiter codecs) | 1 MB |
| maxEventSize | Max SSE event size | 1 MB |
| maxBodySize | Max Content-Length body | 1 MB |
| maxHeaderLines | Max header count | 64 |
| maxHeaderBytes | Max total header block bytes | 64 KB |

A malicious peer can't trick you into allocating unbounded memory.

The driver enforces these limits before allocating large buffers, so memory use stays bounded.

Exception hierarchy #

All exceptions implement marker interfaces for easy catch patterns:

| Exception | Meaning | Superclass |
|---|---|---|
| MalformedFrameException | Invalid/corrupt input | FormatException, FramerException |
| FrameTooLargeException | Policy limit exceeded | FormatException, FramerException |
| CodecStalledError | Codec bug (no progress) | StateError, FramerError |

try {
  await for (final frame in stream.decodeFrames(codec)) { /* ... */ }
} on FrameTooLargeException {
  // Policy violation — disconnect.
} on MalformedFrameException {
  // Bad input — log and close.
} on FramerException {
  // Any framer-specific issue.
}
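The "extends FormatException, implements a marker interface" shape is a general Dart pattern, sketched below with stand-in types. The `Demo*` names are hypothetical and deliberately differ from framer's real classes; the sketch only shows why handlers written against FormatException keep working while newer code can match on the marker.

```dart
/// Stand-in marker interface (hypothetical; framer's is FramerException).
abstract interface class DemoFramerException implements Exception {}

class DemoMalformedFrame extends FormatException
    implements DemoFramerException {
  const DemoMalformedFrame(super.message);
}

class DemoFrameTooLarge extends FormatException
    implements DemoFramerException {
  const DemoFrameTooLarge(super.message);
}

/// Most specific type first, then the marker, then the legacy base type.
String classify(Object error) => switch (error) {
      DemoFrameTooLarge() => 'too large',
      DemoFramerException() => 'framer',
      FormatException() => 'format',
      _ => 'other',
    };

void main() {
  try {
    throw const DemoFrameTooLarge('declared length exceeds limit');
  } on FormatException catch (e) {
    // A handler written against FormatException still fires.
    print(classify(e)); // too large
  }
  print(classify(const DemoMalformedFrame('bad header'))); // framer
}
```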

Correctness guarantees #

  • Progress guarantee: after a codec returns null, the driver fetches more bytes instead of retrying the same buffer.
  • EOF behavior: trailing bytes at EOF throw FormatException by default. Mix in FinalizableCodec for protocols where EOF is a valid terminator, or pass allowTrailingBytesAtEof: true to silently discard leftovers.
  • Owned frames (default): freshly allocated, safe to store/mutate.
  • Pooled frames (opt-in): owned but backed by a reuse pool (package:framer/extra.dart).
  • Borrowed frames (opt-in, unsafe): may reference internal buffers, only valid during callback (package:framer/unsafe.dart).

Writing a custom codec #

Implement FrameCodec<T>:

final class MyCodec implements FrameCodec<MyFrame> {
  bool _inProgress = false;

  @override
  bool get hasPending => _inProgress;

  @override
  MyFrame? tryDecode(RingBuffer rb) {
    // 1. Parse header — return null if not enough bytes yet.
    // 2. Validate limits (throw FrameTooLargeException).
    // 3. Read payload bytes, spilling to internal buffer if ring is full.
    // 4. Return the complete frame.
  }
}

The contract #

  • Return a frame when bytes are complete (consume exactly those bytes).
  • Return null when incomplete. You may still consume bytes into internal state.
  • Never stall: if you return null and the ring is full, you must have consumed/drained something, or the driver throws CodecStalledError.
  • Throw typed exceptions: MalformedFrameException for bad input, FrameTooLargeException for policy violations.
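The contract above can be illustrated with a toy driver loop. This is a simplified sketch in plain Dart, not framer's driver: `ToyBuffer`, `drive`, and `decodeLine` are hypothetical, the buffer is a growable list rather than a ring, and leftover bytes at EOF are simply ignored. It shows the two halves of the rule: a decoder that returns null without consuming is fine while there is room to read more, and an error once the buffer is full.

```dart
/// Toy fixed-capacity buffer (hypothetical stand-in for a ring buffer).
class ToyBuffer {
  final int capacity;
  final List<int> bytes = [];
  ToyBuffer(this.capacity);
  bool get isFull => bytes.length >= capacity;
}

typedef ToyDecode<T> = T? Function(ToyBuffer buf);

/// Feeds `input` through `decode`, enforcing the "never stall" rule.
List<T> drive<T>(List<int> input, ToyDecode<T> decode, {int capacity = 8}) {
  final buf = ToyBuffer(capacity);
  final frames = <T>[];
  var pos = 0;
  while (true) {
    // Fill the buffer as far as capacity and input allow.
    while (pos < input.length && !buf.isFull) {
      buf.bytes.add(input[pos++]);
    }
    final wasFull = buf.isFull;
    final before = buf.bytes.length;
    final frame = decode(buf);
    if (frame != null) {
      frames.add(frame);
      continue; // more frames may already be buffered
    }
    // Null from a full buffer with nothing consumed can never make progress.
    if (wasFull && buf.bytes.length >= before) {
      throw StateError('codec stalled on a full buffer');
    }
    if (pos >= input.length) break; // EOF: this sketch ignores leftovers
  }
  return frames;
}

/// Newline decoder obeying the contract: consume exactly one frame or nothing.
String? decodeLine(ToyBuffer buf) {
  final i = buf.bytes.indexOf(0x0A);
  if (i < 0) return null; // incomplete: wait for more bytes
  final line = String.fromCharCodes(buf.bytes.sublist(0, i));
  buf.bytes.removeRange(0, i + 1); // the frame plus its delimiter
  return line;
}

void main() {
  print(drive('ab\ncd\n'.codeUnits, decodeLine)); // [ab, cd]
  try {
    // A 10-byte line cannot fit in 4 bytes and this decoder never spills.
    drive('aaaaaaaaaa\n'.codeUnits, decodeLine, capacity: 4);
  } on StateError catch (e) {
    print(e.message); // codec stalled on a full buffer
  }
}
```

A decoder that wants to support frames larger than the buffer would move bytes into its own bounded accumulator before returning null, which is the "consumed/drained something" case in the contract.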

EOF finalization (opt-in) #

If your protocol treats EOF as a legitimate terminator (e.g., JSON Lines without a trailing newline), mix in FinalizableCodec:

final class JsonLinesCodec
    with FinalizableCodec<String>
    implements FrameCodec<String> {
  // ... tryDecode, hasPending ...

  @override
  String? tryFinalize(RingBuffer rb) {
    // Flush any buffered trailing line at EOF.
  }
}

Codecs that don't need it simply don't mix it in — no boilerplate.
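To show what an EOF hook buys you, here is a standalone sketch of the same idea without framer: a toy splitter with an explicit `finalize()` step that flushes an unterminated last line. `ToyLines` and `splitAll` are hypothetical names, and the real tryFinalize receives a RingBuffer rather than working on a private list.

```dart
import 'dart:convert';

/// Toy line splitter (hypothetical) with an explicit EOF hook.
class ToyLines {
  final List<int> _partial = [];

  /// Returns every line completed by this chunk.
  List<String> add(List<int> chunk) {
    final lines = <String>[];
    for (final byte in chunk) {
      if (byte == 0x0A) {
        lines.add(utf8.decode(_partial));
        _partial.clear();
      } else {
        _partial.add(byte);
      }
    }
    return lines;
  }

  /// Called once at EOF: emits the unterminated last line, if any.
  String? finalize() {
    if (_partial.isEmpty) return null;
    final line = utf8.decode(_partial);
    _partial.clear();
    return line;
  }
}

/// Runs all chunks through one splitter, then flushes at EOF.
List<String> splitAll(Iterable<List<int>> chunks) {
  final splitter = ToyLines();
  final out = <String>[];
  for (final chunk in chunks) {
    out.addAll(splitter.add(chunk));
  }
  final last = splitter.finalize();
  if (last != null) out.add(last);
  return out;
}

void main() {
  // The second record has no trailing newline and is split across chunks.
  print(splitAll([utf8.encode('{"a":1}\n{"b"'), utf8.encode(':2}')]));
  // [{"a":1}, {"b":2}]
}
```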

Testing with CodecTestHarness #

test/codec_test_harness.dart is a test-only helper you can copy into your own test suite. It fuzzes codecs with adversarial chunking:

final harness = CodecTestHarness<MyFrame>(
  codec: MyCodec(),
  capacity: 16, // small capacity to stress spill/drain
);

final frames = await harness.decodeFromBytes(wireBytes, chunkMax: 3, seed: 42);
expect(frames, hasLength(expectedCount));
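If you want adversarial chunking without copying the harness, the core idea fits in a few lines. The sketch below is a guess at the technique, not the harness's code: `randomChunks` is a hypothetical helper that cuts a byte sequence into seeded random pieces of 1 to chunkMax bytes, which you can then feed to any decoder as a Stream.

```dart
import 'dart:math';
import 'dart:typed_data';

/// Splits `bytes` into random chunks of 1..chunkMax bytes.
/// Hypothetical helper, not the harness's actual implementation.
List<Uint8List> randomChunks(
  List<int> bytes, {
  required int chunkMax,
  int seed = 0,
}) {
  if (chunkMax < 1) {
    throw ArgumentError.value(chunkMax, 'chunkMax', 'must be >= 1');
  }
  final rng = Random(seed); // same seed, same split: failures are replayable
  final chunks = <Uint8List>[];
  var pos = 0;
  while (pos < bytes.length) {
    final n = min(1 + rng.nextInt(chunkMax), bytes.length - pos);
    chunks.add(Uint8List.fromList(bytes.sublist(pos, pos + n)));
    pos += n;
  }
  return chunks;
}

void main() {
  final wire = 'a\nbb\nccc\n'.codeUnits;
  final chunks = randomChunks(wire, chunkMax: 3, seed: 42);
  // Concatenating the chunks always reproduces the input.
  final joined = [for (final c in chunks) ...c];
  print(joined.length == wire.length); // true
  // Stream.fromIterable(chunks) can be fed to a decoder under test.
}
```

Running the same input under many seeds and small chunkMax values exercises boundaries that fixed test fixtures rarely hit, such as a delimiter arriving alone in its own chunk.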

Performance #

Not benchmarketing, just honest numbers. M1 MacBook Pro, 20k frames with 64-byte payloads, delimiter and length-prefixed (U32BE) framing:

dart run tool/bench.dart --frames=20000 --payload=64 --chunkMax=8192 --baseline --minMs=300

Delimiter (into)               642.4 MiB/s
Delimiter (borrowed into)      742.6 MiB/s
Delimiter (BytesBuilder)        82.9 MiB/s   ← the "normal" way

U32BE (into)                  1980.0 MiB/s
U32BE (borrowed into)         2174.2 MiB/s

The BytesBuilder baseline is the code everyone writes by hand. framer is ~8x faster on delimiter framing — and it's correct.

Honest note: decodeFrames (the async* API) won't beat a tight synchronous loop for millions of tiny frames, because yield overhead dominates. Its win is correctness and ergonomics. When you need raw speed, use decodeFramesInto, Framer, or the core ring-buffer path.

Imports #

// Everything you need for safe, idiomatic framing:
import 'package:framer/framer.dart';

// Pooled buffers (safe, explicit reuse):
import 'package:framer/extra.dart';

// Zero-copy borrowed views (unsafe, read the docs):
import 'package:framer/unsafe.dart';

The split is intentional. package:framer/framer.dart never exposes borrowed or pooling APIs, so you can't adopt them by accident.

License #

MIT — see LICENSE.

Publisher: getx.site (verified publisher)