paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
async_client.h
Go to the documentation of this file.
1 
8 /*******************************************************************************
9  * Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
10  *
11  * All rights reserved. This program and the accompanying materials
12  * are made available under the terms of the Eclipse Public License v2.0
13  * and Eclipse Distribution License v1.0 which accompany this distribution.
14  *
15  * The Eclipse Public License is available at
16  * http://www.eclipse.org/legal/epl-v20.html
17  * and the Eclipse Distribution License is available at
18  * http://www.eclipse.org/org/documents/edl-v10.php.
19  *
20  * Contributors:
21  * Frank Pagliughi - initial implementation and documentation
22  * Frank Pagliughi - MQTT v5 support
23  *******************************************************************************/
24 
25 #ifndef __mqtt_async_client_h
26 #define __mqtt_async_client_h
27 
28 #include "MQTTAsync.h"
29 #include "mqtt/types.h"
30 #include "mqtt/token.h"
31 #include "mqtt/create_options.h"
32 #include "mqtt/string_collection.h"
33 #include "mqtt/delivery_token.h"
35 #include "mqtt/iaction_listener.h"
36 #include "mqtt/properties.h"
37 #include "mqtt/exception.h"
38 #include "mqtt/message.h"
39 #include "mqtt/callback.h"
40 #include "mqtt/thread_queue.h"
41 #include "mqtt/iasync_client.h"
42 #include <vector>
43 #include <list>
44 #include <memory>
45 #include <tuple>
46 #include <functional>
47 #include <stdexcept>
48 
49 namespace mqtt {
50 
51 // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
52 // clashed with #define's from other libraries and will be removed at the
53 // next major version upgrade.
54 
55 #if defined(PAHO_MQTTPP_VERSIONS)
56 
57  const uint32_t PAHO_MQTTPP_VERSION = 0x01030002;
59  const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2");
61  const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
62 #else
63 
64  const uint32_t VERSION = 0x01030002;
66  const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2");
68  const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
69 #endif
70 
72 
107 class async_client : public virtual iasync_client
108 {
109 public:
111  using ptr_t = std::shared_ptr<async_client>;
113  using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
114 
116  using message_handler = std::function<void(const_message_ptr)>;
118  using connection_handler = std::function<void(const string& cause)>;
120  using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
122  using update_connection_handler = std::function<bool(connect_data&)>;
123 
124 private:
126  using guard = std::unique_lock<std::mutex>;
128  using unique_lock = std::unique_lock<std::mutex>;
129 
131  mutable std::mutex lock_;
133  MQTTAsync cli_;
135  string serverURI_;
137  string clientId_;
139  int mqttVersion_;
141  std::unique_ptr<MQTTClient_persistence> persist_;
143  callback* userCallback_;
145  connection_handler connHandler_;
147  connection_handler connLostHandler_;
149  disconnected_handler disconnectedHandler_;
151  update_connection_handler updateConnectionHandler_;
153  message_handler msgHandler_;
155  connect_options connOpts_;
157  token_ptr connTok_;
159  std::list<token_ptr> pendingTokens_;
161  std::list<delivery_token_ptr> pendingDeliveryTokens_;
163  consumer_queue_type que_;
164 
166  static void on_connected(void* context, char* cause);
167  static void on_connection_lost(void *context, char *cause);
168  static void on_disconnected(void* context, MQTTProperties* cprops,
169  MQTTReasonCodes reasonCode);
170  static int on_message_arrived(void* context, char* topicName, int topicLen,
171  MQTTAsync_message* msg);
172  static void on_delivery_complete(void* context, MQTTAsync_token tok);
173  static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
174 
176  friend class token;
177  virtual void add_token(token_ptr tok);
178  virtual void add_token(delivery_token_ptr tok);
179  virtual void remove_token(token* tok) override;
180  virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
181  void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
182 
184  async_client() =delete;
185  async_client(const async_client&) =delete;
186  async_client& operator=(const async_client&) =delete;
187 
189  static void check_ret(int rc) {
190  if (rc != MQTTASYNC_SUCCESS)
191  throw exception(rc);
192  }
193 
194 public:
206  async_client(const string& serverURI, const string& clientId,
207  const string& persistDir);
221  async_client(const string& serverURI, const string& clientId,
222  iclient_persistence* persistence=nullptr);
236  async_client(const string& serverURI, const string& clientId,
237  int maxBufferedMessages, const string& persistDir);
253  async_client(const string& serverURI, const string& clientId,
254  int maxBufferedMessages,
255  iclient_persistence* persistence=nullptr);
268  async_client(const string& serverURI, const string& clientId,
269  const create_options& opts, const string& persistDir);
284  async_client(const string& serverURI, const string& clientId,
285  const create_options& opts,
286  iclient_persistence* persistence=nullptr);
290  ~async_client() override;
297  void set_callback(callback& cb) override;
303  void disable_callbacks() override;
308  void set_connected_handler(connection_handler cb) /*override*/;
313  void set_connection_lost_handler(connection_handler cb) /*override*/;
318  void set_disconnected_handler(disconnected_handler cb) /*override*/;
326  void set_message_callback(message_handler cb) /*override*/;
340  token_ptr connect() override;
350  token_ptr connect(connect_options options) override;
364  token_ptr connect(connect_options options, void* userContext,
365  iaction_listener& cb) override;
377  token_ptr connect(void* userContext, iaction_listener& cb) override {
378  return connect(connect_options{}, userContext, cb);
379  }
385  token_ptr reconnect() override;
400  token_ptr disconnect(disconnect_options opts) override;
411  token_ptr disconnect(int timeout) override {
412  return disconnect(disconnect_options(timeout));
413  }
424  template <class Rep, class Period>
425  token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
426  // TODO: check range
427  return disconnect((int) to_milliseconds_count(timeout));
428  }
443  token_ptr disconnect(int timeout, void* userContext,
444  iaction_listener& cb) override;
459  template <class Rep, class Period>
460  token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
461  void* userContext, iaction_listener& cb) {
462  // TODO: check range
463  return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
464  }
476  token_ptr disconnect(void* userContext, iaction_listener& cb) override {
477  return disconnect(0L, userContext, cb);
478  }
483  delivery_token_ptr get_pending_delivery_token(int msgID) const override;
488  std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
493  string get_client_id() const override { return clientId_; }
498  string get_server_uri() const override { return serverURI_; }
508  int mqtt_version() const noexcept { return mqttVersion_; }
513  bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
526  delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
527  int qos, bool retained) override;
536  delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
537  return publish(std::move(topic), payload, n,
539  }
552  int qos, bool retained) override;
561  return publish(std::move(topic), std::move(payload),
563  }
580  const void* payload, size_t n,
581  int qos, bool retained,
582  void* userContext, iaction_listener& cb) override;
604  void* userContext, iaction_listener& cb) override;
615  token_ptr subscribe(const string& topicFilter, int qos,
616  const subscribe_options& opts=subscribe_options(),
617  const properties& props=properties()) override;
635  token_ptr subscribe(const string& topicFilter, int qos,
636  void* userContext, iaction_listener& cb,
637  const subscribe_options& opts=subscribe_options(),
638  const properties& props=properties()) override;
653  const qos_collection& qos,
654  const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
655  const properties& props=properties()) override;
673  const qos_collection& qos,
674  void* userContext, iaction_listener& cb,
675  const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
676  const properties& props=properties()) override;
685  token_ptr unsubscribe(const string& topicFilter,
686  const properties& props=properties()) override;
697  const properties& props=properties()) override;
710  void* userContext, iaction_listener& cb,
711  const properties& props=properties()) override;
724  token_ptr unsubscribe(const string& topicFilter,
725  void* userContext, iaction_listener& cb,
726  const properties& props=properties()) override;
732  void start_consuming() override;
738  void stop_consuming() override;
744  const_message_ptr consume_message() override { return que_->get(); }
752  return que_->try_get(msg);
753  }
761  template <typename Rep, class Period>
763  const std::chrono::duration<Rep, Period>& relTime) {
764  return que_->try_get_for(msg, relTime);
765  }
772  template <typename Rep, class Period>
773  const_message_ptr try_consume_message_for(const std::chrono::duration<Rep, Period>& relTime) {
774  const_message_ptr msg;
775  que_->try_get_for(&msg, relTime);
776  return msg;
777  }
785  template <class Clock, class Duration>
787  const std::chrono::time_point<Clock,Duration>& absTime) {
788  return que_->try_get_until(msg, absTime);
789  }
795  template <class Clock, class Duration>
796  const_message_ptr try_consume_message_until(const std::chrono::time_point<Clock,Duration>& absTime) {
797  const_message_ptr msg;
798  que_->try_get_until(&msg, absTime);
799  return msg;
800  }
801 };
802 
805 
807 // end namespace mqtt
808 }
809 
810 #endif // __mqtt_async_client_h
811 
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:476
message::const_ptr_t const_message_ptr
Definition: message.h:368
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
Definition: async_client.h:107
string get_server_uri() const override
Definition: async_client.h:498
std::shared_ptr< async_client > ptr_t
Definition: async_client.h:111
token_ptr reconnect() override
std::function< void(const_message_ptr)> message_handler
Definition: async_client.h:116
Definition: iaction_listener.h:48
bool to_bool(int n)
Definition: types.h:161
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:377
std::function< void(const string &cause)> connection_handler
Definition: async_client.h:118
std::unique_ptr< thread_queue< const_message_ptr > > consumer_queue_type
Definition: async_client.h:113
void set_message_callback(message_handler cb)
token_ptr disconnect(int timeout) override
Definition: async_client.h:411
delivery_token_ptr get_pending_delivery_token(int msgID) const override
string_collection::const_ptr_t const_string_collection_ptr
Definition: string_collection.h:234
bool is_connected() const override
Definition: async_client.h:513
const_message_ptr consume_message() override
Definition: async_client.h:744
token_ptr connect() override
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2")
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition: async_client.h:425
Definition: connect_options.h:48
const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi")
Definition: topic.h:43
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition: async_client.h:773
void set_disconnected_handler(disconnected_handler cb)
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition: types.h:149
const uint32_t VERSION
Definition: async_client.h:64
void stop_consuming() override
int mqtt_version() const noexcept
Definition: async_client.h:508
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
void set_update_connection_handler(update_connection_handler cb)
Definition: properties.h:255
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition: async_client.h:120
Definition: iasync_client.h:58
void disable_callbacks() override
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition: async_client.h:762
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
~async_client() override
token::ptr_t token_ptr
Definition: token.h:506
delivery_token::ptr_t delivery_token_ptr
Definition: delivery_token.h:125
static PAHO_MQTTPP_EXPORT const int DFLT_QOS
Definition: message.h:59
Definition: subscribe_options.h:41
void set_connected_handler(connection_handler cb)
Definition: callback.h:41
void start_consuming() override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: async_client.h:786
std::function< bool(connect_data &)> update_connection_handler
Definition: async_client.h:122
string get_client_id() const override
Definition: async_client.h:493
async_client::ptr_t async_client_ptr
Definition: async_client.h:804
Definition: token.h:52
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition: async_client.h:796
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition: async_client.h:560
Definition: async_client.h:49
static PAHO_MQTTPP_EXPORT const bool DFLT_RETAINED
Definition: message.h:61
std::vector< int > qos_collection
Definition: iasync_client.h:65
token_ptr disconnect() override
Definition: async_client.h:392
void set_callback(callback &cb) override
Definition: disconnect_options.h:39
void set_connection_lost_handler(connection_handler cb)
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
bool try_consume_message(const_message_ptr *msg) override
Definition: async_client.h:751
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition: async_client.h:536
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition: async_client.h:460