TAMSVIZ
Visualization and annotation tool for ROS
topic.h
1 // TAMSVIZ
2 // (c) 2020 Philipp Ruppel
3 
4 #pragma once
5 
6 #include "event.h"
7 #include "log.h"
8 #include "message.h"
9 #include "property.h"
10 #include "watcher.h"
11 
12 #include <ros/ros.h>
13 
14 #include <atomic>
15 #include <memory>
16 #include <mutex>
17 #include <unordered_map>
18 #include <vector>
19 
20 class TopicManager {
21 public:
22  std::vector<std::string> listTopics(const std::string &type_name);
23  std::vector<std::string> listTopics();
24  Event<void()> received{"received"};
25  static const std::shared_ptr<TopicManager> &instance();
26 };
27 
28 class TopicRegistry;
29 
30 class Topic {
31  std::string _topic_name;
32  ros::Subscriber _ros_subscriber;
33  std::mutex _message_mutex;
34  std::shared_ptr<const Message> _message_instance;
35  std::atomic<size_t> _connections;
36  std::shared_ptr<TopicRegistry> _registry;
37  Topic(const std::string &name);
38  Topic(const Topic &) = delete;
39  Topic &operator=(const Topic &) = delete;
40  static void _subscribe(const std::shared_ptr<Topic> &topic);
41  static void _unsubscribe(const std::shared_ptr<Topic> &topic);
42 
43 public:
44  ~Topic();
45  void countConnection(size_t d) { _connections += d; }
46  void publish(const std::shared_ptr<const Message> &message);
47  inline const std::string &name() const { return _topic_name; }
48  std::shared_ptr<const Message> message() {
49  std::unique_lock<std::mutex> lock(_message_mutex);
50  return _message_instance;
51  }
52  static std::shared_ptr<Topic> instance(const std::string &name);
54  Event<void()> connected;
55  bool isFromBag() const;
56  // static bool isFromBag(const std::string &topic);
57  friend class MessagePlaybackScope;
58 };
59 
61  std::vector<std::string> _topics;
62  std::shared_ptr<TopicRegistry> _registry;
63 
64 public:
65  MessagePlaybackScope(const std::vector<std::string> &topics);
68  MessagePlaybackScope &operator=(const MessagePlaybackScope &) = delete;
69 };
70 
71 template <class Message> class Publisher {
72  std::string _topic;
73  std::unique_ptr<ros::Publisher> _publisher;
74 
75 public:
76  Publisher() {}
77  Publisher(const std::string &topic) { this->topic(topic); }
78  Publisher(const Publisher &) = delete;
79  Publisher &operator=(const Publisher &) = delete;
80  const std::string &topic() const { return _topic; }
81  void topic(const std::string &topic) {
82  if (topic != _topic) {
83  _topic = topic;
84  if (_topic.empty()) {
85  _publisher.reset();
86  } else {
87  static ros::NodeHandle ros_node;
88  _publisher.reset(
89  new ros::Publisher(ros_node.advertise<Message>(topic, 1000)));
90  }
91  _topic = topic;
92  }
93  }
94  bool connected() const {
95  return _publisher && _publisher->getNumSubscribers() > 0;
96  }
97  void publish(const Message &message) {
98  if (connected()) {
99  _publisher->publish(message);
100  }
101  }
102  void publish(const std::shared_ptr<const Message> &message) {
103  if (connected()) {
104  _publisher->publish(*message);
105  }
106  }
107  void publish(const std::string &topic, const Message &message) {
108  this->topic(topic);
109  if (connected()) {
110  _publisher->publish(message);
111  }
112  }
113  void publish(const std::string &topic,
114  const std::shared_ptr<const Message> &message) {
115  this->topic(topic);
116  if (connected()) {
117  _publisher->publish(*message);
118  }
119  }
120 };
121 
122 template <class M> class Subscriber {
123  struct Callback {
124  std::weak_ptr<const void> receiver;
125  };
126  bool _visible = true;
127  std::shared_ptr<Callback> _callback;
128  std::shared_ptr<Topic> _topic;
129  std::shared_ptr<const M> _message_instance;
130  Watcher _watcher;
131  Subscriber() {}
132  class MessageCast {
133  std::shared_ptr<const Message> _message;
134 
135  public:
136  inline MessageCast(const std::shared_ptr<const Message> &message)
137  : _message(message) {}
138  inline operator std::shared_ptr<const Message>() { return _message; }
139  template <class T> inline operator std::shared_ptr<T>() {
140  return _message ? _message->instantiate<T>() : nullptr;
141  }
142  };
143  template <class F>
144  static void invoke(const F &callback,
145  const std::shared_ptr<const Message> &message) {
146  if (!message) {
147  return;
148  }
149  callback(MessageCast(message));
150  }
151 
152 public:
153  Subscriber(const std::string &name, bool visible = true) {
154  _topic = Topic::instance(name);
155  _visible = visible;
156  if (_visible) {
157  _topic->countConnection(+1);
158  }
159  }
160  template <class F>
161  Subscriber(const std::string &name,
162  const std::shared_ptr<const void> &receiver, const F &function,
163  bool visible = true) {
164  _topic = Topic::instance(name);
165  _callback = std::make_shared<Callback>();
166  _callback->receiver = receiver;
167  auto *callback_ptr = _callback.get();
168  _topic->received.connect(
169  _callback, [callback_ptr,
170  function](const std::shared_ptr<const Message> &message) {
171  if (auto ptr = callback_ptr->receiver.lock()) {
172  invoke(function, message);
173  }
174  });
175  invoke(function, _topic->message());
176  _visible = visible;
177  if (_visible) {
178  _topic->countConnection(+1);
179  }
180  }
181  ~Subscriber() {
182  if (_visible) {
183  _topic->countConnection(-1);
184  }
185  }
186  Subscriber(const Subscriber &) = delete;
187  Subscriber &operator=(const Subscriber &) = delete;
188  const std::shared_ptr<Topic> &topic() const { return _topic; }
189  std::shared_ptr<const M> message() {
190  auto msg = _topic->message();
191  if (_watcher.changed(msg)) {
192  if (msg) {
193  _message_instance = msg->instantiate<M>();
194  } else {
195  _message_instance = nullptr;
196  }
197  LOG_DEBUG("message instance " << msg.get() << " "
198  << _message_instance.get());
199  }
200  return _message_instance;
201  }
202 };
203 
205  std::shared_ptr<void> _instance;
206 
207 public:
208  NoMessageScope();
209  ~NoMessageScope();
210  NoMessageScope(const NoMessageScope &) = delete;
211  NoMessageScope &operator=(const NoMessageScope &) = delete;
212 };
213 
214 template <class Message> class TopicProperty {
215 private:
216  std::string _topic;
217  bool _used = false;
218  std::shared_ptr<void> _callback_object;
219  std::function<void(const std::shared_ptr<const Message> &)>
220  _callback_function;
221  std::shared_ptr<Subscriber<Message>> _subscriber;
222  void _sync() {
223  if ((_topic.empty() || !_used) && _subscriber) {
224  _subscriber = nullptr;
225  }
226  if (!_topic.empty() && _used) {
227  if (!_subscriber || _subscriber->topic()->name() != _topic) {
228  if (_callback_object && _callback_function) {
229  _subscriber = std::make_shared<Subscriber<Message>>(
230  _topic, _callback_object, _callback_function);
231  } else {
232  _subscriber = std::make_shared<Subscriber<Message>>(_topic);
233  }
234  }
235  }
236  }
237 
238 public:
239  TopicProperty() {}
240  TopicProperty(const std::string &topic) : _topic(topic) {}
241  TopicProperty(const TopicProperty &other) {
242  _topic = other._topic;
243  _sync();
244  }
245  TopicProperty &operator=(const TopicProperty &other) {
246  _topic = other._topic;
247  _sync();
248  return *this;
249  }
250  void connect(const std::shared_ptr<void> &callback_object,
251  const std::function<void(const std::shared_ptr<const Message> &)>
252  &callback_function) {
253  _used = true;
254  _callback_object = callback_object;
255  _callback_function = callback_function;
256  _sync();
257  }
258  void connect() {
259  _used = true;
260  _sync();
261  }
262  const std::string &topic() const { return _topic; }
263  void topic(const std::string &topic) {
264  if (topic != _topic) {
265  _topic = topic;
266  _sync();
267  }
268  }
269  std::shared_ptr<const Message> message() {
270  _used = true;
271  _sync();
272  if (_subscriber) {
273  return _subscriber->message();
274  }
275  return nullptr;
276  }
277  std::shared_ptr<Subscriber<Message>> subscriber() {
278  _used = true;
279  _sync();
280  return _subscriber;
281  }
282 };
283 template <class T>
284 inline void toString(const TopicProperty<T> &v, std::string &s) {
285  s = v.topic();
286 }
287 template <class T>
288 inline void fromString(TopicProperty<T> &v, const std::string &s) {
289  v.topic(s);
290 }
291 template <class T>
292 bool operator==(const TopicProperty<T> &a, const TopicProperty<T> &b) {
293  return a.topic() == b.topic();
294 }
295 template <class T>
296 bool operator!=(const TopicProperty<T> &a, const TopicProperty<T> &b) {
297  return a.topic() != b.topic();
298 }
299 template <class T> struct DefaultPropertyAttributes<TopicProperty<T>> {
300  static inline void initialize(PropertyAttributes *attributes) {
301  attributes->list = [](const Property &property) {
302  // return std::vector<std::string>({"test", "bla"});
303  return TopicManager::instance()->listTopics(
304  ros::message_traits::DataType<T>::value());
305  };
306  }
307 };
Definition: topic.h:30
Definition: watcher.h:8