diff --git a/src/proto_alpha/lib_protocol/dal_slot_repr.ml b/src/proto_alpha/lib_protocol/dal_slot_repr.ml index a55d4036ff18b0bbf2ee63125d6052cc129f2364..b5fe01bb4816d7724a79e4775f47ae9a6794b293 100644 --- a/src/proto_alpha/lib_protocol/dal_slot_repr.ml +++ b/src/proto_alpha/lib_protocol/dal_slot_repr.ml @@ -294,8 +294,8 @@ module Slots_history = struct in return @@ next ~prev_cell ~prev_cell_ptr elt - let search ~deref ~cell_ptr ~id_target = - search ~deref ~cell_ptr ~compare:(compare_lwt id_target) + let search ~deref ~cell ~id_target = + search ~deref ~cell ~compare:(compare_lwt id_target) (* FIXME/DAL: search will be used in refutation proof. But we need to introduce it here to explain why we need an ordering on the skip list's diff --git a/src/proto_alpha/lib_protocol/sc_rollup_inbox_repr.ml b/src/proto_alpha/lib_protocol/sc_rollup_inbox_repr.ml index f55c481720aca7be8813a8155c53ae4790a4f77f..e0504dbd4d4680437e0570bd5b1285a2d2fd1c32 100644 --- a/src/proto_alpha/lib_protocol/sc_rollup_inbox_repr.ml +++ b/src/proto_alpha/lib_protocol/sc_rollup_inbox_repr.ml @@ -1059,8 +1059,6 @@ struct let produce_proof ctxt history inbox (l, n) = let open Lwt_tzresult_syntax in - let cell_ptr = hash_skip_list_cell inbox in - let*? history = History.remember cell_ptr inbox history in let deref ptr = History.find ptr history in let compare hash = let*! tree = P.lookup_tree ctxt hash in @@ -1073,22 +1071,22 @@ struct | None -> -1 | Some level -> Raw_level_repr.compare level l) in - let* path = - option_to_result - (Format.sprintf - "Skip_list.search failed to find path to requested level (%ld)" - (Raw_level_repr.to_int32 l)) - (Skip_list.search ~deref ~compare ~cell_ptr) - in - let* inc = - option_to_result - "failed to deref some level in the path" - (Lwt.return (lift_ptr_path deref path)) - in - let* level = - option_to_result - "Skip_list.search returned empty list" - (Lwt.return (List.last_opt inc)) + let*! result = Skip_list.search ~deref ~compare ~cell:inbox in + let* inc, level = + match result with + | Skip_list.{rev_path; last_cell = Found level} -> + return (List.rev rev_path, level) + | {last_cell = Nearest _; _} + | {last_cell = No_exact_or_lower_ptr; _} + | {last_cell = Deref_returned_none; _} -> + (* We are only interested to the result where [search] than a + path to the cell we were looking for. All the other cases + should be considered as an error. *) + proof_error + (Format.asprintf + "Skip_list.search failed to find a valid path: %a" + (Skip_list.pp_search_result ~pp_cell:pp_history_proof) + result) in let* level_tree = option_to_result @@ -1112,6 +1110,9 @@ struct return (Single_level {level; inc; message_proof}, None) else let target_index = Skip_list.index level + 1 in + let cell_ptr = hash_skip_list_cell inbox in + let*? history = History.remember cell_ptr inbox history in + let deref ptr = History.find ptr history in let* inc = option_to_result "failed to find path to upper level" diff --git a/src/proto_alpha/lib_protocol/skip_list_repr.ml b/src/proto_alpha/lib_protocol/skip_list_repr.ml index d8c46df0212b5a8d77422abd4251e5238a440fe2..44b484af9f5a02cce28659368d7fa6bd652c288c 100644 --- a/src/proto_alpha/lib_protocol/skip_list_repr.ml +++ b/src/proto_alpha/lib_protocol/skip_list_repr.ml @@ -75,11 +75,31 @@ module type S = sig 'ptr list -> bool + type ('ptr, 'content) search_cell_result = + | Found of ('ptr, 'content) cell + | Nearest of { + lower : ('ptr, 'content) cell; + upper : ('ptr, 'content) cell option; + } + | No_exact_or_lower_ptr + | Deref_returned_none + + type ('ptr, 'content) search_result = { + rev_path : ('ptr, 'content) cell list; + last_cell : ('ptr, 'content) search_cell_result; + } + + val pp_search_result : + pp_cell:(Format.formatter -> ('ptr, 'content) cell -> unit) -> + Format.formatter -> + ('ptr, 'content) search_result -> + unit + val search : deref:('ptr -> ('content, 'ptr) cell option) -> compare:('content -> int Lwt.t) -> - cell_ptr:'ptr -> - 'ptr list option Lwt.t + cell:('content, 'ptr) cell -> + ('content, 'ptr) search_result Lwt.t end module Make (Parameters : sig @@ -319,39 +339,136 @@ end) : S = struct | first_cell_ptr :: path -> equal_ptr first_cell_ptr cell_ptr && valid_path cell_index cell_ptr path - let search ~deref ~compare ~cell_ptr = - let open Lwt_option_syntax in - let rec aux path ptr ix = - let*? cell = deref ptr in - let*? candidate_ptr = back_pointer cell ix in - let*? candidate_cell = deref candidate_ptr in - let*! comparison = compare candidate_cell.content in - if Compare.Int.(comparison = 0) then - (* In this case, we have reached our target cell. *) - return (List.rev (candidate_ptr :: ptr :: path)) - else if Compare.Int.(comparison < 0) then - if Compare.Int.(ix = 0) then - (* If the first back pointer is 'too far' ([comparison < 0]), - that means we won't find a valid target cell. *) - fail - else - (* If a back pointer other than the first is 'too far' - we can then backtrack to the previous back pointer. *) - let*? new_ptr = back_pointer cell (ix - 1) in - aux (ptr :: path) new_ptr 0 - else if Compare.Int.(ix + 1 >= FallbackArray.length cell.back_pointers) - then - (* If we reach the final back pointer and still have - [comparison > 0], we should continue from that cell. *) - aux (ptr :: path) candidate_ptr 0 + type ('ptr, 'content) search_cell_result = + | Found of ('ptr, 'content) cell + | Nearest of { + lower : ('ptr, 'content) cell; + upper : ('ptr, 'content) cell option; + } + | No_exact_or_lower_ptr + | Deref_returned_none + + type ('ptr, 'content) search_result = { + rev_path : ('ptr, 'content) cell list; + last_cell : ('ptr, 'content) search_cell_result; + } + + let pp_rev_path ~pp_cell = + Format.pp_print_list ~pp_sep:Format.pp_print_space pp_cell + + let pp_search_cell_result ~pp_cell fmt = function + | Found ptr -> Format.fprintf fmt "Found(%a)" pp_cell ptr + | Nearest {lower; upper} -> + Format.fprintf + fmt + "Nearest(lower=%a;upper=%a)" + pp_cell + lower + (Format.pp_print_option pp_cell) + upper + | No_exact_or_lower_ptr -> Format.fprintf fmt "No_exact_or_lower_ptr" + | Deref_returned_none -> Format.fprintf fmt "Deref_returned_none" + + let pp_search_result ~pp_cell fmt {rev_path; last_cell} = + Format.fprintf + fmt + "{rev_path = %a; last_point = %a}" + (pp_rev_path ~pp_cell) + rev_path + (pp_search_cell_result ~pp_cell) + last_cell + + let search (type ptr) ~(deref : ptr -> ('content, ptr) cell option) ~compare + ~cell = + let open Lwt_syntax in + let ( = ), ( < ), ( > ) = Compare.Int.(( = ), ( < ), ( > )) in + (* Given a cell, to compute the minimal path, we need to find the + good back-pointer. This is done linearly with respect to the + number of back-pointers. This number of back-pointers is + logarithmic with respect to the number of non-empty + inboxes. The complexity is consequently in O(log_2^2(n)). Since + in practice, [n < 2^32], we have at most [1000] calls. Besides, + the recursive function is tail recursive. + + The linear search could be turned into a dichotomy search if + necessary. But since this piece of code won't be used in a + carbonated function, we prefer to keep a simple implementation + for the moment. *) + let rec aux rev_path cell ix = + (* Below, we call the [target] the cell for which [compare target = 0]. *) + + (* Invariant: + + - compare cell > target + - ix >= 0 + - if cell <> genesis => ix < List.length (back_pointers cell) + - \exists path' rev_path = cell:path' + *) + let back_pointers_length = FallbackArray.length cell.back_pointers in + if back_pointers_length = 0 then + (* [cell] is the genesis cell. *) + return {rev_path; last_cell = No_exact_or_lower_ptr} else - (* Final case, we just try the next back pointer. *) - aux path ptr (ix + 1) + let candidate_ptr = + match back_pointer cell ix with + | None -> + (* At this point we have [cell <> genesis]. Consequently, + thanks to the invariant of this function, we have [ix + < List.length (back_pointers cell)]. Consequently, the + call to [back_pointer] cannot fail. *) + assert false + | Some candidate_ptr -> candidate_ptr + in + match deref candidate_ptr with + | None -> + (* If we cannot dereference a pointer, We stop the search + and returns the current path. *) + return {rev_path; last_cell = Deref_returned_none} + | Some next_cell -> ( + let* comparison = compare next_cell.content in + if comparison = 0 then + (* We have found the target.*) + let rev_path = next_cell :: rev_path in + return {rev_path; last_cell = Found next_cell} + else if comparison > 0 then + if ix < back_pointers_length - 1 then + (* There might be a short path by dereferencing the next pointer. *) + aux rev_path cell (ix + 1) + else + (* The last pointer is still above the target. We are on the good track, *) + let rev_path = next_cell :: rev_path in + aux rev_path next_cell 0 + else if ix = 0 then + (* We found a cell lower than the target. *) + (* The first back pointers gives a cell below the target *) + let rev_path = next_cell :: rev_path in + return + { + rev_path; + last_cell = Nearest {lower = next_cell; upper = Some cell}; + } + else + (* We found a cell lower than the target. *) + (* The previous pointer was actually the good one. *) + let good_candidate_ptr = + match back_pointer cell (ix - 1) with + | None -> assert false + | Some candidate_ptr -> candidate_ptr + in + match deref good_candidate_ptr with + | None -> + (* We already dereferenced this pointer before. *) + assert false + | Some good_next_cell -> + let rev_path = good_next_cell :: rev_path in + aux rev_path good_next_cell 0) in - let*? cell = deref cell_ptr in - let*! comparison = compare cell.content in - (* We must check that we aren't already at the target cell before - starting the recursion. *) - if Compare.Int.(comparison = 0) then return [cell_ptr] - else aux [] cell_ptr 0 + let* comparison = compare cell.content in + if Compare.Int.(comparison = 0) then + (* Particular case where the target is the start cell. *) + return {rev_path = [cell]; last_cell = Found cell} + else if Compare.Int.(comparison < 0) then + return + {rev_path = [cell]; last_cell = Nearest {lower = cell; upper = None}} + else aux [cell] cell 0 end diff --git a/src/proto_alpha/lib_protocol/skip_list_repr.mli b/src/proto_alpha/lib_protocol/skip_list_repr.mli index c2c3348e4d78c86393d62fd3714ef006adab67b2..489d6e74f9143209dfad2fde6b43fc2e28b81882 100644 --- a/src/proto_alpha/lib_protocol/skip_list_repr.mli +++ b/src/proto_alpha/lib_protocol/skip_list_repr.mli @@ -118,21 +118,71 @@ module type S = sig 'ptr list -> bool - (** [search ~deref ~compare ~cell_ptr] is similar to {!back_path} except - that it will search through the skip list to find a cell at which - [compare content] is zero. This will only work if [compare] is a - function that returns a negative integer for cells before the - target and a positive integer for cells after the target. - - If [d] is the distance in the skip list between the original - [cell_ptr] and the final target cell, this function involves - [O(log_basis(d))] calls to [deref] and [compare] (the logarithm's - base is the [basis] parameter in the skip list). *) + type ('ptr, 'content) search_cell_result = + | Found of ('ptr, 'content) cell + | Nearest of { + lower : ('ptr, 'content) cell; + upper : ('ptr, 'content) cell option; + } + | No_exact_or_lower_ptr + | Deref_returned_none + + type ('ptr, 'content) search_result = { + rev_path : ('ptr, 'content) cell list; + last_cell : ('ptr, 'content) search_cell_result; + } + + val pp_search_result : + pp_cell:(Format.formatter -> ('ptr, 'content) cell -> unit) -> + Format.formatter -> + ('ptr, 'content) search_result -> + unit + + (** [search ~deref ~compare ~cell] allows to find a cell of the skip + list according to its content. This function assumes that the + content of the cells is in increasing order according to the + ordering defined by the function [compare]. In other words, this + function assumes that [compare] is a function that returns a + negative integer for cells before the target and a positive + integer for cells after the target. The value returned by this + function is [{rev_path; last_cell}] such that. + + - [rev_path = []] if and only if [compare (content cell) > 0] + + - For all the cases below, if there is a path from cell [A] to + cell [B], [rev_path] contains the list of cells to go from [B] to + [A]. Consequently, the first element of [rev_path] is [B] and the + path is minimal. + + - [last_pointer = Deref_returned_none] if [deref] fails to + associate a cell to a pointer during the search. In that case, + [rev_path] is a path from [cell] to [candidate] where [candidate] + is the last cell for which candidate did not fail and such that + [compare (content (candidate)) > 0]. + + - [last_pointer = No_exact_or_lower_ptr] if for all cell of the + skip list, [compare (content cell) > 0]. In that case, [rev_path] + is a path from [cell] to the genesis cell. + + - [last_pointer = Found target] if there is a cell [target] such + that [compare (content target) = 0] and a path from [cell] to + [target]. In that case, [rev_path] is the minimal path from + [cell] to [target]. + + - [last_pointer = Nearest_lower {lower;upper}] if there is no + cell in the skip list such that [compare (content cell) = 0]. In + that case [lower] is the unique cell such that [compare (content + lower) < 0] and for all other cells [candidate] such that + [compare (content candidate) < 0] then there is a path from + [lower] to [candidate]. [upper], if it exists is the successor + cell to [lower], i.e. [deref ((back_pointer upper) 0) = Some + lower]. In that case, [rev_path] is the minimal path from [cell] + to [lower]. *) val search : deref:('ptr -> ('content, 'ptr) cell option) -> compare:('content -> int Lwt.t) -> - cell_ptr:'ptr -> - 'ptr list option Lwt.t + cell:('content, 'ptr) cell -> + ('content, 'ptr) search_result Lwt.t end module Make (_ : sig diff --git a/src/proto_alpha/lib_protocol/test/unit/test_skip_list_repr.ml b/src/proto_alpha/lib_protocol/test/unit/test_skip_list_repr.ml index 99f32e8601308746a50c5cb3fe574a6ce5768ffd..f482fdf12c296007be35a09c4795bf3a5bf63bdf 100644 --- a/src/proto_alpha/lib_protocol/test/unit/test_skip_list_repr.ml +++ b/src/proto_alpha/lib_protocol/test/unit/test_skip_list_repr.ml @@ -101,7 +101,7 @@ struct (search ~deref:(deref list) ~compare:(fun x -> Lwt.return Compare.Int.(compare x target_content)) - ~cell_ptr:start) + ~cell:start) let valid_back_path list start stop path = valid_back_path @@ -115,41 +115,37 @@ struct let check_path i j back_path_fn = let l = nlist basis i in - if i <= j then return () - else - match back_path_fn l i j with - | None -> - fail (err (Printf.sprintf "There must be path from %d to %d" i j)) - | Some path -> - let len = List.length path in - let log_basis x = - int_of_float - @@ ceil (log (float_of_int x) /. log (float_of_int basis)) - in - let log_ij = log_basis (i - j + 1) in - let expected = log_ij * basis in - fail_unless - (len <= expected) - (err - (Format.sprintf - "The proof is too long! Expected = %d < len = %d [basis = \ - %d, i = %d, log = %d, j = %d]\n" - expected - len - basis - i - log_ij - j)) - >>=? fun () -> - fail_unless - (valid_back_path l i j path) - (err - (Printf.sprintf - "The path %s does not connect %d to %d (or is \ - invalid/non-minimal)" - (show_path path) - i - j)) + match back_path_fn l i j with + | None -> fail (err (Printf.sprintf "There must be path from %d to %d" i j)) + | Some path -> + let len = List.length path in + let log_basis x = + int_of_float @@ ceil (log (float_of_int x) /. log (float_of_int basis)) + in + let log_ij = log_basis (i - j + 1) in + let expected = max 1 (log_ij * basis) in + fail_unless + (len <= expected) + (err + (Format.sprintf + "The proof is too long! Expected = %d < len = %d [basis = %d, \ + i = %d, log = %d, j = %d]\n" + expected + len + basis + i + log_ij + j)) + >>=? fun () -> + fail_unless + (valid_back_path l i j path) + (err + (Printf.sprintf + "The path %s does not connect %d to %d (or is \ + invalid/non-minimal)" + (show_path path) + i + j)) let check_invalid_paths i = let l = nlist basis i in @@ -169,25 +165,98 @@ struct in aux 0 + let check_lower_path history rev_path target = + match rev_path with + | [] -> + (* checked before. *) + assert false + | [cell_x] -> + if + (* If there is a single element, we check the content of the + cell is smaller than the target. *) + Compare.Int.(content cell_x < target) + then return () + else fail (err (Printf.sprintf "Invalid path: %d" target)) + | rev_path -> ( + (* The path is returned from the start cell to the target. The + invariant we want to check is in the opposite direction. *) + match rev_path with + | cell_x :: cell_z :: _ -> ( + let i = index cell_x in + let next_index = i + 1 in + match + List.nth history.cells (List.length history.cells - next_index - 1) + with + | None -> assert false + | Some (_y, cell_y) -> + if + Compare.Int.( + content cell_x < target + && target < content cell_y + && content cell_y <= content cell_z) + then return () + else + fail (err (Printf.sprintf "Invariant for 'Lower' is broken"))) + | _ -> assert false) + let check_invalid_search_paths i = let l = nlist basis i in let rec aux j = if i <= j then return () else - let t = content_from_index ~default:(-1) l j + 1 in - (match search l i t with - | None -> return () - | Some _path -> - fail - (err - (Printf.sprintf - "There should be no search path connecting %d to a node \ - with content %d" - i - t))) - >>=? fun () -> aux (j + 1) + (* An odd number to make the search fails. *) + let shift_size = 5 in + (* delta is chosen so that j + delta is not in the list and + can be below the smallest element and greater than the + largest element. *) + let delta = + if List.length l.cells mod 2 = 0 then -shift_size else shift_size + in + let t = content_from_index ~default:(-1) l j + delta in + (* By construction, deref never fails since j <= List.length list. *) + match deref l i with + | None -> assert false + | Some start_content -> + (* For each case below, we check whether the last cell + returned is valid with respect to the current path. Two + cases are not possible. *) + (match search l start_content t with + | {last_cell = No_exact_or_lower_ptr; rev_path} -> ( + (* In that case, we check the path returned by search + is above the target. *) + match rev_path with + | [] -> fail (err (Printf.sprintf "unexpected empty path")) + | head :: _ -> + if Compare.Int.(content head > t) then return () + else + fail + (err + (Printf.sprintf + "Invariant for 'No_exact_or_lower_ptr' broken"))) + | {last_cell = Nearest _; rev_path} -> + (* In that case, we check the property of being a lower path. *) + check_lower_path l rev_path t + | {last_cell = Deref_returned_none; _} -> + (* deref should always work *) + assert false + | {last_cell = Found _; _} -> + (* Because we search for a cell that which is not in + the list, if the cell was found, we fail. *) + fail + (err + (Printf.sprintf + "There should be no search path connecting %d to a \ + node with content %d" + i + t))) + >>=? fun () -> aux (j + 1) in aux 0 + + let pp_search_result fmt = + pp_search_result + ~pp_cell:(fun fmt cell -> Format.fprintf fmt "%s" (show_cell cell)) + fmt end let test_skip_list_nat_check_path (basis, i, j) = @@ -241,7 +310,18 @@ let test_skip_list_nat_check_path_with_search (basis, i, j) = end) in M.check_path i j (fun l i j -> let target = M.content_from_index ~default:(-1) l j in - M.search l i target) + let start = + match M.deref l i with None -> assert false | Some start -> start + in + match M.search l start target with + | {last_cell = Found _; rev_path} -> + List.rev_map + (fun cell -> + let x = M.content cell in + (x - 10) / 2) + rev_path + |> Option.some + | _result -> None) let test_skip_list_nat_check_invalid_path_with_search (basis, i) = let module M = TestNat (struct @@ -286,7 +366,7 @@ let tests = ~count:10 QCheck2.Gen.( let* basis = frequency [(5, pure 2); (1, 2 -- 73)] in - let* i = 0 -- 100 in + let* i = 0 -- 10 in return (basis, i)) test_skip_list_nat_check_invalid_path_with_search; ]