diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml index 862494c33934d1dc900dd7dbac3930d0169ec639..918badcb2b2463c4d6a04666055cd2cf88f3abc0 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml @@ -234,9 +234,10 @@ let hash_encoding = Data_encoding.(conv hash_to_string hash_of_string string) let equal_hash (Hash (Hex h1)) (Hash (Hex h2)) = String.equal h1 h2 -let pp_hash fmt (Hash (Hex h)) = Format.pp_print_string fmt h +let pp_hash fmt h = Format.pp_print_string fmt (hash_to_string h) -let pp_block_hash fmt (Block_hash (Hex h)) = Format.pp_print_string fmt h +let pp_block_hash fmt (Block_hash h) = + Format.pp_print_string fmt (hex_to_string h) let decode_hex bytes = Hex Hex.(of_bytes bytes |> show) diff --git a/etherlink/bin_node/lib_dev/tx_queue.ml b/etherlink/bin_node/lib_dev/tx_queue.ml index 0f092f6453fdf750853d2439c921aa35d39af4c7..0427bef60c5e7d2ce943c7701d25843daca1e11c 100644 --- a/etherlink/bin_node/lib_dev/tx_queue.ml +++ b/etherlink/bin_node/lib_dev/tx_queue.ml @@ -593,14 +593,12 @@ module Handlers = struct return (match batch_response with Singleton r -> [r] | Batch rs -> rs) | Websocket ws_client -> - List.map_es + List.map_ep (fun req -> - let+ resp_json = + let*! response = Websocket_client.send_jsonrpc_request ws_client req in - Data_encoding.Json.destruct - Rpc_encodings.JSONRPC.response_encoding - resp_json) + return Rpc_encodings.JSONRPC.{value = response; id = req.id}) batch in diff --git a/etherlink/bin_node/lib_dev/websocket_client.ml b/etherlink/bin_node/lib_dev/websocket_client.ml index f85defe5ca6d82599eef0ab773ff5aa611032366..57983b907af1f54b9a1f74d9b5b9645d0df7a0e4 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.ml +++ b/etherlink/bin_node/lib_dev/websocket_client.ml @@ -416,7 +416,7 @@ let connect ?monitoring media uri = let disconnect {conn; _} = disconnect conn let send_jsonrpc_request client (request : JSONRPC.request) = - let open Lwt_result_syntax in + let open Lwt_syntax in let message = client.media.construct JSONRPC.request_encoding request in let opcode = if client.binary then Websocket.Frame.Opcode.Binary else Text in let response, resolver = Lwt.task () in @@ -424,16 +424,14 @@ let send_jsonrpc_request client (request : JSONRPC.request) = client.pending_requests request.id (Lwt.wakeup_later_result resolver) ; - let*! () = + let* () = Websocket_lwt_unix.write client.conn (Websocket.Frame.create ~opcode ~content:message ()) in - let*! response in + let* response in Request_table.remove client.pending_requests request.id ; - match response with - | Error e -> tzfail (Request_failed (request, e)) - | Ok resp -> return resp + return response type (_, _) call = | Call : @@ -454,8 +452,11 @@ let send_jsonrpc : id = Some id; } in - let+ response = send_jsonrpc_request client request in - Data_encoding.Json.destruct M.output_encoding response + let*! response = send_jsonrpc_request client request in + match response with + | Error e -> tzfail (Request_failed (request, e)) + | Ok response -> + return @@ Data_encoding.Json.destruct M.output_encoding response let subscribe client (kind : Ethereum_types.Subscription.kind) = let open Lwt_result_syntax in diff --git a/etherlink/bin_node/lib_dev/websocket_client.mli b/etherlink/bin_node/lib_dev/websocket_client.mli index 99a06f27ef2a0eb9b1e870087d41c35514bea4cb..7324e1c29185d5fe5c83cb3914a2010355000e32 100644 --- a/etherlink/bin_node/lib_dev/websocket_client.mli +++ b/etherlink/bin_node/lib_dev/websocket_client.mli @@ -44,8 +44,7 @@ val connect : ?monitoring:monitoring -> Media_type.t -> Uri.t -> t Lwt.t val disconnect : t -> unit Lwt.t (** Send a raw JSON RPC request on the websocket. *) -val send_jsonrpc_request : - t -> JSONRPC.request -> Data_encoding.json tzresult Lwt.t +val send_jsonrpc_request : t -> JSONRPC.request -> JSONRPC.value Lwt.t (** [send_jsonrpc client (Call ((module Method), input))] makes a JSONRPC request with the provided [Method] and [input] to the websocket [client]. It diff --git a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml index 68d8a34cd05346746b84b5ab8977185c0f5c6c40..8cc82aaf6f4e7f19a28ffd9e887cdd4f5eb05858 100644 --- a/etherlink/fa-bridge-watchtower/etherlink_monitor.ml +++ b/etherlink/fa-bridge-watchtower/etherlink_monitor.ml @@ -15,6 +15,7 @@ type ctx = { chain_id : L2_types.chain_id; gas_limit : Z.t; whitelist : Config.whitelist_item list option; + mutable nonce : Ethereum_types.quantity; } module Craft = struct @@ -58,20 +59,20 @@ module Tx_queue = struct match v with Ok v -> f v | Error err -> return (Error err) (* as found in etherlink/bin_floodgate/tx_queue.ml *) - let transfer ctx ?to_ ?(value = Z.zero) ~nonce ~data () = + let transfer ctx ?to_ ?(value = Z.zero) ~data () = let open Lwt_result_syntax in + let (Ethereum_types.Qty nonce as qnonce) = ctx.nonce in let txn = Craft.transfer ctx ~nonce ?to_ ~value ~data () in let tx_raw = Ethereum_types.hex_to_bytes txn in let hash = Ethereum_types.hash_raw_tx tx_raw in let**? tx = Transaction.decode tx_raw in let**? tx_object = Transaction.to_transaction_object ~hash tx in - let+ res = - Tx_container.add - tx_object - ~raw_tx:txn - ~next_nonce:(Ethereum_types.Qty nonce) - in - match res with Ok _hash -> Ok () | Error _ as res -> res + let+ res = Tx_container.add tx_object ~raw_tx:txn ~next_nonce:qnonce in + match res with + | Ok _hash -> + ctx.nonce <- Ethereum_types.Qty.next ctx.nonce ; + Ok () + | Error _ as res -> res end module Contract = Tezos_raw_protocol_alpha.Alpha_context.Contract @@ -212,19 +213,18 @@ module Event = struct ("log", Ethereum_types.transaction_log_encoding) let deposit_log = - declare_7 + declare_6 ~section ~name:"deposit_log" ~msg: "Deposit {nonce}: {amount} {token} to {receiver} in transaction \ - {transactionHash}({transactionIndex}) of block {blockNumber}" + {transactionHash} of block {blockNumber}" ~level:Notice ("nonce", Db.quantity_hum_encoding) ("amount", Data_encoding.float) ("token", Data_encoding.string) ("receiver", Ethereum_types.address_encoding) ("transactionHash", Ethereum_types.hash_encoding) - ("transactionIndex", Db.quantity_hum_encoding) ("blockNumber", Db.quantity_hum_encoding) ~pp1:Ethereum_types.pp_quantity ~pp3:Format.pp_print_string @@ -232,7 +232,6 @@ module Event = struct Format.pp_print_string fmt (Ethereum_types.Address.to_string a)) ~pp5:Ethereum_types.pp_hash ~pp6:Ethereum_types.pp_quantity - ~pp7:Ethereum_types.pp_quantity let emit_deposit_log ws_client (d : Db.deposit) (l : Db.log_info) = let open Lwt_result_syntax in @@ -242,13 +241,7 @@ module Event = struct Lwt_result.ok @@ emit deposit_log - ( d.nonce, - amount, - symbol, - d.receiver, - l.transactionHash, - l.transactionIndex, - l.blockNumber ) + (d.nonce, amount, symbol, d.receiver, l.transactionHash, l.blockNumber) let unclaimed_deposits = declare_1 @@ -268,21 +261,19 @@ module Event = struct ~pp1:Ethereum_types.pp_quantity let claimed_deposit = - declare_4 + declare_3 ~section ~name:"claimed_deposit" ~msg: - "Claimed deposit {nonce} in transaction \ - {transactionHash}({transactionIndex}) of block {blockNumber}" + "Claimed deposit {nonce} in transaction {transactionHash} of block \ + {blockNumber}" ~level:Notice ("nonce", Db.quantity_hum_encoding) ("transactionHash", Ethereum_types.hash_encoding) - ("transactionIndex", Db.quantity_hum_encoding) ("blockNumber", Db.quantity_hum_encoding) ~pp1:Ethereum_types.pp_quantity ~pp2:Ethereum_types.pp_hash ~pp3:Ethereum_types.pp_quantity - ~pp4:Ethereum_types.pp_quantity let claiming_deposit_status_fail = declare_3 @@ -550,20 +541,13 @@ let precompiled_contract_address = Efunc_core.Private.a Deposit.address_hex let claim ctx ~deposit_id = let open Lwt_result_syntax in - let* (Ethereum_types.Qty nonce) = - Websocket_client.send_jsonrpc - ctx.ws_client - (Call - ( (module Rpc_encodings.Get_transaction_count), - (ctx.public_key, Block_parameter Latest) )) - in let data = Efunc_core.Evm.encode ~name:"claim" [`uint 256] [`int deposit_id] in let _ : unit Lwt.t = let open Lwt_syntax in let* res = - Tx_queue.transfer ctx ~nonce ~to_:precompiled_contract_address ~data () + Tx_queue.transfer ctx ~to_:precompiled_contract_address ~data () in match res with | Ok (Ok ()) -> return_unit @@ -717,10 +701,7 @@ let handle_confirmed_txs {db; ws_client; _} in let*! () = Event.(emit claimed_deposit) - ( nonce, - exec.transactionHash, - exec.transactionIndex, - exec.blockNumber ) + (nonce, exec.transactionHash, exec.blockNumber) in let* () = Db.Deposits.set_claimed db nonce exec in let* () = @@ -744,20 +725,26 @@ let handle_confirmed_txs {db; ws_client; _} let claim_deposits ctx = let open Lwt_result_syntax in let* deposits = Db.Deposits.get_unclaimed ctx.db in - let*! () = - let number = List.length deposits in - if number > 0 then Event.(emit unclaimed_deposits) number - else Lwt.return_unit - in - let* () = - List.iter_es - (fun deposit -> - let (Qty deposit_id) = deposit.Db.nonce in - let*! () = Event.(emit claiming_deposit) deposit.Db.nonce in - claim ctx ~deposit_id) - deposits - in - return_unit + match deposits with + | [] -> return_unit + | _ -> + let*! () = Event.(emit unclaimed_deposits) (List.length deposits) in + let* nonce = + Websocket_client.send_jsonrpc + ctx.ws_client + (Call + ( (module Rpc_encodings.Get_transaction_count), + (ctx.public_key, Block_parameter Latest) )) + in + ctx.nonce <- nonce ; + (* Clear queue because we reinject all missing claims. *) + let* () = Tx_queue.Tx_container.clear () in + List.iter_es + (fun deposit -> + let (Qty deposit_id) = deposit.Db.nonce in + let*! () = Event.(emit claiming_deposit) deposit.Db.nonce in + claim ctx ~deposit_id) + deposits let on_new_block ctx ~catch_up (b : _ Ethereum_types.block) = let open Lwt_result_syntax in @@ -887,6 +874,7 @@ let start db ~config ~notify_ws_change ~first_block = chain_id; gas_limit = Z.of_int64 config.gas_limit; whitelist = config.whitelist; + nonce = Ethereum_types.Qty.zero; } in monitor_heads ctx @@ -913,7 +901,7 @@ let start db ~config ~notify_ws_change ~first_block = | Error e -> Format.kasprintf Event.(emit tx_queue_error) "%a" pp_print_trace e in - let*! () = Lwt_unix.sleep 0.05 in + let*! () = Lwt_unix.sleep 0.5 in tx_queue_beacon () in let* () =