From 6107ef468fadbf0ffe4242e4ad7e90b2b6b32d4f Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Tue, 17 Jun 2025 17:43:05 +0200 Subject: [PATCH 01/10] Dal/Node/Amplificator: add some amplification events --- src/lib_dal_node/event.ml | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 714717e3fcf7..f613eb4c129e 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -753,6 +753,24 @@ open struct ~level:Info ("query_id", Data_encoding.int31) + let crypto_process_sending_reply_error = + declare_1 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"crypto_process_sending_reply_error" + ~msg:"cryptographic child process: sending reply error #{query_id}." + ~level:Warning + ("query_id", Data_encoding.int31) + + let crypto_process_error = + declare_1 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"crypto_process_error" + ~msg:"cryptographic child process error: #{error}." + ~level:Warning + ("error", Data_encoding.string) + let main_process_sending_query = declare_1 ~section:(section @ ["crypto"]) @@ -773,6 +791,16 @@ open struct ~level:Info ("query_id", Data_encoding.int31) + let main_process_received_reply_error = + declare_2 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"main_process_received_reply_error" + ~msg:"main process: received reply error on query #{query_id} : {error}." 
+ ~level:Warning + ("query_id", Data_encoding.int31) + ("error", Data_encoding.string) + let main_process_enqueue_query = declare_1 ~section:(section @ ["crypto"]) @@ -1326,12 +1354,20 @@ let emit_crypto_process_received_query ~query_id = let emit_crypto_process_sending_reply ~query_id = emit crypto_process_sending_reply query_id +let emit_crypto_process_sending_reply_error ~query_id = + emit crypto_process_sending_reply_error query_id + +let emit_crypto_process_error ~msg = emit crypto_process_error msg + let emit_main_process_sending_query ~query_id = emit main_process_sending_query query_id let emit_main_process_received_reply ~query_id = emit main_process_received_reply query_id +let emit_main_process_received_reply_error ~query_id ~msg = + emit main_process_received_reply_error (query_id, msg) + let emit_main_process_enqueue_query ~query_id = emit main_process_enqueue_query query_id -- GitLab From 31be54d2d17b424b6db2016a94fa46e846d5c850 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Wed, 18 Jun 2025 17:52:13 +0200 Subject: [PATCH 02/10] Dal/Node/Amplificator: do not fail on error --- src/lib_dal_node/amplificator.ml | 184 +++++++++++++++++++++++-------- 1 file changed, 135 insertions(+), 49 deletions(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index f478c8925e28..12f420ba8e14 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -192,12 +192,28 @@ module Reconstruction_process_worker = struct let reconstruct_process_worker ic oc (cryptobox, shards_proofs_precomputation) = let open Lwt_result_syntax in + let reply_success ~oc ~query_id ~proved_shards_encoded = + (* Sends back the proved_shards_encoded to the main dal process *) + let*! () = Event.emit_crypto_process_sending_reply ~query_id in + let*! () = Lwt_io.write_int oc query_id in + let*! _ = Process_worker.write_message oc (Bytes.of_string "OK") in + let*!
_ = Process_worker.write_message oc proved_shards_encoded in + Lwt.return_unit + in + let reply_error_query ~oc ~query_id ~error = + (* Sends back the proved_shards_encoded to the main dal process *) + let*! () = Event.emit_crypto_process_sending_reply_error ~query_id in + let*! () = Lwt_io.write_int oc query_id in + let*! _ = Process_worker.write_message oc (Bytes.of_string "ERR") in + let bytes = Bytes.of_string error in + let*! _ = Process_worker.write_message oc bytes in + Lwt.return_unit + in (* Read init message from parent with parameters required to initialize cryptobox *) let* () = read_init_message_from_parent ic in let*! () = Event.emit_crypto_process_started ~pid:(Unix.getpid ()) in - let rec loop () = - let*! query_id = Lwt_io.read_int ic in + let process_query query_id = (* Read query from main dal process *) let* bytes_shards = let* r = Process_worker.read_message ic in @@ -222,22 +238,54 @@ module Reconstruction_process_worker = struct let* proved_shards_encoded = reconstruct cryptobox shards_proofs_precomputation shards in - - (* Sends back the proved_shards_encoded to the main dal process *) - let*! () = Event.emit_crypto_process_sending_reply ~query_id in - let len = Bytes.length proved_shards_encoded in - let*! () = Lwt_io.write_int oc query_id in - let*! () = Lwt_io.write_int oc len in - let*! () = Lwt_io.write_from_exactly oc proved_shards_encoded 0 len in - loop () + return proved_shards_encoded + in + let rec loop () = + Lwt.catch + (fun () -> + (* Note: this read_int is very unlikely to appear unless obvious issue *) + let*! query_id = Lwt_io.read_int ic in + let* () = + Lwt.catch + (fun () -> + let*! r = process_query query_id in + match r with + | Ok proved_shards_encoded -> + let*! () = + reply_success ~query_id ~proved_shards_encoded ~oc + in + loop () + | Error err -> + let err = + Format.asprintf "%a" Error_monad.pp_print_trace err + in + (* send a reply with the error, and continue *) + let*! 
() = reply_error_query ~oc ~query_id ~error:err in + loop ()) + (function + | End_of_file -> raise End_of_file + | exn -> + let error = Printexc.to_string exn in + let*! () = reply_error_query ~oc ~query_id ~error in + return_unit) + in + return_unit) + (function + | End_of_file -> + (* Buffer was closed by the parent (normal termination) *) + return_unit + | exn -> + (* Lwt_io.read_int could fail, in this case, there is an severe + issue: pipe issue between the processes, memory full ? + In this case, we give up *) + let msg = + Format.asprintf "CRITICAL: %s" (Printexc.to_string exn) + in + let*! () = Event.emit_crypto_process_error ~msg in + fail [error_of_exn exn]) in - Lwt.catch loop (function - | End_of_file -> - (* Buffer was closed by the parent (normal termination) *) - return_unit - | exn -> - let err = [error_of_exn exn] in - Lwt.return_error err) + let* () = loop () in + return_unit end (* Serialize queries to crypto worker process while the query queue is not @@ -290,6 +338,7 @@ let reply_receiver_job {process; query_store; _} node_context = Query_store.remove query_store id ; let length = Query_store.length query_store in let () = Dal_metrics.update_amplification_queue_length length in + (* The first message should be a OK | ERR *) let* msg = let* r = Process_worker.read_message ic in match r with @@ -301,35 +350,67 @@ let reply_receiver_job {process; query_store; _} node_context = ] | `Message msg -> return msg in - let*! () = Event.emit_main_process_received_reply ~query_id:id in - let shards, shard_proofs = - Data_encoding.Binary.of_bytes_exn proved_shards_encoding msg - in - let shards = List.to_seq shards in - let* () = - Store.Shards.write_all (Store.shards node_store) slot_id shards - |> Errors.to_tzresult - in - let* () = - Slot_manager.publish_proved_shards - node_context - slot_id - ~level_committee:(Node_context.fetch_committee node_context) - proto_parameters - commitment - shards - shard_proofs - gs_worker - in - let*! 
() = - Event.emit_reconstruct_finished - ~level:slot_id.slot_level - ~slot_index:slot_id.slot_index - in - let duration = Unix.gettimeofday () -. reconstruction_start_time in - Dal_metrics.update_amplification_complete_duration duration ; - Dal_metrics.reconstruction_done () ; - loop () + match Bytes.to_string msg with + | "ERR" -> + let* msg = + let* r = Process_worker.read_message ic in + match r with + | `End_of_file -> + fail + [ + Amplification_reply_receiver_job + "Incomplete message received. Terminating"; + ] + | `Message msg -> return msg + in + (* The error message *) + let msg = Bytes.to_string msg in + let*! () = + Event.emit_main_process_received_reply_error ~query_id:id ~msg + in + loop () + | "OK" -> + let* msg = + let* r = Process_worker.read_message ic in + match r with + | `End_of_file -> + fail + [ + Amplification_reply_receiver_job + "Incomplete message received. Terminating"; + ] + | `Message msg -> return msg + in + let*! () = Event.emit_main_process_received_reply ~query_id:id in + let shards, shard_proofs = + Data_encoding.Binary.of_bytes_exn proved_shards_encoding msg + in + let shards = List.to_seq shards in + let* () = + Store.Shards.write_all (Store.shards node_store) slot_id shards + |> Errors.to_tzresult + in + let* () = + Slot_manager.publish_proved_shards + node_context + slot_id + ~level_committee:(Node_context.fetch_committee node_context) + proto_parameters + commitment + shards + shard_proofs + gs_worker + in + let*! () = + Event.emit_reconstruct_finished + ~level:slot_id.slot_level + ~slot_index:slot_id.slot_index + in + let duration = Unix.gettimeofday () -. reconstruction_start_time in + Dal_metrics.update_amplification_complete_duration duration ; + Dal_metrics.reconstruction_done () ; + loop () + | _ -> fail [Amplification_reply_receiver_job "Unexpected message"] in Lwt.catch loop (function (* Buffer was closed before proceeding an entire message (sigterm, etc.) 
*) @@ -570,9 +651,14 @@ let try_amplification commitment slot_metrics slot_id amplificator = in let t = Unix.gettimeofday () in let duration = t -. slot_metrics.Dal_metrics.time_first_shard in - (* If we have received all the shards while waiting the random - delay, there is no point in reconstructing anymore *) - if number_of_already_stored_shards = number_of_shards then ( + (* There is no point trying a reconstruction if we have not received + enough shards. + If we have received all the shards while waiting the random + delay, there is no point in reconstructing either. *) + if + number_of_already_stored_shards < number_of_needed_shards + || number_of_already_stored_shards = number_of_shards + then ( Dal_metrics.update_amplification_abort_reconstruction_duration duration ; let*! () = Event.emit_reconstruct_no_missing_shard -- GitLab From e666b3bc102e202b1d787788f638c5544b20bb74 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Wed, 18 Jun 2025 17:51:36 +0200 Subject: [PATCH 03/10] Dal/Node/Amplificator: use welcome constant --- src/lib_dal_node/amplificator.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 12f420ba8e14..b42b412cb1e9 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -515,7 +515,7 @@ let make node_ctxt = in let* () = let oc = Process_worker.output_channel amplificator.process in - let* r = Process_worker.write_message oc (Bytes.of_string "0 Ready") in + let* r = Process_worker.write_message oc welcome in match r with | `End_of_file -> fail -- GitLab From 47cf19c43735a35b1413f94ed9f3e79fbcb53fb4 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:51:38 +0200 Subject: [PATCH 04/10] Dal/Node/Amplificator: move emit_crypto_process_error --- src/lib_dal_node/event.ml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/lib_dal_node/event.ml 
b/src/lib_dal_node/event.ml index f613eb4c129e..83a5df1183f1 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -726,6 +726,15 @@ open struct ~level:Notice ("pid", Data_encoding.int31) + let crypto_process_error = + declare_1 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"crypto_process_error" + ~msg:"cryptographic child process error: #{error}." + ~level:Warning + ("error", Data_encoding.string) + let amplificator_uninitialized = declare_0 ~section:(section @ ["crypto"]) @@ -762,15 +771,6 @@ open struct ~level:Warning ("query_id", Data_encoding.int31) - let crypto_process_error = - declare_1 - ~section:(section @ ["crypto"]) - ~prefix_name_with_section:true - ~name:"crypto_process_error" - ~msg:"cryptographic child process error: #{error}." - ~level:Warning - ("error", Data_encoding.string) - let main_process_sending_query = declare_1 ~section:(section @ ["crypto"]) @@ -1346,6 +1346,8 @@ let emit_store_upgrade_error () = emit store_upgrade_error () let emit_crypto_process_started ~pid = emit crypto_process_started pid +let emit_crypto_process_error ~msg = emit crypto_process_error msg + let emit_amplificator_uninitialized () = emit amplificator_uninitialized () let emit_crypto_process_received_query ~query_id = @@ -1357,8 +1359,6 @@ let emit_crypto_process_sending_reply ~query_id = let emit_crypto_process_sending_reply_error ~query_id = emit crypto_process_sending_reply_error query_id -let emit_crypto_process_error ~msg = emit crypto_process_error msg - let emit_main_process_sending_query ~query_id = emit main_process_sending_query query_id -- GitLab From 40cc81d9dfdf9fc45530c4d4d4cc764d86fbb251 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:52:39 +0200 Subject: [PATCH 05/10] Dal/Node/Amplificator: add new amplificator events --- src/lib_dal_node/event.ml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index
83a5df1183f1..24d61499f4c7 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -726,6 +726,24 @@ open struct ~level:Notice ("pid", Data_encoding.int31) + let crypto_process_stopped = + declare_0 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"crypto_process_stopped" + ~msg:"cryptographic child process stopped gracefully" + ~level:Notice + () + + let crypto_process_fatal = + declare_1 + ~section:(section @ ["crypto"]) + ~prefix_name_with_section:true + ~name:"crypto_process_fatal" + ~msg:"cryptographic child process terminated unexpectedly: #{error}." + ~level:Error + ("error", Data_encoding.string) + let crypto_process_error = declare_1 ~section:(section @ ["crypto"]) @@ -1346,8 +1364,12 @@ let emit_store_upgrade_error () = emit store_upgrade_error () let emit_crypto_process_started ~pid = emit crypto_process_started pid +let emit_crypto_process_stopped () = emit crypto_process_stopped () + let emit_crypto_process_error ~msg = emit crypto_process_error msg +let emit_crypto_process_fatal ~msg = emit crypto_process_fatal msg + let emit_amplificator_uninitialized () = emit amplificator_uninitialized () let emit_crypto_process_received_query ~query_id = -- GitLab From bc9316a7e7e97b695b84d1319aeac55001d36df4 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:53:49 +0200 Subject: [PATCH 06/10] Dal/Node/Amplificator: consider lwt cancelled as a normal termination --- src/lib_dal_node/amplificator.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index b42b412cb1e9..12e693bdf7bf 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -271,6 +271,8 @@ module Reconstruction_process_worker = struct in return_unit) (function + | Lwt.Canceled + (* if terminated by direct signal (normal termination) *) | End_of_file -> (* Buffer was closed by the parent (normal termination) *) return_unit -- GitLab From 
0973cf2f8d97127440adf0cac2c82c5d78aefb47 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:54:12 +0200 Subject: [PATCH 07/10] Dal/Node/Amplificator: emit event on normal termination --- src/lib_dal_node/amplificator.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 12e693bdf7bf..822777a7c33f 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -275,6 +275,7 @@ module Reconstruction_process_worker = struct (* if terminated by direct signal (normal termination) *) | End_of_file -> (* Buffer was closed by the parent (normal termination) *) + let*! () = Event.emit_crypto_process_stopped () in return_unit | exn -> (* Lwt_io.read_int could fail, in this case, there is an severe -- GitLab From 79a9ca07d26ee70aaee315301566190483820d95 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:54:47 +0200 Subject: [PATCH 08/10] Dal/Node/Amplificator: reduce size of the logging --- src/lib_dal_node/amplificator.ml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 822777a7c33f..e3a780456a81 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -281,9 +281,7 @@ module Reconstruction_process_worker = struct (* Lwt_io.read_int could fail, in this case, there is an severe issue: pipe issue between the processes, memory full ? In this case, we give up *) - let msg = - Format.asprintf "CRITICAL: %s" (Printexc.to_string exn) - in + let msg = Printexc.to_string exn in let*! 
() = Event.emit_crypto_process_error ~msg in fail [error_of_exn exn]) in -- GitLab From 8a5e8e28147463f75cf140c5ffae44c2fcd525b8 Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 16:55:32 +0200 Subject: [PATCH 09/10] Dal/Node/Amplificator: emit an event in case of protocol error --- src/lib_dal_node/amplificator.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index e3a780456a81..1c5cf93930eb 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -411,7 +411,9 @@ let reply_receiver_job {process; query_store; _} node_context = Dal_metrics.update_amplification_complete_duration duration ; Dal_metrics.reconstruction_done () ; loop () - | _ -> fail [Amplification_reply_receiver_job "Unexpected message"] + | _ -> + let*! () = Event.emit_crypto_process_error ~msg:"Unexpected message" in + fail [Amplification_reply_receiver_job "Unexpected message"] in Lwt.catch loop (function (* Buffer was closed before proceeding an entire message (sigterm, etc.) *) -- GitLab From cf2392ab7eb0db186449f63dea9b15d7dfac130a Mon Sep 17 00:00:00 2001 From: Guillaume Bau Date: Thu, 26 Jun 2025 17:10:50 +0200 Subject: [PATCH 10/10] Dal/Node/Amplificator: add event on fatal error --- src/lib_dal_node/amplificator.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib_dal_node/amplificator.ml b/src/lib_dal_node/amplificator.ml index 1c5cf93930eb..c4e461a167cd 100644 --- a/src/lib_dal_node/amplificator.ml +++ b/src/lib_dal_node/amplificator.ml @@ -421,6 +421,7 @@ let reply_receiver_job {process; query_store; _} node_context = (* Unknown exception *) | exn -> let err = [error_of_exn exn] in + let*! () = Event.emit_crypto_process_fatal ~msg:"Unexpected error" in Lwt.return (Error err)) let determine_amplification_delays node_ctxt = -- GitLab