use crate::MqttCommands;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;
pub async fn handle_mqtt_command(
mqtt_command: MqttCommands,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match mqtt_command {
MqttCommands::Publish {
host,
port,
topic,
payload,
qos,
retain,
} => {
handle_publish_command(host, port, topic, payload, qos, retain).await?;
}
MqttCommands::Subscribe {
host,
port,
topic,
qos,
} => {
handle_subscribe_command(host, port, topic, qos).await?;
}
MqttCommands::Topics { topics_command } => {
handle_topics_command(topics_command).await?;
}
MqttCommands::Fixtures { fixtures_command } => {
handle_fixtures_command(fixtures_command).await?;
}
MqttCommands::Clients { clients_command } => {
handle_clients_command(clients_command).await?;
}
}
Ok(())
}
async fn handle_publish_command(
host: String,
port: u16,
topic: String,
payload: String,
qos: u8,
retain: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Connecting to MQTT broker at {}:{}...", host, port);
let mut mqtt_options = MqttOptions::new("mockforge-cli", host, port);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);
let qos_level = match qos {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => return Err("Invalid QoS level. Must be 0, 1, or 2".into()),
};
println!("📤 Publishing to topic '{}' with QoS {}...", topic, qos);
client.publish(topic.clone(), qos_level, retain, payload.clone()).await?;
println!("✅ Published to topic '{}': {}", topic, payload);
println!(" QoS: {}, Retain: {}", qos, retain);
tokio::time::sleep(Duration::from_millis(500)).await;
println!("🔌 Disconnecting...");
client.disconnect().await?;
println!("✅ Disconnected successfully");
Ok(())
}
async fn handle_subscribe_command(
host: String,
port: u16,
topic: String,
qos: u8,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Connecting to MQTT broker at {}:{}...", host, port);
println!("Subscribing to topic '{}' with QoS {}", topic, qos);
let mut mqtt_options = MqttOptions::new("mockforge-cli-subscriber", host, port);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);
let qos_level = match qos {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => return Err("Invalid QoS level. Must be 0, 1, or 2".into()),
};
client.subscribe(topic.clone(), qos_level).await?;
println!("✅ Subscribed to topic '{}'", topic);
println!("Listening for messages... (Press Ctrl+C to stop)");
loop {
match eventloop.poll().await {
Ok(notification) => {
match notification {
rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) => {
let payload = String::from_utf8_lossy(&publish.payload);
println!("📨 [{}] {}", publish.topic, payload);
}
rumqttc::Event::Incoming(rumqttc::Packet::SubAck(_)) => {
println!("✅ Subscription acknowledged");
}
rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_)) => {
println!("✅ Connected to broker");
}
_ => {} }
}
Err(e) => {
eprintln!("Connection error: {}", e);
break;
}
}
}
Ok(())
}
async fn handle_topics_command(
topics_command: crate::MqttTopicsCommands,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match topics_command {
crate::MqttTopicsCommands::List => {
handle_topics_list().await?;
}
crate::MqttTopicsCommands::ClearRetained => {
handle_topics_clear_retained().await?;
}
}
Ok(())
}
async fn handle_topics_list() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("📋 Listing active MQTT topics...");
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client.get(format!("{}/mqtt/topics", management_url)).send().await {
Ok(response) => {
if response.status().is_success() {
let topics: Vec<String> = response.json().await?;
if topics.is_empty() {
println!("📭 No active MQTT topics found");
} else {
println!("📬 Found {} active topics:", topics.len());
println!("{:<50} {}", "Topic", "Type");
println!("{}", "-".repeat(70));
for topic in topics {
println!("{:<50} {}", topic, "subscription/retained");
}
}
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to list topics: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_topics_clear_retained() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("🧹 Clearing retained MQTT messages...");
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client.delete(format!("{}/mqtt/topics/retained", management_url)).send().await {
Ok(response) => {
if response.status().is_success() {
println!("✅ Retained messages cleared successfully");
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to clear retained messages: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_fixtures_command(
fixtures_command: crate::MqttFixturesCommands,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match fixtures_command {
crate::MqttFixturesCommands::Load { path } => {
handle_fixtures_load(path).await?;
}
crate::MqttFixturesCommands::StartAutoPublish => {
handle_fixtures_start_auto_publish().await?;
}
crate::MqttFixturesCommands::StopAutoPublish => {
handle_fixtures_stop_auto_publish().await?;
}
}
Ok(())
}
async fn handle_fixtures_load(
path: std::path::PathBuf,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("📁 Loading MQTT fixtures from: {}", path.display());
if !path.exists() {
return Err(format!("Fixtures path does not exist: {}", path.display()).into());
}
let mut fixtures = Vec::new();
let mut loaded_count = 0;
for entry in std::fs::read_dir(&path)? {
let entry = entry?;
let file_path = entry.path();
if file_path.is_file() {
if let Some(extension) = file_path.extension() {
if extension == "json" || extension == "yaml" || extension == "yml" {
match std::fs::read_to_string(&file_path) {
Ok(content) => match serde_json::from_str::<serde_json::Value>(&content) {
Ok(fixture) => {
fixtures.push(fixture);
loaded_count += 1;
println!(" ✓ Loaded fixture from {}", file_path.display());
}
Err(e) => {
eprintln!(
" ⚠️ Failed to parse fixture {}: {}",
file_path.display(),
e
);
}
},
Err(e) => {
eprintln!(
" ⚠️ Failed to read fixture {}: {}",
file_path.display(),
e
);
}
}
}
}
}
}
if fixtures.is_empty() {
println!("⚠️ No valid fixtures found in {}", path.display());
return Ok(());
}
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client
.post(format!("{}/mqtt/fixtures", management_url))
.json(&fixtures)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
println!(
"✅ Successfully loaded {} MQTT fixtures into broker registry",
loaded_count
);
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to load fixtures: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_fixtures_start_auto_publish() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
println!("▶️ Starting auto-publish for all MQTT fixtures...");
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client
.post(format!("{}/mqtt/fixtures/auto-publish/start", management_url))
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
println!("✅ Auto-publish started for all MQTT fixtures");
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to start auto-publish: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_fixtures_stop_auto_publish() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
println!("⏹️ Stopping auto-publish for all MQTT fixtures...");
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client
.post(format!("{}/mqtt/fixtures/auto-publish/stop", management_url))
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
println!("✅ Auto-publish stopped for all MQTT fixtures");
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to stop auto-publish: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_clients_command(
clients_command: crate::MqttClientsCommands,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match clients_command {
crate::MqttClientsCommands::List => {
handle_clients_list().await?;
}
crate::MqttClientsCommands::Disconnect { client_id } => {
handle_clients_disconnect(client_id).await?;
}
}
Ok(())
}
async fn handle_clients_list() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("👥 Listing connected MQTT clients...");
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client.get(format!("{}/mqtt/clients", management_url)).send().await {
Ok(response) => {
if response.status().is_success() {
let clients: Vec<serde_json::Value> = response.json().await?;
if clients.is_empty() {
println!("📭 No connected MQTT clients");
} else {
println!("📬 Found {} connected clients:", clients.len());
println!("{:<30} {:<20} {}", "Client ID", "Connected At", "Subscriptions");
println!("{}", "-".repeat(80));
for client_info in clients {
let client_id = client_info["client_id"].as_str().unwrap_or("N/A");
let connected_at = client_info["connected_at"].as_str().unwrap_or("N/A");
let subscriptions = client_info["subscriptions"]
.as_array()
.map(|subs| subs.len().to_string())
.unwrap_or_else(|| "N/A".to_string());
println!("{:<30} {:<20} {}", client_id, connected_at, subscriptions);
}
}
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("❌ MQTT broker management API not available");
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to list clients: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}
async fn handle_clients_disconnect(
client_id: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("🔌 Disconnecting MQTT client: {}", client_id);
let client = reqwest::Client::new();
let management_url = std::env::var("MOCKFORGE_MANAGEMENT_URL")
.unwrap_or_else(|_| "http://localhost:8080/__mockforge/api".to_string());
match client
.delete(format!("{}/mqtt/clients/{}", management_url, urlencoding::encode(&client_id)))
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
println!("✅ Client '{}' disconnected successfully", client_id);
} else if response.status() == reqwest::StatusCode::NOT_FOUND {
println!(
"❌ Client '{}' not found or MQTT broker management API not available",
client_id
);
println!(" Make sure MockForge server is running with MQTT support");
} else {
println!("❌ Failed to disconnect client: HTTP {}", response.status());
}
}
Err(e) => {
println!("❌ Failed to connect to management API: {}", e);
println!(" Make sure MockForge server is running on {}", management_url);
}
}
Ok(())
}