From 19b70d08b190315bd9b846c8c8a8445bec832892 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Tue, 10 Sep 2024 17:23:32 +0200 Subject: [PATCH 1/3] Teztale/Server: use inlined syntax for Lwt[_result].[bind|map] --- .../bin_teztale_server/teztale_server_main.ml | 586 +++++++++--------- 1 file changed, 289 insertions(+), 297 deletions(-) diff --git a/teztale/bin_teztale_server/teztale_server_main.ml b/teztale/bin_teztale_server/teztale_server_main.ml index b8bd54326e77..650d8cc4a2c2 100644 --- a/teztale/bin_teztale_server/teztale_server_main.ml +++ b/teztale/bin_teztale_server/teztale_server_main.ml @@ -95,18 +95,17 @@ let get_only_endpoint meth f = | _ -> method_not_allowed_respond methods let with_json_body body f = - Lwt.bind (Cohttp_lwt.Body.to_string body) (fun str -> - match Ezjsonm.from_string_result str with - | Error err -> - Cohttp_lwt_unix.Server.respond_string - ~headers: - (Cohttp.Header.init_with - "content-type" - "text/plain; charset=UTF-8") - ~status:`Bad_request - ~body:(Ezjsonm.read_error_description err) - () - | Ok json -> f json) + let open Lwt.Syntax in + let* str = Cohttp_lwt.Body.to_string body in + match Ezjsonm.from_string_result str with + | Error err -> + Cohttp_lwt_unix.Server.respond_string + ~headers: + (Cohttp.Header.init_with "content-type" "text/plain; charset=UTF-8") + ~status:`Bad_request + ~body:(Ezjsonm.read_error_description err) + () + | Ok json -> f json let with_data encoding body f = with_json_body body (fun json -> @@ -312,6 +311,7 @@ let get_users ~logger conf db_pool = (fun users -> reply_public_json Data_encoding.(list string) users) let get_levels_at_timestamp0 db_pool timestamp = + let open Lwt_result.Syntax in let lower = Caqti_request.Infix.(Caqti_type.(int32 ->* t2 int32 int32)) "SELECT level, MIN(timestamp) FROM blocks, (SELECT MAX(level) AS m FROM \ @@ -324,45 +324,47 @@ let get_levels_at_timestamp0 db_pool timestamp = blocks WHERE timestamp >= ?) WHERE level = m OR level = m - 1 GROUP BY \ level" in - Lwt_result.bind - (Caqti_lwt_unix.Pool.use - (fun (module Db : Caqti_lwt.CONNECTION) -> - Db.collect_list lower timestamp) - db_pool) - (function - | [] -> - Caqti_lwt_unix.Pool.use - (fun (module Db : Caqti_lwt.CONNECTION) -> - Db.collect_list upper timestamp) - db_pool - | l -> Lwt_result.return l) + let* res = + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + Db.collect_list lower timestamp) + db_pool + in + match res with + | [] -> + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + Db.collect_list upper timestamp) + db_pool + | l -> Lwt_result.return l let get_levels_at_timestamp db_pool timestamp = - Lwt_result.map - (function - | [] -> (None, None) - | [((_, t) as r)] -> - if Int32.compare timestamp t = -1 then (None, Some r) - else (Some r, None) - | ((_, t1) as a) :: ((_, t2) as b) :: _ -> - if Int32.compare t1 t2 = -1 then (Some a, Some b) else (Some b, Some a)) - (get_levels_at_timestamp0 db_pool timestamp) + let open Lwt_result.Syntax in + let+ levels = get_levels_at_timestamp0 db_pool timestamp in + match levels with + | [] -> (None, None) + | [((_, t) as r)] -> + if Int32.compare timestamp t = -1 then (None, Some r) else (Some r, None) + | ((_, t1) as a) :: ((_, t2) as b) :: _ -> + if Int32.compare t1 t2 = -1 then (Some a, Some b) else (Some b, Some a) (** Fetch the list of allowed logins from db and update the list ref given as parameter. *) let refresh_users conf db_pool users = + let open Lwt_result.Syntax in Lwt_mutex.with_lock Sql_requests.Mutex.nodes (fun () -> let query = Caqti_request.Infix.( Caqti_type.(unit ->* t2 string Sql_requests.Type.bcrypt_hash)) "SELECT name, password FROM nodes WHERE password IS NOT NULL" in - Lwt_result.map - (fun users_from_db -> users := users_from_db) - (Caqti_lwt_unix.Pool.use - (fun (module Db : Caqti_lwt.CONNECTION) -> - maybe_with_metrics conf "refresh_users" @@ fun () -> - Db.collect_list query ()) - db_pool)) + let+ users_from_db = + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + maybe_with_metrics conf "refresh_users" @@ fun () -> + Db.collect_list query ()) + db_pool + in + users := users_from_db) (** Insert a new user (i.e. a teztale archiver) into the database. If the user already exists, password is updated. @@ -402,61 +404,66 @@ let maybe_create_tables db_pool = db_pool let maybe_alter_tables db_pool = + let open Lwt.Syntax in Caqti_lwt_unix.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) -> - Lwt.map - (fun () -> Ok ()) - (Lwt_list.iter_s - (fun reqs -> - Lwt.bind - (Db.with_transaction (fun () -> - Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es - (fun req -> - Db.exec - (Caqti_request.Infix.(Caqti_type.(unit ->. unit)) req) - ()) - reqs)) - (function - | Ok () -> Lwt.return_unit - | Error e -> - Lwt_io.eprintlf - "\"ALTER TABLE ADD COLUMN IF NOT EXISTS\" expression is \ - not supported by sqlite, if the following error is \ - because the column already exists ignore it:\n\ - %s" - (Caqti_error.show e))) - Sql_requests.alter_tables)) + Lwt_list.iter_s + (fun reqs -> + let* res = + Db.with_transaction (fun () -> + Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es + (fun req -> + Db.exec + (Caqti_request.Infix.(Caqti_type.(unit ->. unit)) req) + ()) + reqs) + in + match res with + | Ok () -> Lwt.return_unit + | Error e -> + Lwt_io.eprintlf + "\"ALTER TABLE ADD COLUMN IF NOT EXISTS\" expression is not \ + supported by sqlite, if the following error is because the \ + column already exists ignore it:\n\ + %s" + (Caqti_error.show e)) + Sql_requests.alter_tables + |> Lwt_result.ok) db_pool let maybe_alter_and_create_tables db_pool = - Lwt.bind (maybe_alter_tables db_pool) (function - | Error e -> Lwt.return_error e - | Ok () -> maybe_create_tables db_pool) + let open Lwt_result.Syntax in + let* () = maybe_alter_tables db_pool in + maybe_create_tables db_pool let with_cache mutex request mem add (module Db : Caqti_lwt.CONNECTION) conf list = + let open Lwt_result.Syntax in (* Note: even if data is already in cache, we add it again at the end in order to mark it as recent. *) if conf.Config.with_transaction = FULL then Lwt_mutex.with_lock mutex @@ fun () -> Db.with_transaction @@ fun () -> - Lwt_result.map (fun () -> List.iter add list) - @@ Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es - (fun x -> - if mem x then - Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax - .return_unit - else Db.exec request x) - list + let+ () = + Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es + (fun x -> + if mem x then + Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax + .return_unit + else Db.exec request x) + list + in + List.iter add list else Tezos_lwt_result_stdlib.Lwtreslib.Bare.List.iter_es (fun x -> - Lwt_result.map - (fun () -> add x) - (if mem x then - Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax - .return_unit - else Db.exec request x)) + let+ () = + if mem x then + Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax + .return_unit + else Db.exec request x + in + add x) list let without_cache mutex request = @@ -513,49 +520,50 @@ let endorsing_rights_callback = let open Tezos_lwt_result_stdlib.Lwtreslib.Bare.Monad.Lwt_result_syntax in (* Note: even if data is already in cache, we add it again in order to mark it as recent. *) - Lwt_result.map - (fun () -> Cache.add cache level) - (if Cache.mem cache level then - let () = - Lib_teztale_base.Log.debug logger (fun () -> - Format.asprintf "(%ld) CACHE.mem Level rights" level) - in - return_unit - else - let* () = - maybe_with_metrics conf "maybe_insert_endorsing_right__list" - @@ fun () -> - Caqti_lwt_unix.Pool.use - (fun (module Db : Caqti_lwt.CONNECTION) -> - let* () = - let delegates = - List.map - (fun {Lib_teztale_base.Consensus_ops.address; _} -> - address) - rights - in - may_insert_delegates (module Db) conf delegates - in - let rights = - List.map - (fun Lib_teztale_base.Consensus_ops. - {address; first_slot; power} -> - (level, first_slot, power, address)) - rights - in - without_cache - Sql_requests.Mutex.endorsing_rights - Sql_requests.maybe_insert_endorsing_right - (module Db) - conf - rights) - db_pool - in - let () = - Lib_teztale_base.Log.debug logger (fun () -> - Format.asprintf "(%ld) CACHE.add Level rights" level) - in - return_unit) + let+ () = + if Cache.mem cache level then + let () = + Lib_teztale_base.Log.debug logger (fun () -> + Format.asprintf "(%ld) CACHE.mem Level rights" level) + in + return_unit + else + let* () = + maybe_with_metrics conf "maybe_insert_endorsing_right__list" + @@ fun () -> + Caqti_lwt_unix.Pool.use + (fun (module Db : Caqti_lwt.CONNECTION) -> + let* () = + let delegates = + List.map + (fun {Lib_teztale_base.Consensus_ops.address; _} -> + address) + rights + in + may_insert_delegates (module Db) conf delegates + in + let rights = + List.map + (fun Lib_teztale_base.Consensus_ops. + {address; first_slot; power} -> + (level, first_slot, power, address)) + rights + in + without_cache + Sql_requests.Mutex.endorsing_rights + Sql_requests.maybe_insert_endorsing_right + (module Db) + conf + rights) + db_pool + in + let () = + Lib_teztale_base.Log.debug logger (fun () -> + Format.asprintf "(%ld) CACHE.add Level rights" level) + in + return_unit + in + Cache.add cache level in with_caqti_error ~logger out (fun () -> Cohttp_lwt_unix.Server.respond_string @@ -618,88 +626,87 @@ let block_callback = let* () = (* Note: even if data is already in cache, we add it again in order to mark it as recent. *) - Lwt_result.map - (fun () -> Block_lru_cache.add block_cache hash) - (if Block_lru_cache.mem block_cache hash then - let () = - Lib_teztale_base.Log.debug logger (fun () -> - Format.asprintf - "(%ld) CACHE.mem %a" - level - Tezos_base.TzPervasives.Block_hash.pp - hash) - in - return_unit - else - let* () = may_insert_delegates (module Db) conf [delegate] in - let* () = - maybe_with_metrics conf "maybe_insert_block" @@ fun () -> - without_cache - Sql_requests.Mutex.blocks - Sql_requests.maybe_insert_block - (module Db) - conf - [ - ((level, timestamp, hash, round), (predecessor, delegate)); - ] - in - let* () = - match cycle_info with - | Some - Lib_teztale_base.Data.{cycle; cycle_position; cycle_size} - -> - maybe_with_metrics conf "maybe_insert_cycle" @@ fun () -> - without_cache - Sql_requests.Mutex.cycles - Sql_requests.maybe_insert_cycle - (module Db) - conf - [(cycle, Int32.sub level cycle_position, cycle_size)] - | _ -> Lwt.return_ok () - in - Lib_teztale_base.Log.debug logger (fun () -> - Format.asprintf - "(%ld) CACHE.add (block) %a" - level - Tezos_base.TzPervasives.Block_hash.pp - hash) ; - return_unit) + let+ () = + if Block_lru_cache.mem block_cache hash then + let () = + Lib_teztale_base.Log.debug logger (fun () -> + Format.asprintf + "(%ld) CACHE.mem %a" + level + Tezos_base.TzPervasives.Block_hash.pp + hash) + in + return_unit + else + let* () = may_insert_delegates (module Db) conf [delegate] in + let* () = + maybe_with_metrics conf "maybe_insert_block" @@ fun () -> + without_cache + Sql_requests.Mutex.blocks + Sql_requests.maybe_insert_block + (module Db) + conf + [((level, timestamp, hash, round), (predecessor, delegate))] + in + let* () = + match cycle_info with + | Some + Lib_teztale_base.Data.{cycle; cycle_position; cycle_size} + -> + maybe_with_metrics conf "maybe_insert_cycle" @@ fun () -> + without_cache + Sql_requests.Mutex.cycles + Sql_requests.maybe_insert_cycle + (module Db) + conf + [(cycle, Int32.sub level cycle_position, cycle_size)] + | _ -> Lwt.return_ok () + in + Lib_teztale_base.Log.debug logger (fun () -> + Format.asprintf + "(%ld) CACHE.add (block) %a" + level + Tezos_base.TzPervasives.Block_hash.pp + hash) ; + return_unit + in + Block_lru_cache.add block_cache hash in let* () = (* Validated blocks do not provide operations only applied one does so we need a second cache *) (* Note: even if data is already in cache, we add it again in order to mark it as recent. *) - Lwt_result.map - (fun () -> - if attestations <> [] then - Block_lru_cache.add block_operations_cache hash) - (if Block_lru_cache.mem block_operations_cache hash then - return_unit - else - let* () = - insert_operations_from_block - (module Db) - conf - (Int32.pred level) - hash - attestations - in - let* () = - insert_operations_from_block - (module Db) - conf - level - hash - preattestations - in - Lib_teztale_base.Log.debug logger (fun () -> - Format.asprintf - "(%ld) CACHE.add (block operations) %a" - level - Tezos_base.TzPervasives.Block_hash.pp - hash) ; - return_unit) + let+ () = + if Block_lru_cache.mem block_operations_cache hash then + return_unit + else + let* () = + insert_operations_from_block + (module Db) + conf + (Int32.pred level) + hash + attestations + in + let* () = + insert_operations_from_block + (module Db) + conf + level + hash + preattestations + in + Lib_teztale_base.Log.debug logger (fun () -> + Format.asprintf + "(%ld) CACHE.add (block operations) %a" + level + Tezos_base.TzPervasives.Block_hash.pp + hash) ; + return_unit + in + if attestations <> [] then + Block_lru_cache.add block_operations_cache hash in let* () = maybe_with_metrics conf "insert_received_block__list" @@ fun () -> @@ -1124,23 +1131,23 @@ let routes : match meth with | `PUT -> with_data Config.login_encoding body (fun (login, password) -> + let open Lwt_result.Syntax in let password = Bcrypt.hash password in with_caqti_error ~logger - (Lwt_result.bind - (upsert_user conf db_pool login password) - (fun () -> refresh_users conf db_pool users)) + (let* () = upsert_user conf db_pool login password in + refresh_users conf db_pool users) (reply_ok login)) | `DELETE -> with_data Data_encoding.(obj1 (req "login" string)) body (fun login -> + let open Lwt_result.Syntax in with_caqti_error ~logger - (Lwt_result.bind - (delete_user conf db_pool login) - (fun () -> refresh_users conf db_pool users)) + (let* () = delete_user conf db_pool login in + refresh_users conf db_pool users) (reply_ok login)) | `OPTIONS -> options_respond methods | _ -> method_not_allowed_respond methods) ); @@ -1177,17 +1184,15 @@ let routes : get_head conf ~logger db_pool ); ( Re.str "/metrics", fun _g ~logger:_ ~conf:_ ~admins:_ ~users:_ _db_pool _header _meth _body -> - Lwt.bind - Prometheus.CollectorRegistry.(collect default) - (fun data -> - let body = - Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output data - in - let headers = - Cohttp.Header.init_with "Content-Type" "text/plain; version=0.0.4" - in - Cohttp_lwt_unix.Server.respond_string ~status:`OK ~headers ~body ()) - ); + let open Lwt.Syntax in + let* data = Prometheus.CollectorRegistry.(collect default) in + let body = + Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output data + in + let headers = + Cohttp.Header.init_with "Content-Type" "text/plain; version=0.0.4" + in + Cohttp_lwt_unix.Server.respond_string ~status:`OK ~headers ~body () ); ] |> List.map (fun (r, fn) -> (r |> Re.whole_string |> Re.compile, fn)) @@ -1262,6 +1267,7 @@ let parse_conf file = exit 1) let run (conf : Config.t) = + let open Lwt.Syntax in Lib_teztale_base.Log.verbosity := conf.Config.verbosity ; let logger = Lib_teztale_base.Log.logger () in let uri = Uri.of_string conf.Config.db_uri in @@ -1271,88 +1277,74 @@ let run (conf : Config.t) = Lwt_main.run (match Caqti_lwt_unix.connect_pool ~env:Sql_requests.env uri with | Error e -> - Lwt.bind - (Lwt_io.eprintl (Caqti_error.show e)) - (fun () -> Lwt.return 1) - | Ok pool -> - Lwt.bind (maybe_alter_and_create_tables pool) (function - | Error e -> - Lwt.bind - (Lwt_io.eprintl (Caqti_error.show e)) - (fun () -> Lwt.return 1) - | Ok () -> - let stop, paf = Lwt.task () in - let shutdown _ = Lwt.wakeup paf () in - let _ = Sys.signal Sys.sigint (Sys.Signal_handle shutdown) in - Lwt.bind - (Lwt.map - (List.find_map (function - | Error e -> Some e - | _ -> None)) - (Lwt_list.map_s - (fun (login, password) -> - let password = Bcrypt.hash password in - upsert_user conf pool login password) - conf.Config.users)) - (function - | Some e -> - Lwt.bind - (Lwt_io.eprintl (Caqti_error.show e)) - (fun () -> Lwt.return 1) - | None -> - Lwt.bind (refresh_users conf pool users) (function - | Error e -> - Lwt.bind - (Lwt_io.eprintl (Caqti_error.show e)) - (fun () -> Lwt.return 1) - | Ok () -> - let servers = - List.map - (fun con -> - Lwt.bind - (Conduit_lwt_unix.init - ?src:con.Config.source - ()) - (fun ctx -> - let ctx = - Cohttp_lwt_unix.Net.init ~ctx () - in - let mode = - match con.Config.tls with - | Some Config.{crt; key} -> - `TLS - ( `Crt_file_path crt, - `Key_file_path key, - `No_password, - `Port con.Config.port ) - | None -> - `TCP (`Port con.Config.port) - in - Lib_teztale_base.Log.info - logger - (fun () -> - Printf.sprintf - "Server listening at %s:%d." - (match con.Config.source with - | None -> "" - | Some s -> s) - con.port) ; - Cohttp_lwt_unix.Server.create - ~stop - ~ctx - ~mode - (Cohttp_lwt_unix.Server.make - ~callback: - (callback - ~conf - ~admins - ~users - pool) - ()))) - conf.Config.network_interfaces - in - Lwt.bind (Lwt.join servers) (fun () -> - Lwt.return 0))))) + let* () = Lwt_io.eprintl (Caqti_error.show e) in + Lwt.return 1 + | Ok pool -> ( + let* res = maybe_alter_and_create_tables pool in + match res with + | Error e -> + let* () = Lwt_io.eprintl (Caqti_error.show e) in + Lwt.return 1 + | Ok () -> ( + let stop, paf = Lwt.task () in + let shutdown _ = Lwt.wakeup paf () in + let _ = Sys.signal Sys.sigint (Sys.Signal_handle shutdown) in + let* res = + let+ list = + Lwt_list.map_s + (fun (login, password) -> + let password = Bcrypt.hash password in + upsert_user conf pool login password) + conf.Config.users + in + List.find_map (function Error e -> Some e | _ -> None) list + in + match res with + | Some e -> + let* () = Lwt_io.eprintl (Caqti_error.show e) in + Lwt.return 1 + | None -> ( + let* res = refresh_users conf pool users in + match res with + | Error e -> + let* () = Lwt_io.eprintl (Caqti_error.show e) in + Lwt.return 1 + | Ok () -> + let servers = + List.map + (fun con -> + let* ctx = + Conduit_lwt_unix.init ?src:con.Config.source () + in + let ctx = Cohttp_lwt_unix.Net.init ~ctx () in + let mode = + match con.Config.tls with + | Some Config.{crt; key} -> + `TLS + ( `Crt_file_path crt, + `Key_file_path key, + `No_password, + `Port con.Config.port ) + | None -> `TCP (`Port con.Config.port) + in + Lib_teztale_base.Log.info logger (fun () -> + Printf.sprintf + "Server listening at %s:%d." + (match con.Config.source with + | None -> "" + | Some s -> s) + con.port) ; + Cohttp_lwt_unix.Server.create + ~stop + ~ctx + ~mode + (Cohttp_lwt_unix.Server.make + ~callback:(callback ~conf ~admins ~users pool) + ())) + conf.Config.network_interfaces + in + let* () = Lwt.join servers in + Lwt.return 0)))) in exit code -- GitLab From 72e92f07f883e7ba677116857d9dc31323ab20b1 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Wed, 11 Sep 2024 14:52:14 +0200 Subject: [PATCH 2/3] Teztale/Server: show log message after server start --- .../bin_teztale_server/teztale_server_main.ml | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/teztale/bin_teztale_server/teztale_server_main.ml b/teztale/bin_teztale_server/teztale_server_main.ml index 650d8cc4a2c2..0ea291313ef7 100644 --- a/teztale/bin_teztale_server/teztale_server_main.ml +++ b/teztale/bin_teztale_server/teztale_server_main.ml @@ -1327,6 +1327,16 @@ let run (conf : Config.t) = `Port con.Config.port ) | None -> `TCP (`Port con.Config.port) in + let promise = + Cohttp_lwt_unix.Server.create + ~stop + ~ctx + ~mode + (Cohttp_lwt_unix.Server.make + ~callback: + (callback ~conf ~admins ~users pool) + ()) + in Lib_teztale_base.Log.info logger (fun () -> Printf.sprintf "Server listening at %s:%d." @@ -1334,13 +1344,7 @@ let run (conf : Config.t) = | None -> "" | Some s -> s) con.port) ; - Cohttp_lwt_unix.Server.create - ~stop - ~ctx - ~mode - (Cohttp_lwt_unix.Server.make - ~callback:(callback ~conf ~admins ~users pool) - ())) + promise) conf.Config.network_interfaces in let* () = Lwt.join servers in -- GitLab From 2c4501238dd154778d3a8a2c9a920f46a5e1afa8 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Wed, 11 Sep 2024 15:23:14 +0200 Subject: [PATCH 3/3] Teztale/Server: more refactoring --- .../bin_teztale_server/teztale_server_main.ml | 142 ++++++++---------- 1 file changed, 65 insertions(+), 77 deletions(-) diff --git a/teztale/bin_teztale_server/teztale_server_main.ml b/teztale/bin_teztale_server/teztale_server_main.ml index 0ea291313ef7..41eb4c954f5c 100644 --- a/teztale/bin_teztale_server/teztale_server_main.ml +++ b/teztale/bin_teztale_server/teztale_server_main.ml @@ -1273,84 +1273,72 @@ let run (conf : Config.t) = let uri = Uri.of_string conf.Config.db_uri in let users = ref [] in let admins = List.map (fun (u, p) -> (u, Bcrypt.hash p)) conf.Config.admins in - let code = - Lwt_main.run - (match Caqti_lwt_unix.connect_pool ~env:Sql_requests.env uri with - | Error e -> - let* () = Lwt_io.eprintl (Caqti_error.show e) in - Lwt.return 1 - | Ok pool -> ( - let* res = maybe_alter_and_create_tables pool in - match res with - | Error e -> - let* () = Lwt_io.eprintl (Caqti_error.show e) in - Lwt.return 1 - | Ok () -> ( - let stop, paf = Lwt.task () in - let shutdown _ = Lwt.wakeup paf () in - let _ = Sys.signal Sys.sigint (Sys.Signal_handle shutdown) in - let* res = - let+ list = - Lwt_list.map_s - (fun (login, password) -> - let password = Bcrypt.hash password in - upsert_user conf pool login password) - conf.Config.users - in - List.find_map (function Error e -> Some e | _ -> None) list - in - match res with - | Some e -> - let* () = Lwt_io.eprintl (Caqti_error.show e) in - Lwt.return 1 - | None -> ( - let* res = refresh_users conf pool users in - match res with - | Error e -> - let* () = Lwt_io.eprintl (Caqti_error.show e) in - Lwt.return 1 - | Ok () -> - let servers = - List.map - (fun con -> - let* ctx = - Conduit_lwt_unix.init ?src:con.Config.source () - in - let ctx = Cohttp_lwt_unix.Net.init ~ctx () in - let mode = - match con.Config.tls with - | Some Config.{crt; key} -> - `TLS - ( `Crt_file_path crt, - `Key_file_path key, - `No_password, - `Port con.Config.port ) - | None -> `TCP (`Port con.Config.port) - in - let promise = - Cohttp_lwt_unix.Server.create - ~stop - ~ctx - ~mode - (Cohttp_lwt_unix.Server.make - ~callback: - (callback ~conf ~admins ~users pool) - ()) - in - Lib_teztale_base.Log.info logger (fun () -> - Printf.sprintf - "Server listening at %s:%d." - (match con.Config.source with - | None -> "" - | Some s -> s) - con.port) ; - promise) - conf.Config.network_interfaces - in - let* () = Lwt.join servers in - Lwt.return 0)))) + let show_error_and_exit e = + let* () = Lwt_io.eprintl (Caqti_error.show e) in + Lwt.return 1 + in + let ( let*! ) v f = + let* res = v in + match res with Error e -> show_error_and_exit e | Ok v -> f v in - exit code + Lwt_main.run + (let res = Caqti_lwt_unix.connect_pool ~env:Sql_requests.env uri in + match res with + | Error e -> show_error_and_exit e + | Ok pool -> + let*! () = maybe_alter_and_create_tables pool in + let stop, paf = Lwt.task () in + let shutdown _ = Lwt.wakeup paf () in + let _ = Sys.signal Sys.sigint (Sys.Signal_handle shutdown) in + let*! () = + let+ list = + Lwt_list.map_s + (fun (login, password) -> + let password = Bcrypt.hash password in + upsert_user conf pool login password) + conf.Config.users + in + List.find_map (function Error e -> Some e | _ -> None) list + |> Option.fold ~none:(Ok ()) ~some:(fun e -> Error e) + in + let*! () = refresh_users conf pool users in + let servers = + List.map + (fun con -> + let* ctx = Conduit_lwt_unix.init ?src:con.Config.source () in + let ctx = Cohttp_lwt_unix.Net.init ~ctx () in + let mode = + match con.Config.tls with + | Some Config.{crt; key} -> + `TLS + ( `Crt_file_path crt, + `Key_file_path key, + `No_password, + `Port con.Config.port ) + | None -> `TCP (`Port con.Config.port) + in + let promise = + Cohttp_lwt_unix.Server.create + ~stop + ~ctx + ~mode + (Cohttp_lwt_unix.Server.make + ~callback:(callback ~conf ~admins ~users pool) + ()) + in + Lib_teztale_base.Log.info logger (fun () -> + Printf.sprintf + "Server listening at %s:%d." + (match con.Config.source with + | None -> "" + | Some s -> s) + con.port) ; + promise) + conf.Config.network_interfaces + in + let* () = Lwt.join servers in + Lwt.return 0) + |> exit let () = if Array.length Sys.argv <> 2 then ( -- GitLab