25 #ifndef __mqtt_async_client_h 26 #define __mqtt_async_client_h 28 #include "MQTTAsync.h" 55 #if defined(PAHO_MQTTPP_VERSIONS) 57 const uint32_t PAHO_MQTTPP_VERSION = 0x01030002;
59 const string PAHO_MQTTPP_VERSION_STR(
"Paho MQTT C++ (mqttpp) v. 1.3.2");
61 const string PAHO_MQTTPP_COPYRIGHT(
"Copyright (c) 2013-2023 Frank Pagliughi");
66 const string VERSION_STR(
"Paho MQTT C++ (mqttpp) v. 1.3.2");
68 const string COPYRIGHT(
"Copyright (c) 2013-2023 Frank Pagliughi");
111 using ptr_t = std::shared_ptr<async_client>;
126 using guard = std::unique_lock<std::mutex>;
128 using unique_lock = std::unique_lock<std::mutex>;
131 mutable std::mutex lock_;
141 std::unique_ptr<MQTTClient_persistence> persist_;
159 std::list<token_ptr> pendingTokens_;
161 std::list<delivery_token_ptr> pendingDeliveryTokens_;
166 static void on_connected(
void* context,
char* cause);
167 static void on_connection_lost(
void *context,
char *cause);
168 static void on_disconnected(
void* context, MQTTProperties* cprops,
169 MQTTReasonCodes reasonCode);
170 static int on_message_arrived(
void* context,
char* topicName,
int topicLen,
171 MQTTAsync_message* msg);
172 static void on_delivery_complete(
void* context, MQTTAsync_token tok);
173 static int on_update_connection(
void* context, MQTTAsync_connectData* cdata);
179 virtual void remove_token(
token* tok)
override;
180 virtual void remove_token(
token_ptr tok) { remove_token(tok.get()); }
184 async_client() =
delete;
185 async_client(
const async_client&) =
delete;
186 async_client& operator=(
const async_client&) =
delete;
189 static void check_ret(
int rc) {
190 if (rc != MQTTASYNC_SUCCESS)
206 async_client(
const string& serverURI,
const string& clientId,
207 const string& persistDir);
221 async_client(
const string& serverURI,
const string& clientId,
222 iclient_persistence* persistence=
nullptr);
236 async_client(
const string& serverURI,
const string& clientId,
237 int maxBufferedMessages,
const string& persistDir);
253 async_client(
const string& serverURI,
const string& clientId,
254 int maxBufferedMessages,
255 iclient_persistence* persistence=
nullptr);
268 async_client(
const string& serverURI,
const string& clientId,
269 const create_options& opts,
const string& persistDir);
284 async_client(
const string& serverURI,
const string& clientId,
285 const create_options& opts,
286 iclient_persistence* persistence=
nullptr);
365 iaction_listener& cb)
override;
424 template <
class Rep,
class Period>
459 template <
class Rep,
class Period>
527 int qos,
bool retained)
override;
552 int qos,
bool retained)
override;
580 const void* payload,
size_t n,
581 int qos,
bool retained,
654 const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
675 const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
752 return que_->try_get(msg);
761 template <
typename Rep,
class Period>
763 const std::chrono::duration<Rep, Period>& relTime) {
764 return que_->try_get_for(msg, relTime);
772 template <
typename Rep,
class Period>
775 que_->try_get_for(&msg, relTime);
785 template <
class Clock,
class Duration>
787 const std::chrono::time_point<Clock,Duration>& absTime) {
788 return que_->try_get_until(msg, absTime);
795 template <
class Clock,
class Duration>
798 que_->try_get_until(&msg, absTime);
810 #endif // __mqtt_async_client_h token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:476
message::const_ptr_t const_message_ptr
Definition: message.h:368
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
Definition: async_client.h:107
string get_server_uri() const override
Definition: async_client.h:498
std::shared_ptr< async_client > ptr_t
Definition: async_client.h:111
token_ptr reconnect() override
std::function< void(const_message_ptr)> message_handler
Definition: async_client.h:116
Definition: iaction_listener.h:48
bool to_bool(int n)
Definition: types.h:161
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition: async_client.h:377
std::function< void(const string &cause)> connection_handler
Definition: async_client.h:118
std::unique_ptr< thread_queue< const_message_ptr > > consumer_queue_type
Definition: async_client.h:113
void set_message_callback(message_handler cb)
token_ptr disconnect(int timeout) override
Definition: async_client.h:411
delivery_token_ptr get_pending_delivery_token(int msgID) const override
string_collection::const_ptr_t const_string_collection_ptr
Definition: string_collection.h:234
bool is_connected() const override
Definition: async_client.h:513
const_message_ptr consume_message() override
Definition: async_client.h:744
token_ptr connect() override
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2")
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition: async_client.h:425
Definition: connect_options.h:48
const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi")
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition: async_client.h:773
void set_disconnected_handler(disconnected_handler cb)
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition: types.h:149
const uint32_t VERSION
Definition: async_client.h:64
void stop_consuming() override
int mqtt_version() const noexcept
Definition: async_client.h:508
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
void set_update_connection_handler(update_connection_handler cb)
Definition: properties.h:255
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition: async_client.h:120
Definition: iasync_client.h:58
void disable_callbacks() override
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition: async_client.h:762
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
token::ptr_t token_ptr
Definition: token.h:506
delivery_token::ptr_t delivery_token_ptr
Definition: delivery_token.h:125
static PAHO_MQTTPP_EXPORT const int DFLT_QOS
Definition: message.h:59
Definition: subscribe_options.h:41
void set_connected_handler(connection_handler cb)
Definition: callback.h:41
void start_consuming() override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: async_client.h:786
std::function< bool(connect_data &)> update_connection_handler
Definition: async_client.h:122
string get_client_id() const override
Definition: async_client.h:493
async_client::ptr_t async_client_ptr
Definition: async_client.h:804
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition: async_client.h:796
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition: async_client.h:560
Definition: async_client.h:49
static PAHO_MQTTPP_EXPORT const bool DFLT_RETAINED
Definition: message.h:61
std::vector< int > qos_collection
Definition: iasync_client.h:65
token_ptr disconnect() override
Definition: async_client.h:392
void set_callback(callback &cb) override
Definition: disconnect_options.h:39
void set_connection_lost_handler(connection_handler cb)
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
bool try_consume_message(const_message_ptr *msg) override
Definition: async_client.h:751
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition: async_client.h:536
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition: async_client.h:460