diff --git a/etherlink/bin_node/lib_dev/websocket_client.ml b/etherlink/bin_node/lib_dev/websocket_client.ml index 5333f28e9252601a1fd96dca2e967a3d59741096..55b1baf8565954c4ab89cb334bdaf716eb513891 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.ml +++ b/etherlink/bin_node/lib_dev/websocket_client.ml @@ -509,7 +509,8 @@ let send_jsonrpc : | Ok response -> return @@ Data_encoding.Json.destruct M.output_encoding response -let subscribe client (kind : Ethereum_types.Subscription.kind) = +let subscribe' client ~output_encoding (kind : Ethereum_types.Subscription.kind) + = let open Lwt_result_syntax in let* subscription_id = send_jsonrpc client (Call ((module Subscribe), kind)) @@ -521,10 +522,7 @@ let subscribe client (kind : Ethereum_types.Subscription.kind) = | None -> None | Some x -> ( try - Data_encoding.Json.destruct - (Ethereum_types.Subscription.output_encoding - Transaction_object.encoding) - x + Data_encoding.Json.destruct output_encoding x |> Result.ok |> Option.some with e -> let err = @@ -549,6 +547,11 @@ let subscribe client (kind : Ethereum_types.Subscription.kind) = in return {stream; unsubscribe} +let subscribe = + subscribe' + ~output_encoding: + (Ethereum_types.Subscription.output_encoding Transaction_object.encoding) + let subscribe_filter client kind filter = let open Lwt_result_syntax in let+ {stream; unsubscribe} = subscribe client kind in @@ -566,6 +569,14 @@ let subscribe_newHeads client = | Ethereum_types.Subscription.NewHeads h -> Some h | _ -> None +let block_just_number_encoding = + let open Data_encoding in + conv (fun n -> (n, ())) (fun (n, ()) -> n) + @@ merge_objs (obj1 (req "number" Ethereum_types.quantity_encoding)) unit + +let subscribe_newHeadNumbers client = + subscribe' client NewHeads ~output_encoding:block_just_number_encoding + let subscribe_newPendingTransactions client = subscribe_filter client NewPendingTransactions @@ function | Ethereum_types.Subscription.NewPendingTransactions tx -> Some tx diff --git a/etherlink/bin_node/lib_dev/websocket_client.mli b/etherlink/bin_node/lib_dev/websocket_client.mli index 49aa7017802f5477f0c66002e1f57d260d1578e8..d7111ee41da4e90c3d56ac1bd0d97c50cfe79000 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.mli +++ b/etherlink/bin_node/lib_dev/websocket_client.mli @@ -69,6 +69,11 @@ val subscribe_newHeads : t -> Transaction_object.t Ethereum_types.block tzresult subscription tzresult Lwt.t +(** [subscribe_newHeadNumbers client] is like [subscribe_newHeads] but only + parses numbers in blocks. *) +val subscribe_newHeadNumbers : + t -> Ethereum_types.quantity tzresult subscription tzresult Lwt.t + (** [subscribe_newPendingTransactions client] is like [subscribe] but specialized for newPendingTransactions events. *) val subscribe_newPendingTransactions : diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index cebe49d0f4fa3d88a4468f8f02e274a36373c705..dc21329c41bddccf4c5eae535a1562de60ae911b 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -344,13 +344,12 @@ module Event = struct ~pp1:Format.pp_print_string let new_etherlink_head = - declare_2 + declare_1 ~section ~name:"new_etherlink_head" - ~msg:"New etherlink head {level} ({txs} txs)" + ~msg:"New etherlink head {level}" ~level:Notice ("level", Db.quantity_hum_encoding) - ("txs", Data_encoding.int31) ~pp1:Ethereum_types.pp_quantity let catch_up = @@ -649,24 +648,18 @@ let claim_selector = let is_claim_input = String.starts_with ~prefix:claim_selector -let handle_confirmed_txs {db; ws_client; _} - (b : Transaction_object.t Ethereum_types.block) = +let handle_confirmed_txs {db; ws_client; _} number = let open Lwt_result_syntax in + let* b = + Websocket_client.send_jsonrpc + ws_client + (Call ((module Rpc_encodings.Get_block_by_number), (Number number, true))) + in let* txs = match b.transactions with - | TxFull [] | TxHash [] -> return_nil + | TxHash [] -> return [] + | TxHash _ -> assert false | TxFull txs -> return txs - | TxHash _ -> ( - let* block = - Websocket_client.send_jsonrpc - ws_client - (Call - ( (module Rpc_encodings.Get_block_by_number), - (Number b.number, true) )) - in - match block.transactions with - | TxHash _ -> assert false - | TxFull txs -> return txs) in txs |> List.iteri_es @@ fun index tx -> @@ -752,20 +745,14 @@ let claim_deposits ctx = claim ctx ~deposit_id) deposits -let on_new_block ctx ~catch_up (b : _ Ethereum_types.block) = +let on_new_block ctx ~catch_up number = let open Lwt_result_syntax in - let open Ethereum_types in - let nb_txs = - match b.transactions with - | TxHash l -> List.length l - | TxFull l -> List.length l - in - let*! () = Event.(emit new_etherlink_head) (b.number, nb_txs) in + let*! () = Event.(emit new_etherlink_head) number in (* Process logs for this block *) - let* () = get_logs ctx ~block:b.number in + let* () = get_logs ctx ~block:number in (* Notify tx queue and register claimed deposits in DB *) - let* () = handle_confirmed_txs ctx b in - let* () = Db.Pointers.L2_head.set ctx.db b.number in + let* () = handle_confirmed_txs ctx number in + let* () = Db.Pointers.L2_head.set ctx.db number in unless catch_up @@ fun () -> claim_deposits ctx let rec catch_up ctx ~from_block ~end_block = @@ -775,14 +762,7 @@ let rec catch_up ctx ~from_block ~end_block = in if Z.gt from_ end_ then return_unit else - let* block = - Websocket_client.send_jsonrpc - ctx.ws_client - (Call - ( (module Rpc_encodings.Get_block_by_number), - (Number from_block, true) )) - in - let* () = on_new_block ctx block ~catch_up:true in + let* () = on_new_block ctx from_block ~catch_up:true in catch_up ctx ~from_block:(Ethereum_types.Qty.next from_block) ~end_block let monitor_heads ctx = @@ -790,22 +770,24 @@ let monitor_heads ctx = let* head = Websocket_client.send_jsonrpc ctx.ws_client - (Call ((module Rpc_encodings.Get_block_by_number), (Latest, true))) - and* heads_subscription = Websocket_client.subscribe_newHeads ctx.ws_client in + (Call ((module Rpc_encodings.Block_number), ())) + and* heads_subscription = + Websocket_client.subscribe_newHeadNumbers ctx.ws_client + in let* () = lwt_stream_iter_es - (fun (b : Transaction_object.t Ethereum_types.block tzresult) -> - let*? b in + (fun number -> + let*? number in let* last_l2_head = Db.Pointers.L2_head.get ctx.db in let expected_level = Ethereum_types.Qty.next last_l2_head in let* () = - unless Ethereum_types.Qty.(b.number = expected_level) @@ fun () -> + unless Ethereum_types.Qty.(number = expected_level) @@ fun () -> catch_up ctx ~from_block:expected_level - ~end_block:(Ethereum_types.Qty.pred b.number) + ~end_block:(Ethereum_types.Qty.pred number) in - on_new_block ctx b ~catch_up:false) + on_new_block ctx number ~catch_up:false) (Lwt_stream.append (Lwt_stream.return (Ok head)) heads_subscription.stream)