MockForge Kafka
Kafka protocol support for MockForge, with broker simulation, topic management, and consumer group coordination.
This crate simulates Apache Kafka brokers so that you can test event-driven applications. It is intended for testing Kafka producers, consumers, and stream processing applications without running a full Kafka cluster.
Features
- Kafka Protocol: Support for the Kafka APIs listed under Supported Kafka APIs (Produce, Fetch, Metadata, etc.)
- Broker Simulation: Complete Kafka broker implementation without external dependencies
- Topic Management: Dynamic topic creation, deletion, and configuration
- Partition Handling: Multi-partition topics with proper offset management
- Consumer Groups: Simulate consumer group coordination and rebalancing
- Message Fixtures: YAML-based message templates and auto-production
- Metrics & Monitoring: Comprehensive metrics with Prometheus integration
- Protocol Compliance: Compatibility with Kafka 2.8+ clients for the supported APIs
Quick Start
Basic Kafka Broker
use KafkaMockBroker;
use KafkaConfig;
async
Testing with Kafka Clients
use ClientConfig;
use ;
use Duration;
async
Core Components
KafkaMockBroker
The main broker implementation that handles all Kafka protocol operations:
use KafkaMockBroker;
use KafkaConfig;
let config = KafkaConfig ;
let broker = KafkaMockBroker::new(config).await?;
broker.start().await?;
Topic Management
Create and manage Kafka topics dynamically:
use ;
// Create a topic with specific configuration
let topic_config = TopicConfig ;
let topic = new;
// Topics are automatically created when first accessed
// or can be pre-created through the broker API
Message Production
Handle produce requests with full protocol compliance:
use KafkaMessage;
// Create messages for production
let messages = vec!;
// Messages are automatically routed to appropriate partitions
// based on key hashing (if key provided) or round-robin
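The routing rule described above (hash the key when one is present, otherwise rotate round-robin) can be sketched with the standard library alone. This is an illustration under stated assumptions, not MockForge's implementation: `Partitioner` and `partition_for` are hypothetical names, and the sketch uses Rust's `DefaultHasher`, whereas real Kafka clients typically use murmur2, so the partition chosen for a given key will differ from what a real client picks.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioner (not part of the MockForge API).
/// Keyed messages hash to a stable partition; unkeyed messages rotate.
pub struct Partitioner {
    num_partitions: u32,
    next: u32, // next partition for unkeyed (round-robin) messages
}

impl Partitioner {
    pub fn new(num_partitions: u32) -> Self {
        assert!(num_partitions > 0, "a topic needs at least one partition");
        Partitioner { num_partitions, next: 0 }
    }

    pub fn partition_for(&mut self, key: Option<&[u8]>) -> u32 {
        match key {
            Some(k) => {
                // The same key always maps to the same partition,
                // which preserves per-key ordering.
                let mut hasher = DefaultHasher::new();
                k.hash(&mut hasher);
                (hasher.finish() % u64::from(self.num_partitions)) as u32
            }
            None => {
                // No key: spread messages evenly across partitions.
                let p = self.next;
                self.next = (self.next + 1) % self.num_partitions;
                p
            }
        }
    }
}
```

Calling `partition_for(Some(key))` twice with the same key returns the same partition, while repeated calls with `None` cycle through 0, 1, 2, ... and wrap around.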
Consumer Groups
Simulate consumer group behavior and coordination:
use ;
// Create consumer group manager
let group_manager = new;
// Consumer groups are automatically managed when consumers join
// Partition assignment follows Kafka's standard algorithms
let group = new;
// Group handles partition rebalancing when members join/leave
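To make the rebalancing behaviour concrete, here is a minimal sketch of range-style assignment for a single topic, modeled on the idea behind Kafka's range assignor: members are sorted, partitions are split into contiguous ranges, and the first members receive one extra partition when the split is uneven. The function `range_assign` is a hypothetical helper for illustration; which assignment strategies MockForge actually implements is not stated here.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: assigns partitions 0..num_partitions of one topic
/// to group members in contiguous ranges, in sorted member order.
pub fn range_assign(members: &[&str], num_partitions: u32) -> BTreeMap<String, Vec<u32>> {
    let mut sorted: Vec<&str> = members.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let mut assignment = BTreeMap::new();
    if sorted.is_empty() {
        return assignment;
    }

    let n = sorted.len() as u32;
    let base = num_partitions / n; // every member gets at least this many
    let extra = num_partitions % n; // the first `extra` members get one more

    let mut start = 0u32;
    for (i, member) in sorted.iter().enumerate() {
        let count = base + if (i as u32) < extra { 1 } else { 0 };
        assignment.insert(member.to_string(), (start..start + count).collect());
        start += count;
    }
    assignment
}
```

A rebalance can be modeled by calling the function again with the new member list: with three partitions, members `c1` and `c2` hold `[0, 1]` and `[2]`, and once `c3` joins each member holds exactly one partition.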
Fixture System
Define message templates and auto-production rules using YAML:
# kafka-fixture.yaml
topics:
- name: "user-events"
partitions: 3
config:
retention.ms: "604800000" # 7 days
- name: "order-events"
partitions: 2
fixtures:
- topic: "user-events"
key_template: "{{uuid}}"
value_template: |
{
"user_id": "{{uuid}}",
"action": "{{random_element 'login' 'logout' 'signup' 'update_profile'}}",
"timestamp": "{{now}}",
"metadata": {
"source": "web",
"version": "1.0"
}
}
headers:
content-type: "application/json"
auto_produce:
- topic: "user-events"
rate_per_second: 5
duration_seconds: 300 # 5 minutes
key_template: "{{uuid}}"
value_template: |
{
"event_type": "heartbeat",
"service": "user-service",
"timestamp": "{{now}}"
}
- topic: "order-events"
rate_per_second: 2
duration_seconds: 600 # 10 minutes
key_template: "order-{{sequence}}"
value_template: |
{
"order_id": "{{sequence}}",
"user_id": "{{uuid}}",
"amount": {{float_range 10.0 1000.0}},
"items": {{int_range 1 10}},
"status": "created",
"created_at": "{{now}}"
}
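As a rough illustration of how a fixture like the one above might be expanded, the sketch below renders only the `{{sequence}}` placeholder and computes how many messages an `auto_produce` rule emits over its lifetime. `SequenceTemplate` and `total_messages` are hypothetical names; starting the counter at 1 is an assumption; the other helpers (`{{uuid}}`, `{{now}}`, `{{random_element ...}}`, and the range helpers) are deliberately left untouched. MockForge's real template engine may behave differently.

```rust
/// Hypothetical renderer that substitutes only `{{sequence}}`.
pub struct SequenceTemplate {
    template: String,
    next: u64, // assumption: the sequence starts at 1
}

impl SequenceTemplate {
    pub fn new(template: &str) -> Self {
        SequenceTemplate { template: template.to_string(), next: 1 }
    }

    /// Renders the template with the current counter, then advances it.
    pub fn render(&mut self) -> String {
        let out = self.template.replace("{{sequence}}", &self.next.to_string());
        self.next += 1;
        out
    }
}

/// Number of messages an auto_produce rule emits in total:
/// rate_per_second multiplied by duration_seconds.
pub fn total_messages(rate_per_second: u64, duration_seconds: u64) -> u64 {
    rate_per_second * duration_seconds
}
```

Under these assumptions, the `user-events` rule above (5 per second for 300 seconds) produces 1500 messages and the `order-events` rule (2 per second for 600 seconds) produces 1200, with keys `order-1`, `order-2`, and so on.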
Loading Fixtures
use ;
// Create broker with fixture support
let spec_registry = new;
let broker = with_registry.await?;
// Load fixtures from file
broker.load_fixtures_from_file.await?;
// Or create fixtures programmatically
use ;
let fixture = KafkaFixture ;
broker.add_fixture.await?;
Supported Kafka APIs
MockForge Kafka implements the following Kafka protocol APIs:
- Produce (API 0): Message production with acknowledgments
- Fetch (API 1): Message consumption with offset management
- Metadata (API 3): Topic and broker metadata discovery
- ListGroups (API 16): Consumer group listing
- DescribeGroups (API 15): Consumer group details and member information
- ApiVersions (API 18): Protocol version negotiation
- CreateTopics (API 19): Dynamic topic creation
- DeleteTopics (API 20): Topic deletion
- DescribeConfigs (API 32): Configuration retrieval
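Every Kafka request header carries a 16-bit API key, and a broker dispatches on it. The sketch below maps the keys of the APIs named in this list to their names and treats anything else as unsupported. `api_name` is a hypothetical helper, not a MockForge function; the numeric keys follow the Apache Kafka protocol specification, in which ListGroups is key 16 (key 9 belongs to OffsetFetch).

```rust
/// Hypothetical lookup from a Kafka API key to the API name,
/// limited to the APIs listed in this section.
pub fn api_name(api_key: i16) -> Option<&'static str> {
    match api_key {
        0 => Some("Produce"),
        1 => Some("Fetch"),
        3 => Some("Metadata"),
        15 => Some("DescribeGroups"),
        16 => Some("ListGroups"),
        18 => Some("ApiVersions"),
        19 => Some("CreateTopics"),
        20 => Some("DeleteTopics"),
        32 => Some("DescribeConfigs"),
        // Any other key is not in the list above.
        _ => None,
    }
}
```

A request with an unlisted key, such as 2 (ListOffsets), yields `None`, which a mock broker would typically answer with an unsupported-version style error rather than silently dropping the request.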
Metrics & Monitoring
Prometheus Metrics
Comprehensive metrics exported in Prometheus format:
use MetricsExporter;
// Create metrics exporter
let exporter = new;
// Export current metrics
let metrics = exporter.export_prometheus().await?;
println!("{}", metrics);
// Sample metrics:
// kafka_requests_total{api="produce"} 150
// kafka_messages_produced_total{topic="user-events"} 1000
// kafka_consumer_groups_total 5
// kafka_connections_active 12
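The sample lines above use the Prometheus text exposition format: a metric name, an optional `{label="value"}` set, and a numeric value. A minimal sketch of producing such lines follows. `render_sample` and `escape_label` are hypothetical helpers and say nothing about how `MetricsExporter` is built internally; the escaping of backslash, double quote, and newline in label values follows the Prometheus text format rules.

```rust
/// Escapes a label value for the Prometheus text format.
fn escape_label(value: &str) -> String {
    value
        .replace('\\', "\\\\")
        .replace('"', "\\\"")
        .replace('\n', "\\n")
}

/// Hypothetical helper: renders one sample line such as
/// `kafka_requests_total{api="produce"} 150`.
pub fn render_sample(name: &str, labels: &[(&str, &str)], value: u64) -> String {
    if labels.is_empty() {
        return format!("{} {}", name, value);
    }
    let pairs: Vec<String> = labels
        .iter()
        .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
        .collect();
    // `{{` and `}}` emit literal braces around the label set.
    format!("{}{{{}}} {}", name, pairs.join(","), value)
}
```

For example, `render_sample("kafka_messages_produced_total", &[("topic", "user-events")], 1000)` yields the second sample line shown above.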
Metrics Categories
- Request Metrics: Total requests, errors, latency by API
- Message Metrics: Messages produced/consumed by topic
- Connection Metrics: Active connections, connection rate
- Consumer Group Metrics: Group count, partition assignments
- Topic Metrics: Topic count, partition count, message count
Configuration
KafkaConfig
use KafkaConfig;
let config = KafkaConfig ;
Environment Variables
# Server configuration
# Topic defaults
# Performance
Testing Examples
Producer Testing
use ClientConfig;
use ;
use Duration;
async
Consumer Testing
use ClientConfig;
use ;
use Message;
use StreamExt;
async
Consumer Group Testing
use ClientConfig;
use ;
async
Performance
MockForge Kafka is optimized for testing scenarios:
- In-Memory Storage: Fast message operations without disk persistence
- Concurrent Connections: Handle multiple simultaneous Kafka clients
- Low Latency: Minimal overhead for message operations
- Scalable: Support for high-throughput testing scenarios
- Resource Efficient: Configurable memory limits and cleanup
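To illustrate what in-memory storage with a memory limit can look like, here is a sketch of a bounded partition log: appends return the assigned offset, and once the record limit is exceeded the oldest record is evicted and the log's start offset advances. `PartitionLog` and its methods are hypothetical, and a record-count cap is an assumption chosen for simplicity; MockForge's actual storage and cleanup policy may differ.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded in-memory log for one partition.
pub struct PartitionLog {
    records: VecDeque<Vec<u8>>,
    start_offset: u64, // offset of the oldest retained record
    max_records: usize,
}

impl PartitionLog {
    pub fn new(max_records: usize) -> Self {
        PartitionLog { records: VecDeque::new(), start_offset: 0, max_records }
    }

    /// Appends a record and returns the offset assigned to it.
    pub fn append(&mut self, value: Vec<u8>) -> u64 {
        let offset = self.start_offset + self.records.len() as u64;
        self.records.push_back(value);
        if self.records.len() > self.max_records {
            // Evict the oldest record; offsets are never reused.
            self.records.pop_front();
            self.start_offset += 1;
        }
        offset
    }

    /// Returns up to `max` records starting at `offset`. Requests below the
    /// start offset begin at the oldest retained record.
    pub fn fetch(&self, offset: u64, max: usize) -> Vec<(u64, &[u8])> {
        let first = offset.max(self.start_offset);
        let skip = (first - self.start_offset) as usize;
        self.records
            .iter()
            .enumerate()
            .skip(skip)
            .take(max)
            .map(|(i, v)| (self.start_offset + i as u64, v.as_slice()))
            .collect()
    }
}
```

With a limit of two records, appending three values assigns offsets 0, 1, and 2, after which a fetch from offset 0 returns only the records at offsets 1 and 2.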
Integration with MockForge
MockForge Kafka integrates seamlessly with the MockForge ecosystem:
- MockForge Core: Shared configuration and logging
- MockForge CLI: Command-line Kafka broker management
- MockForge Data: Enhanced message generation with templates
- MockForge Observability: Metrics and tracing integration
Troubleshooting
Common Issues
Connection refused:
- Ensure broker is started and listening on correct port
- Check firewall settings and port availability
- Verify client configuration (bootstrap servers)
Messages not consumed:
- Check consumer group configuration
- Verify topic exists (auto-create may be disabled)
- Check offset reset policy (earliest/latest)
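The offset reset policy is a frequent cause of "missing" messages, so a small model may help. In Kafka consumer semantics, the policy applies only when the group has no valid committed offset: `earliest` starts at the beginning of the log, `latest` starts at the end and therefore skips everything already produced. The names `OffsetReset` and `starting_offset` below are hypothetical and the sketch is a simplification of client behaviour, not MockForge code.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    Earliest,
    Latest,
}

/// Hypothetical model of where a consumer starts reading.
/// `log_end` is the offset the next produced message will receive.
pub fn starting_offset(
    committed: Option<u64>,
    log_start: u64,
    log_end: u64,
    policy: OffsetReset,
) -> u64 {
    match committed {
        // A committed offset inside the log wins over the policy.
        Some(offset) if offset >= log_start && offset <= log_end => offset,
        // No committed offset, or it is out of range: apply the policy.
        _ => match policy {
            OffsetReset::Earliest => log_start,
            OffsetReset::Latest => log_end,
        },
    }
}
```

With ten messages already in a partition (offsets 0 through 9) and a new consumer group, `latest` starts at offset 10 and sees nothing until more messages arrive, while `earliest` starts at 0 and reads all ten.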
High latency:
- Adjust broker thread count for better concurrency
- Check system resources (CPU, memory)
- Review message size and batch settings
Protocol errors:
- Ensure client and broker use compatible Kafka versions
- Check message format and serialization
- Verify topic and partition configurations
Examples
See the examples directory for complete working examples including:
- Basic Kafka broker setup
- Producer/consumer testing patterns
- Consumer group coordination
- Fixture-driven message generation
- Load testing scenarios
Related Crates
- mockforge-core: Core mocking functionality
- rdkafka: Kafka client library for testing
License
Licensed under MIT OR Apache-2.0