From 2265e3aaf67f6180614c48acec6dadb469b30275 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Mon, 1 Oct 2018 14:41:30 +0200 Subject: [PATCH] API changes - (breaking) (client "broker" {}) instead of (client {:broker-url ...}) - (breaking) received messages have :retain? instead of :retained? - client-id-prefix added as client-opt - retain? as publish-opt --- src/otarta/core.cljs | 63 ++++++++++++++++++++++++++------------ src/otarta/main.cljs | 6 ++-- test/otarta/core_test.cljs | 19 ++++++++++-- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 09c95eb..d37c329 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -229,22 +229,44 @@ [nil client])) +(defn- client-id + "Yields `client-id` if provided. Otherwise generated from `client-id-prefix` (default \"otarta\") and random characters. The generated id is compliant with [MQTT-3.1.3-5] (i.e. matches [0-9a-zA-Z]{1,23}), provided the prefix contains only alphanumerics and hyphens (hyphens are stripped) and is shorter than 23 characters (the result is truncated to 23)." + [{:keys [client-id client-id-prefix]}] + (if (some? client-id) + client-id + (let [prefix (or client-id-prefix "otarta")] + (-> prefix + (str (random-uuid)) + (string/replace #"-" "") + (subs 0 23))))) + + (defn client - "Accepts the following parameters: - - broker-url (required) - url of the form ws(s)://(user:password@)host:12345/path(#some/root-topic). -The root-topic is prepended to all subscribes/publishes and ensures that the client only needs to care about topics that are relevant for the application, e.g. \"temperature/current\" (instead of \"staging/sensor0/temperature/current\"). You can provide a default-topic-root. + "Initialize a client for publish/subscribe. + + Arguments: + - broker-url - url of the form ws(s)://(user:password@)host:12345/path(#some/root-topic). + The root-topic is prepended to all subscribes/publishes and ensures that the client only needs to care about topics that are relevant for the application, e.g. \"temperature/current\" (instead of \"staging/sensor0/temperature/current\"). 
You can provide a default-root-topic. + + Accepts the following options: - default-root-topic - root-topic used when broker-url does not contain one. This e.g. allows the client-logic to subscribe to \"#\" knowing that it won't subscribe to the root of a broker. - keep-alive (default 60) - maximum seconds between pings. - - client-id (default \"otarta-\") - Client Identifier used to connect to broker. + - client-id (default: generated; max. 23 characters as per MQTT-spec) - Client Identifier used to connect to broker. This should be unique across all connected clients. +WARNING: Connecting with a client-id that's already in use results in the existing client being disconnected. + - client-id-prefix (default \"otarta\") - convenient to see in the logs of your broker where the client originates from without running the risk of clashing with existing client-ids. " - [{:keys [broker-url default-root-topic] :as opts}] - {:pre [broker-url]} - (let [default-opts {:keep-alive 60 :client-id (str "otarta-" (random-uuid))} - config (-> broker-url - (parse-broker-url {:default-root-topic default-root-topic}) - (merge default-opts) - (merge (select-keys opts [:client-id :keep-alive])))] - {:config config :stream (atom nil) :pinger (atom nil) :packet-identifier (atom 0)})) + ([broker-url] (client broker-url {})) + ([broker-url {:keys [default-root-topic] :as opts}] + (let [default-opts {:keep-alive 60} + config (-> broker-url + (parse-broker-url {:default-root-topic default-root-topic}) + (assoc :client-id (client-id opts)) + (merge default-opts) + (merge (select-keys opts [:keep-alive])))] + {:config config + :stream (atom nil) + :pinger (atom nil) + :packet-identifier (atom 0)}))) (defn connect @@ -264,11 +286,12 @@ The root-topic is prepended to all subscribes/publishes and ensures that the cli -(defn- publish* [{stream :stream :as client} app-topic payload {:keys [format] :or {format :string}}] +(defn- publish* [{stream :stream :as client} app-topic payload {:keys [format 
retain?] :or {format :string retain? false}}] (info :publish :client client :app-topic app-topic :payload payload :format format) (go (let [{sink :sink} @stream to-publish {:topic (app-topic->broker-topic client app-topic) + :retain? retain? :payload payload :empty? (empty-payload? payload)} [fmt-err formatted] (fmt/write format to-publish)] @@ -292,19 +315,19 @@ The root-topic is prepended to all subscribes/publishes and ensures that the cli (defn- subscription-chan [{stream :stream :as client} topic-filter msg-reader] - (let [{source :source} @stream + (let [{source :source} @stream pkts-for-topic-filter (packet-filter {[:remaining-bytes :topic] (partial topic-filter-matches-topic? topic-filter)}) pkt->msg (fn [{{:keys [retain? dup? qos]} :first-byte {topic :topic} :remaining-bytes {payload :payload} :extra}] - {:dup? dup? - :empty? (empty-payload? payload) - :payload payload - :qos qos - :retained? retain? - :topic (broker-topic->app-topic client topic)}) + {:dup? dup? + :empty? (empty-payload? payload) + :payload payload + :qos qos + :retain? retain? + :topic (broker-topic->app-topic client topic)}) subscription-xf (comp pkts-for-topic-filter (map pkt->msg) (map (comp second msg-reader)) diff --git a/src/otarta/main.cljs b/src/otarta/main.cljs index fc91e58..14e9273 100644 --- a/src/otarta/main.cljs +++ b/src/otarta/main.cljs @@ -17,7 +17,7 @@ (defn handle-sub [broker-url topic-filter] (info :handle-sub :broker-url broker-url :topic-filter topic-filter) (go - (reset! client (mqtt/client {:broker-url broker-url})) + (reset! 
client (mqtt/client broker-url)) (let [[err {sub-ch :ch}] ( "ws://localhost" (sut/client) :config :client-id)))) + (testing "when prefix provided has correct chars and length" + (is (re-find #"origin[a-zA-Z0-9]{17}$" + (-> "ws://localhost" + (sut/client {:client-id-prefix "origin"}) + :config + :client-id)))) + (testing "when client-id provided it's used as-is" + (is (= "custom-client" + (-> "ws://localhost" + (sut/client {:client-id "custom-client"}) + :config + :client-id))))))) (deftest topic-filter-matches-topic?-test -- GitLab