paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
token.h
Go to the documentation of this file.
1 
8 /*******************************************************************************
9  * Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
10  *
11  * All rights reserved. This program and the accompanying materials
12  * are made available under the terms of the Eclipse Public License v2.0
13  * and Eclipse Distribution License v1.0 which accompany this distribution.
14  *
15  * The Eclipse Public License is available at
16  * http://www.eclipse.org/legal/epl-v20.html
17  * and the Eclipse Distribution License is available at
18  * http://www.eclipse.org/org/documents/edl-v10.php.
19  *
20  * Contributors:
21  * Frank Pagliughi - initial implementation and documentation
22  * Frank Pagliughi - MQTT v5 support & server responses
23  *******************************************************************************/
24 
25 #ifndef __mqtt_token_h
26 #define __mqtt_token_h
27 
28 #include "MQTTAsync.h"
29 #include "mqtt/iaction_listener.h"
30 #include "mqtt/exception.h"
31 #include "mqtt/types.h"
32 #include "mqtt/properties.h"
33 #include "mqtt/buffer_ref.h"
34 #include "mqtt/string_collection.h"
35 #include "mqtt/server_response.h"
36 #include <vector>
37 #include <thread>
38 #include <mutex>
39 #include <condition_variable>
40 #include <chrono>
41 
42 namespace mqtt {
43 
44 class iasync_client;
45 
47 
52 class token
53 {
54 public:
/** Shared (reference-counted) pointer to a token. */
56  using ptr_t = std::shared_ptr<token>;
/** Shared pointer through which the token can only be accessed as const. */
58  using const_ptr_t = std::shared_ptr<const token>;
/** Weak (non-owning) pointer to a token. */
60  using weak_ptr_t = std::weak_ptr<token>;
61 
63  enum Type {
69  };
70 
71 private:
/** Scoped lock used by the accessors to hold the token's mutex. */
73  using guard = std::lock_guard<std::mutex>;
/** Lock type handed to the condition variable by the timed waits. */
75  using unique_lock = std::unique_lock<std::mutex>;
76 
/** Mutex protecting the token's state; mutable so const accessors can lock it. */
78  mutable std::mutex lock_;
/** Condition variable the timed waits block on until complete_ becomes true. */
80  mutable std::condition_variable cond_;
81 
/** The token type, as reported by get_type(). */
83  Type type_;
/** The client associated with this token, as reported by get_client(). */
85  iasync_client* cli_;
/** Return code of the action; any value other than MQTTASYNC_SUCCESS makes check_ret() throw. */
87  int rc_;
/** Reason code of the action; a value above GRANTED_QOS_2 makes check_ret() throw. */
89  ReasonCode reasonCode_;
/** Error message passed to the exception thrown by check_ret(). */
91  string errMsg_;
/** Message ID, assigned through set_message_id(). */
93  MQTTAsync_token msgId_;
/** Opaque user-supplied context pointer (see get_user_context() / set_user_context()). */
97  void* userContext_;
98 
/** Address of the listener registered via set_action_callback(); only the raw pointer is stored. */
104  iaction_listener* listener_;
/** Value set by set_num_expected(); presumably the number of expected responses - TODO confirm in token.cpp. */
106  size_t nExpected_;
/** Completion flag tested by is_complete(), try_wait() and the timed waits. */
108  bool complete_;
109 
111  //properties props_;
/** Connect response data, if any. NOTE(review): presumably filled in by the success callbacks - confirm in token.cpp. */
113  std::unique_ptr<connect_response> connRsp_;
/** Subscribe response data, if any (same caveat as connRsp_). */
115  std::unique_ptr<subscribe_response> subRsp_;
/** Unsubscribe response data, if any (same caveat as connRsp_). */
117  std::unique_ptr<unsubscribe_response> unsubRsp_;
118 
/** Client classes that are granted access to the token's private members. */
120  friend class async_client;
121  friend class mock_async_client;
122 
/** Option classes that are granted access to the token's private members. */
123  friend class connect_options;
124  friend class response_options;
126  friend class disconnect_options;
127 
131  void reset();
137  void set_message_id(MQTTAsync_token msgId) {
138  guard g(lock_);
139  msgId_ = msgId;
140  }
150  static void on_success(void* tokObj, MQTTAsync_successData* rsp);
151  static void on_success5(void* tokObj, MQTTAsync_successData5* rsp);
161  static void on_failure(void* tokObj, MQTTAsync_failureData* rsp);
162  static void on_failure5(void* tokObj, MQTTAsync_failureData5* rsp);
169  static void on_connected(void* tokObj, char* /*cause*/);
174  void on_success(MQTTAsync_successData* rsp);
175  void on_success5(MQTTAsync_successData5* rsp);
180  void on_failure(MQTTAsync_failureData* rsp);
181  void on_failure5(MQTTAsync_failureData5* rsp);
182 
187  void check_ret() const {
188  if (rc_ != MQTTASYNC_SUCCESS || reasonCode_ > ReasonCode::GRANTED_QOS_2)
189  throw exception(rc_, reasonCode_, errMsg_);
190  }
191 
192 public:
199  : token(typ, cli, MQTTAsync_token(0)) {}
/**
 * Constructs a token that has a user context and a listener but no topics.
 * Delegates to the constructor taking a topic collection, handing it a
 * default-constructed collection pointer.
 * @param typ The type of the token.
 * @param cli The client to associate with the token.
 * @param userContext Opaque pointer stored with the token.
 * @param cb The action listener passed on to the delegated constructor.
 */
209  token(Type typ, iasync_client& cli, void* userContext, iaction_listener& cb)
210  : token(typ, cli, const_string_collection_ptr(), userContext, cb) {}
211 
/**
 * Constructs a token for a single topic.
 * The topic is wrapped with string_collection::create() and the work is
 * delegated to another token constructor.
 * @param typ The type of the token.
 * @param cli The client to associate with the token.
 * @param topic The topic associated with the token.
 */
218  token(Type typ, iasync_client& cli, const string& topic)
219  : token(typ, cli, string_collection::create(topic)) {}
/**
 * Constructs a token for a single topic with a user context and listener.
 * The topic is wrapped with string_collection::create() and the work is
 * delegated to another token constructor.
 * @param typ The type of the token.
 * @param cli The client to associate with the token.
 * @param topic The topic associated with the token.
 * @param userContext Opaque pointer stored with the token.
 * @param cb The action listener passed on to the delegated constructor.
 */
230  token(Type typ, iasync_client& cli, const string& topic,
231  void* userContext, iaction_listener& cb)
232  : token(typ, cli, string_collection::create(topic), userContext, cb) {}
233 
252  void* userContext, iaction_listener& cb);
259  token(Type typ, iasync_client& cli, MQTTAsync_token tok);
263  virtual ~token() {}
270  static ptr_t create(Type typ, iasync_client& cli) {
271  return std::make_shared<token>(typ, cli);
272  }
282  static ptr_t create(Type typ, iasync_client& cli, void* userContext,
283  iaction_listener& cb) {
284  return std::make_shared<token>(typ, cli, userContext, cb);
285  }
292  static ptr_t create(Type typ, iasync_client& cli, const string& topic) {
293  return std::make_shared<token>(typ, cli, topic);
294  }
305  static ptr_t create(Type typ, iasync_client& cli, const string& topic,
306  void* userContext, iaction_listener& cb) {
307  return std::make_shared<token>(typ, cli, topic, userContext, cb);
308  }
316  return std::make_shared<token>(typ, cli, topics);
317  }
329  void* userContext, iaction_listener& cb) {
330  return std::make_shared<token>(typ, cli, topics, userContext, cb);
331  }
337  Type get_type() const { return type_; }
343  guard g(lock_);
344  return listener_;
345  }
351  virtual iasync_client* get_client() const { return cli_; }
356  virtual int get_message_id() const {
357  static_assert(sizeof(msgId_) <= sizeof(int), "MQTTAsync_token must fit into int");
358  return int(msgId_);
359  }
367  return topics_;
368  }
373  virtual void* get_user_context() const {
374  guard g(lock_);
375  return userContext_;
376  }
381  virtual bool is_complete() const { return complete_; }
388  virtual int get_return_code() const { return rc_; }
393  virtual void set_action_callback(iaction_listener& listener) {
394  guard g(lock_);
395  listener_ = &listener;
396  }
402  virtual void set_user_context(void* userContext) {
403  guard g(lock_);
404  userContext_ = userContext;
405  }
411  void set_num_expected(size_t n) { nExpected_ = n; }
412 
417  //const properties& get_properties() const { return props_; }
422  ReasonCode get_reason_code() const { return reasonCode_; }
427  virtual void wait();
433  virtual bool try_wait() {
434  guard g(lock_);
435  if (complete_)
436  check_ret();
437  return complete_;
438  }
446  virtual bool wait_for(long timeout) {
447  return wait_for(std::chrono::milliseconds(timeout));
448  }
455  template <class Rep, class Period>
456  bool wait_for(const std::chrono::duration<Rep, Period>& relTime) {
457  unique_lock g(lock_);
458  if (!cond_.wait_for(g, std::chrono::milliseconds(relTime),
459  [this]{return complete_;}))
460  return false;
461  check_ret();
462  return true;
463  }
470  template <class Clock, class Duration>
471  bool wait_until( const std::chrono::time_point<Clock, Duration>& absTime) {
472  unique_lock g(lock_);
473  if (!cond_.wait_until(g, absTime, [this]{return complete_;}))
474  return false;
475  check_ret();
476  return true;
477  }
478 
503 };
504 
507 
510 
511 
513 // end namespace mqtt
514 }
515 
516 #endif // __mqtt_token_h
517 
static ptr_t create(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition: token.h:305
unsubscribe_response get_unsubscribe_response() const
Definition: response_options.h:203
Definition: async_client.h:107
token(Type typ, iasync_client &cli)
Definition: token.h:198
Definition: iaction_listener.h:48
Definition: token.h:68
token::const_ptr_t const_token_ptr
Definition: token.h:509
std::weak_ptr< token > weak_ptr_t
Definition: token.h:60
connect_response get_connect_response() const
string_collection::const_ptr_t const_string_collection_ptr
Definition: string_collection.h:234
Type
Definition: token.h:63
virtual iaction_listener * get_action_callback() const
Definition: token.h:342
Definition: connect_options.h:48
Definition: token.h:67
Definition: server_response.h:74
Definition: topic.h:43
subscribe_response get_subscribe_response() const
virtual void wait()
virtual bool is_complete() const
Definition: token.h:381
virtual void set_user_context(void *userContext)
Definition: token.h:402
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
Definition: token.h:328
virtual void set_action_callback(iaction_listener &listener)
Definition: token.h:393
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics)
Definition: token.h:315
void set_num_expected(size_t n)
Definition: token.h:411
Type get_type() const
Definition: token.h:337
virtual void * get_user_context() const
Definition: token.h:373
Definition: token.h:66
Definition: string_collection.h:42
static ptr_t create(Type typ, iasync_client &cli)
Definition: token.h:270
Definition: exception.h:46
bool wait_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition: token.h:471
virtual bool wait_for(long timeout)
Definition: token.h:446
Definition: types.h:62
Definition: iasync_client.h:58
std::shared_ptr< const token > const_ptr_t
Definition: token.h:58
virtual int get_message_id() const
Definition: token.h:356
virtual iasync_client * get_client() const
Definition: token.h:351
token(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition: token.h:209
friend class mock_async_client
Definition: token.h:121
Definition: token.h:65
token::ptr_t token_ptr
Definition: token.h:506
ReasonCode
Definition: types.h:57
virtual bool try_wait()
Definition: token.h:433
virtual const_string_collection_ptr get_topics() const
Definition: token.h:366
static ptr_t create(Type typ, iasync_client &cli, const string &topic)
Definition: token.h:292
Definition: server_response.h:176
ReasonCode get_reason_code() const
Definition: token.h:422
std::shared_ptr< token > ptr_t
Definition: token.h:56
Definition: token.h:64
Definition: response_options.h:34
Definition: token.h:52
virtual int get_return_code() const
Definition: token.h:388
Definition: async_client.h:49
Definition: server_response.h:122
token(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition: token.h:230
token(Type typ, iasync_client &cli, const string &topic)
Definition: token.h:218
virtual ~token()
Definition: token.h:263
Definition: disconnect_options.h:39
bool wait_for(const std::chrono::duration< Rep, Period > &relTime)
Definition: token.h:456
static ptr_t create(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition: token.h:282