8 #include <ros/master.h> 10 #include <boost/thread.hpp> 14 std::unordered_map<std::string, std::weak_ptr<Topic>> topic_map;
15 std::mutex topic_map_mutex;
17 std::unordered_map<std::string, size_t> topic_bag_counters;
18 std::mutex topic_bag_counter_mutex;
20 static std::shared_ptr<TopicRegistry> instance() {
21 static auto instance = std::make_shared<TopicRegistry>();
26 std::shared_ptr<MessageType>
27 MessageType::instance(
const std::string &hash,
const std::string &name,
28 const std::string &definition) {
29 static std::map<std::string, std::shared_ptr<MessageType>> _cache;
30 static std::mutex _mutex;
31 std::lock_guard<std::mutex> lock(_mutex);
32 auto &instance = _cache[hash];
34 instance = std::shared_ptr<MessageType>(
new MessageType());
36 instance->_hash = hash;
37 instance->_name = name;
38 instance->_definition = definition;
43 std::vector<std::string>
44 TopicManager::listTopics(
const std::string &type_name) {
45 std::set<std::string> topic_set;
47 ros::master::V_TopicInfo ros_topics;
48 ros::master::getTopics(ros_topics);
49 for (
auto &topic : ros_topics) {
50 if (type_name.empty() || type_name ==
"*" ||
51 topic.datatype == type_name) {
52 topic_set.insert(topic.name);
57 auto registry = TopicRegistry::instance();
58 std::lock_guard<std::mutex> lock(registry->topic_map_mutex);
59 for (
auto &pair : registry->topic_map) {
60 if (
auto topic = pair.second.lock()) {
61 auto message = topic->message();
62 if (type_name.empty() || type_name ==
"*" ||
63 message && message->type()->name() == type_name) {
64 topic_set.insert(pair.first);
69 std::vector<std::string> ret;
70 for (
auto &t : topic_set) {
76 std::vector<std::string> TopicManager::listTopics() {
77 return listTopics(std::string());
80 const std::shared_ptr<TopicManager> &TopicManager::instance() {
81 static auto instance = std::make_shared<TopicManager>();
85 void Topic::_subscribe(
const std::shared_ptr<Topic> &topic) {
86 static ros::NodeHandle _ros_node;
87 std::weak_ptr<Topic> topic_weak = topic;
89 std::lock_guard<std::mutex> lock(topic->_message_mutex);
90 topic->_message_instance =
nullptr;
93 topic->_ros_subscriber = _ros_node.subscribe<
Message>(
94 topic->_topic_name, 5,
95 boost::function<void(const Message &)>([topic_weak](
const Message &msg) {
96 if (
auto topic = topic_weak.lock()) {
98 std::lock_guard<std::mutex> lock(
99 topic->_registry->topic_bag_counter_mutex);
100 if (topic->_registry->topic_bag_counters[topic->name()] != 0) {
104 auto m = std::make_shared<Message>(msg);
105 m->time(ros::Time::now());
110 void Topic::_unsubscribe(
const std::shared_ptr<Topic> &topic) {
111 topic->_ros_subscriber.shutdown();
112 topic->_ros_subscriber = ros::Subscriber();
114 std::lock_guard<std::mutex> lock(topic->_message_mutex);
115 topic->_message_instance =
nullptr;
120 MessagePlaybackScope::MessagePlaybackScope(
121 const std::vector<std::string> &topics)
122 : _topics(topics), _registry(TopicRegistry::instance()) {
123 std::lock_guard<std::mutex> lock(_registry->topic_map_mutex);
124 for (
auto &topic_name : _topics) {
125 bool unsubscribe =
false;
127 std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
128 _registry->topic_bag_counters[topic_name]++;
129 if (_registry->topic_bag_counters[topic_name] == 1) {
134 if (
auto topic_instance = _registry->topic_map[topic_name].lock()) {
135 Topic::_unsubscribe(topic_instance);
141 MessagePlaybackScope::~MessagePlaybackScope() {
142 std::lock_guard<std::mutex> lock(_registry->topic_map_mutex);
143 for (
auto &topic_name : _topics) {
144 bool subscribe =
false;
146 std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
147 _registry->topic_bag_counters[topic_name]--;
148 if (_registry->topic_bag_counters[topic_name] == 0) {
153 if (
auto topic_instance = _registry->topic_map[topic_name].lock()) {
154 Topic::_subscribe(topic_instance);
160 Topic::Topic(
const std::string &name)
161 : _topic_name(name), _registry(TopicRegistry::instance()) {
165 std::shared_ptr<Topic> Topic::instance(
const std::string &name) {
166 auto registry = TopicRegistry::instance();
167 std::lock_guard<std::mutex> lock(registry->topic_map_mutex);
168 auto topic = registry->topic_map[name].lock();
170 topic = std::shared_ptr<Topic>(
new Topic(name));
174 std::lock_guard<std::mutex> lock(registry->topic_bag_counter_mutex);
175 sub = (registry->topic_bag_counters[topic->name()] == 0);
181 registry->topic_map[name] = topic;
186 bool Topic::isFromBag()
const {
187 std::lock_guard<std::mutex> lock(_registry->topic_bag_counter_mutex);
188 return _registry->topic_bag_counters[_topic_name] > 0;
199 static std::shared_ptr<boost::shared_mutex> topicMessageMutex() {
200 static auto instance = std::make_shared<boost::shared_mutex>();
203 void Topic::publish(
const std::shared_ptr<const Message> &message) {
204 auto mutex = topicMessageMutex();
205 boost::shared_lock<boost::shared_mutex> lock(*mutex);
207 std::unique_lock<std::mutex> lock(_message_mutex);
208 _message_instance = message;
211 if (_connections > 0) {
212 TopicManager::instance()->received();
215 NoMessageScope::NoMessageScope() {
216 _instance = topicMessageMutex();
217 topicMessageMutex()->lock();
219 NoMessageScope::~NoMessageScope() { topicMessageMutex()->unlock(); }