From cff075913095b057bce0772a507b191a7e69578a Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Fri, 11 Jul 2025 17:03:19 +0200 Subject: [PATCH] Websocket client: propagate decoding errors to application --- .../bin_node/lib_dev/websocket_client.ml | 64 ++++++++++++++++--- .../bin_node/lib_dev/websocket_client.mli | 18 ++++-- .../bin_outbox_monitor/etherlink_monitor.ml | 13 ++-- .../fa-bridge-watchtower/etherlink_monitor.ml | 7 +- 4 files changed, 81 insertions(+), 21 deletions(-) diff --git a/etherlink/bin_node/lib_dev/websocket_client.ml b/etherlink/bin_node/lib_dev/websocket_client.ml index 57983b907af1..351fac871638 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.ml +++ b/etherlink/bin_node/lib_dev/websocket_client.ml @@ -144,6 +144,8 @@ end type error += | No_response of JSONRPC.request | Request_failed of JSONRPC.request * JSONRPC.error + | Cannot_destruct of + Ethereum_types.Subscription.kind * Ethereum_types.Subscription.id * string let () = register_error_kind @@ -178,7 +180,31 @@ let () = (req "request" JSONRPC.request_encoding) (req "error" JSONRPC.error_encoding)) (function Request_failed (r, e) -> Some (r, e) | _ -> None) - (fun (r, e) -> Request_failed (r, e)) + (fun (r, e) -> Request_failed (r, e)) ; + register_error_kind + `Temporary + ~id:"websocket_client.cannot_destruct" + ~title:"Cannot destruct subscription notification" + ~description:"Cannot destruct subscription notification." + ~pp:(fun ppf (k, id, err) -> + let (Ethereum_types.Subscription.Id (Hex id)) = id in + Format.fprintf + ppf + "Cannot destruct notification for subscription %s of id 0x%s: %s" + (Ezjsonm.value_to_string + ~minify:true + (Data_encoding.Json.construct + Ethereum_types.Subscription.kind_encoding + k)) + id + err) + Data_encoding.( + obj3 + (req "kind" Ethereum_types.Subscription.kind_encoding) + (req "id" Ethereum_types.Subscription.id_encoding) + (req "err" string)) + (function Cannot_destruct (k, id, e) -> Some (k, id, e) | _ -> None) + (fun (k, id, e) -> Cannot_destruct (k, id, e)) module Websocket_lwt_unix = struct include Websocket_lwt_unix @@ -465,12 +491,28 @@ let subscribe client (kind : Ethereum_types.Subscription.kind) = in let stream, push = Lwt_stream.create () in let push x = - push - (Option.map - (Data_encoding.Json.destruct - (Ethereum_types.Subscription.output_encoding - Transaction_object.encoding)) - x) + let v = + match x with + | None -> None + | Some x -> ( + try + Data_encoding.Json.destruct + (Ethereum_types.Subscription.output_encoding + Transaction_object.encoding) + x + |> Result.ok |> Option.some + with e -> + let err = + Format.asprintf + "%a" + (Json_encoding.print_error ?print_unknown:None) + e + in + Some + (Result_syntax.tzfail + (Cannot_destruct (kind, subscription_id, err)))) + in + push v in Subscription_table.replace client.subscriptions subscription_id push ; let unsubscribe () = @@ -485,7 +527,13 @@ let subscribe client (kind : Ethereum_types.Subscription.kind) = let subscribe_filter client kind filter = let open Lwt_result_syntax in let+ {stream; unsubscribe} = subscribe client kind in - let stream = Lwt_stream.filter_map filter stream in + let stream = + Lwt_stream.filter_map + (function + | Ok x -> ( match filter x with Some x -> Some (Ok x) | None -> None) + | Error e -> Some (Error e)) + stream + in {stream; unsubscribe} let subscribe_newHeads client = diff --git a/etherlink/bin_node/lib_dev/websocket_client.mli b/etherlink/bin_node/lib_dev/websocket_client.mli index 7324e1c29185..49aa7017802f 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.mli +++ b/etherlink/bin_node/lib_dev/websocket_client.mli @@ -16,6 +16,8 @@ type t type error += | No_response of JSONRPC.request | Request_failed of JSONRPC.request * JSONRPC.error + | Cannot_destruct of + Ethereum_types.Subscription.kind * Ethereum_types.Subscription.id * string (** Subscriptions returned by [subscribe]. *) type 'a subscription = { @@ -57,23 +59,26 @@ val send_jsonrpc : t -> ('input, 'output) call -> 'output tzresult Lwt.t val subscribe : t -> Subscribe.input -> - Transaction_object.t Ethereum_types.Subscription.output subscription tzresult + Transaction_object.t Ethereum_types.Subscription.output tzresult subscription + tzresult Lwt.t (** [subscribe_newHeads client] is like [subscribe] but specialized for newHeads events. *) val subscribe_newHeads : - t -> Transaction_object.t Ethereum_types.block subscription tzresult Lwt.t + t -> + Transaction_object.t Ethereum_types.block tzresult subscription tzresult Lwt.t (** [subscribe_newPendingTransactions client] is like [subscribe] but specialized for newPendingTransactions events. *) val subscribe_newPendingTransactions : - t -> Ethereum_types.hash subscription tzresult Lwt.t + t -> Ethereum_types.hash tzresult subscription tzresult Lwt.t (** [subscribe_syncing client] is like [subscribe] but specialized for syncing events. *) val subscribe_syncing : - t -> Ethereum_types.Subscription.sync_output subscription tzresult Lwt.t + t -> + Ethereum_types.Subscription.sync_output tzresult subscription tzresult Lwt.t (** [subscribe_logs ?address ?topics client] is like [subscribe] but specialized for logs events filtered by [address] and/or [topics]. *) @@ -81,10 +86,11 @@ val subscribe_logs : ?address:Ethereum_types.Filter.filter_address -> ?topics:Ethereum_types.Filter.topic option list -> t -> - Ethereum_types.transaction_log subscription tzresult Lwt.t + Ethereum_types.transaction_log tzresult subscription tzresult Lwt.t (** Subscribe to L1/L2 levels associations. *) val subscribe_l1_l2_levels : ?start_l1_level:int32 -> t -> - Ethereum_types.Subscription.l1_l2_levels_output subscription tzresult Lwt.t + Ethereum_types.Subscription.l1_l2_levels_output tzresult subscription tzresult + Lwt.t diff --git a/etherlink/bin_outbox_monitor/etherlink_monitor.ml b/etherlink/bin_outbox_monitor/etherlink_monitor.ml index 379dd4e8426a..82ecd5ed2331 100644 --- a/etherlink/bin_outbox_monitor/etherlink_monitor.ml +++ b/etherlink/bin_outbox_monitor/etherlink_monitor.ml @@ -506,8 +506,10 @@ let parse_log (log : Ethereum_types.transaction_log) = (parsed_log_to_db log (Fast_FA_withdrawal withdraw_data)) | None -> return_none))) -let handle_one_log ws_client db (log : Ethereum_types.transaction_log) = +let handle_one_log ws_client db (log : Ethereum_types.transaction_log tzresult) + = let open Lwt_result_syntax in + let*? log in let*! () = Event.(emit transaction_log) log in let withdrawal = parse_log log in match withdrawal with @@ -610,7 +612,7 @@ let rec get_logs db ws_client ~from_block ~to_block = (function | Filter.Block_filter _ | Pending_transaction_filter _ -> return_unit - | Log log -> handle_one_log ws_client db log) + | Log log -> handle_one_log ws_client db (Ok log)) logs | Error (Filter_helpers.Too_many_logs {limit} :: _ as e) when Z.equal from_z to_z -> @@ -694,7 +696,8 @@ let monitor_heads db ws_client = let* heads_subscription = Websocket_client.subscribe_newHeads ws_client in let* () = lwt_stream_iter_es - (fun (b : _ Ethereum_types.block) -> + (fun (b : _ Ethereum_types.block tzresult) -> + let*? b in let*! () = Event.(emit new_etherlink_head) b.number in Db.Pointers.L2_head.set db b.number) heads_subscription.stream @@ -714,8 +717,8 @@ let monitor_l2_l1_levels db ws_client ~rollup_node_rpc ~l1_node_rpc in let* () = lwt_stream_iter_es - (fun ({l1_level; start_l2_level; end_l2_level} : - Ethereum_types.Subscription.l1_l2_levels_output) -> + (fun (l1l2 : Ethereum_types.Subscription.l1_l2_levels_output tzresult) -> + let*? {l1_level; start_l2_level; end_l2_level} = l1l2 in let*! () = if Ethereum_types.Qty.(start_l2_level = end_l2_level) then Event.(emit empty_l1_level) l1_level diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index 27989d1876c1..a88e14ac5afa 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -794,7 +794,8 @@ let monitor_heads ctx = and* heads_subscription = Websocket_client.subscribe_newHeads ctx.ws_client in let* () = lwt_stream_iter_es - (fun (b : Transaction_object.t Ethereum_types.block) -> + (fun (b : Transaction_object.t Ethereum_types.block tzresult) -> + let*? b in let* last_l2_head = Db.Pointers.L2_head.get ctx.db in let expected_level = Ethereum_types.Qty.next last_l2_head in let* () = @@ -805,7 +806,9 @@ let monitor_heads ctx = ~end_block:(Ethereum_types.Qty.pred b.number) in on_new_block ctx b ~catch_up:false) - (Lwt_stream.append (Lwt_stream.return head) heads_subscription.stream) + (Lwt_stream.append + (Lwt_stream.return (Ok head)) + heads_subscription.stream) in return_unit -- GitLab