TAMSVIZ
Visualization and annotation tool for ROS
topic.cpp
1 // TAMSVIZ
2 // (c) 2020 Philipp Ruppel
3 
4 #include "topic.h"
5 
6 #include "workspace.h"
7 
8 #include <ros/master.h>
9 
10 #include <boost/thread.hpp>
11 
12 struct TopicRegistry {
13 
14  std::unordered_map<std::string, std::weak_ptr<Topic>> topic_map;
15  std::mutex topic_map_mutex;
16 
17  std::unordered_map<std::string, size_t> topic_bag_counters;
18  std::mutex topic_bag_counter_mutex;
19 
20  static std::shared_ptr<TopicRegistry> instance() {
21  static auto instance = std::make_shared<TopicRegistry>();
22  return instance;
23  }
24 };
25 
26 std::shared_ptr<MessageType>
27 MessageType::instance(const std::string &hash, const std::string &name,
28  const std::string &definition) {
29  static std::map<std::string, std::shared_ptr<MessageType>> _cache;
30  static std::mutex _mutex;
31  std::lock_guard<std::mutex> lock(_mutex);
32  auto &instance = _cache[hash];
33  if (!instance) {
34  instance = std::shared_ptr<MessageType>(new MessageType());
35  // LOG_DEBUG("message type " << hash << " " << name << "\n" << definition);
36  instance->_hash = hash;
37  instance->_name = name;
38  instance->_definition = definition;
39  }
40  return instance;
41 }
42 
43 std::vector<std::string>
44 TopicManager::listTopics(const std::string &type_name) {
45  std::set<std::string> topic_set;
46  {
47  ros::master::V_TopicInfo ros_topics;
48  ros::master::getTopics(ros_topics);
49  for (auto &topic : ros_topics) {
50  if (type_name.empty() || type_name == "*" ||
51  topic.datatype == type_name) {
52  topic_set.insert(topic.name);
53  }
54  }
55  }
56  {
57  auto registry = TopicRegistry::instance();
58  std::lock_guard<std::mutex> lock(registry->topic_map_mutex);
59  for (auto &pair : registry->topic_map) {
60  if (auto topic = pair.second.lock()) {
61  auto message = topic->message();
62  if (type_name.empty() || type_name == "*" ||
63  message && message->type()->name() == type_name) {
64  topic_set.insert(pair.first);
65  }
66  }
67  }
68  }
69  std::vector<std::string> ret;
70  for (auto &t : topic_set) {
71  ret.push_back(t);
72  }
73  return ret;
74 }
75 
76 std::vector<std::string> TopicManager::listTopics() {
77  return listTopics(std::string());
78 }
79 
80 const std::shared_ptr<TopicManager> &TopicManager::instance() {
81  static auto instance = std::make_shared<TopicManager>();
82  return instance;
83 }
84 
85 void Topic::_subscribe(const std::shared_ptr<Topic> &topic) {
86  static ros::NodeHandle _ros_node;
87  std::weak_ptr<Topic> topic_weak = topic;
88  {
89  std::lock_guard<std::mutex> lock(topic->_message_mutex);
90  topic->_message_instance = nullptr;
91  }
92  topic->connected();
93  topic->_ros_subscriber = _ros_node.subscribe<Message>(
94  topic->_topic_name, 5,
95  boost::function<void(const Message &)>([topic_weak](const Message &msg) {
96  if (auto topic = topic_weak.lock()) {
97  {
98  std::lock_guard<std::mutex> lock(
99  topic->_registry->topic_bag_counter_mutex);
100  if (topic->_registry->topic_bag_counters[topic->name()] != 0) {
101  return;
102  }
103  }
104  auto m = std::make_shared<Message>(msg);
105  m->time(ros::Time::now());
106  topic->publish(m);
107  }
108  }));
109 }
110 void Topic::_unsubscribe(const std::shared_ptr<Topic> &topic) {
111  topic->_ros_subscriber.shutdown();
112  topic->_ros_subscriber = ros::Subscriber();
113  {
114  std::lock_guard<std::mutex> lock(topic->_message_mutex);
115  topic->_message_instance = nullptr;
116  }
117  topic->connected();
118 }
119 
120 MessagePlaybackScope::MessagePlaybackScope(
121  const std::vector<std::string> &topics)
122  : _topics(topics), _registry(TopicRegistry::instance()) {
123  std::lock_guard<std::mutex> lock(_registry->topic_map_mutex);
124  for (auto &topic_name : _topics) {
125  bool unsubscribe = false;
126  {
127  std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
128  _registry->topic_bag_counters[topic_name]++;
129  if (_registry->topic_bag_counters[topic_name] == 1) {
130  unsubscribe = true;
131  }
132  }
133  if (unsubscribe) {
134  if (auto topic_instance = _registry->topic_map[topic_name].lock()) {
135  Topic::_unsubscribe(topic_instance);
136  }
137  }
138  }
139 }
140 
141 MessagePlaybackScope::~MessagePlaybackScope() {
142  std::lock_guard<std::mutex> lock(_registry->topic_map_mutex);
143  for (auto &topic_name : _topics) {
144  bool subscribe = false;
145  {
146  std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
147  _registry->topic_bag_counters[topic_name]--;
148  if (_registry->topic_bag_counters[topic_name] == 0) {
149  subscribe = true;
150  }
151  }
152  if (subscribe) {
153  if (auto topic_instance = _registry->topic_map[topic_name].lock()) {
154  Topic::_subscribe(topic_instance);
155  }
156  }
157  }
158 }
159 
160 Topic::Topic(const std::string &name)
161  : _topic_name(name), _registry(TopicRegistry::instance()) {
162  _connections = 0;
163 }
164 Topic::~Topic() {}
165 std::shared_ptr<Topic> Topic::instance(const std::string &name) {
166  auto registry = TopicRegistry::instance();
167  std::lock_guard<std::mutex> lock(registry->topic_map_mutex);
168  auto topic = registry->topic_map[name].lock();
169  if (!topic) {
170  topic = std::shared_ptr<Topic>(new Topic(name));
171  {
172  bool sub = false;
173  {
174  std::lock_guard<std::mutex> lock(registry->topic_bag_counter_mutex);
175  sub = (registry->topic_bag_counters[topic->name()] == 0);
176  }
177  if (sub) {
178  _subscribe(topic);
179  }
180  }
181  registry->topic_map[name] = topic;
182  }
183  return topic;
184 }
185 
186 bool Topic::isFromBag() const {
187  std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
188  return _registry->topic_bag_counters[_topic_name] > 0;
189 }
190 
191 /*
192 bool Topic::isFromBag(const std::string &topic) {
193  auto registry = TopicRegistry::instance();
194  std::lock_guard<std::mutex> lock(registry->topic_bag_counter_mutex);
195  return registry->topic_bag_counters[topic] > 0;
196 }
197 */
198 
199 static std::shared_ptr<boost::shared_mutex> topicMessageMutex() {
200  static auto instance = std::make_shared<boost::shared_mutex>();
201  return instance;
202 }
203 void Topic::publish(const std::shared_ptr<const Message> &message) {
204  auto mutex = topicMessageMutex();
205  boost::shared_lock<boost::shared_mutex> lock(*mutex);
206  {
207  std::unique_lock<std::mutex> lock(_message_mutex);
208  _message_instance = message;
209  }
210  received(message);
211  if (_connections > 0) {
212  TopicManager::instance()->received();
213  }
214 }
215 NoMessageScope::NoMessageScope() {
216  _instance = topicMessageMutex();
217  topicMessageMutex()->lock();
218 }
219 NoMessageScope::~NoMessageScope() { topicMessageMutex()->unlock(); }
Definition: topic.h:30