TAMSVIZ
Visualization and annotation tool for ROS
All Classes Pages
bagplayer.cpp
1 // TAMSVIZ
2 // (c) 2020 Philipp Ruppel
3 
4 #include "bagplayer.h"
5 
6 #include "log.h"
7 #include "topic.h"
8 
9 #include <rosbag/view.h>
10 
11 #include <chrono>
12 
14 protected:
15  rosbag::Bag _bag;
16  RosBagViewBase(const std::string &filename) { _bag.open(filename); }
17 };
18 
19 class RosBagView : protected RosBagViewBase, protected rosbag::View {
20 
21  std::vector<std::pair<std::string, std::string>> _viz_topic_list;
22  std::unordered_map<std::string, std::string> _topic_bag_to_viz;
23  std::unordered_map<std::string, std::string> _topic_viz_to_bag;
24 
25  std::shared_ptr<const Message> loadMessage(const rosbag::ConnectionInfo *conn,
26  const rosbag::IndexEntry &index,
27  const rosbag::Bag &bag) {
28  auto *msg = newMessageInstance(conn, index, bag);
29  boost::shared_ptr<Message> inst = msg->instantiate<Message>();
30  auto m = std::make_shared<Message>(*inst);
31  m->time(msg->getTime());
32  delete msg;
33  return m;
34  }
35 
36 public:
37  RosBagView(const std::string &filename)
38  : RosBagViewBase(filename), rosbag::View(_bag) {
39  for (auto &conn : getConnections()) {
40  std::string viz_name = conn->topic;
41  if (viz_name.empty() || viz_name[0] != '/') {
42  viz_name = "/" + viz_name;
43  }
44  _topic_bag_to_viz[conn->topic] = viz_name;
45  _topic_viz_to_bag[viz_name] = conn->topic;
46  _viz_topic_list.emplace_back(conn->datatype, viz_name);
47  }
48  }
49 
50  ros::Time startTime() { return getBeginTime(); }
51  ros::Time endTime() { return getEndTime(); }
52 
53  std::vector<std::pair<std::string, std::string>> topics() {
54  return _viz_topic_list;
55  }
56 
57  void
58  findMessages(const ros::Time &t,
59  const std::function<void(const std::string &,
60  const std::shared_ptr<const Message> &)>
61  &callback) {
62  for (auto *range : ranges_) {
63  if (range->begin != range->end) {
64  auto iter = std::upper_bound(range->begin, range->end, t,
65  rosbag::IndexEntryCompare());
66  if (iter != range->begin) {
67  --iter;
68  }
69  if (iter == range->end) {
70  iter = range->end;
71  --iter;
72  }
73  callback(
74  _topic_bag_to_viz[range->connection_info->topic],
75  loadMessage(range->connection_info, *iter, *range->bag_query->bag));
76  continue;
77  }
78  }
79  }
80 
81  bool findTimeSpan(const std::string &viz_topic, double t, double *start,
82  double *duration) {
83  auto &bag_topic = _topic_viz_to_bag[viz_topic];
84  for (auto *range : ranges_) {
85  if (range->connection_info->topic != bag_topic) {
86  continue;
87  }
88  auto iter2 = std::upper_bound(range->begin, range->end,
89  getBeginTime() + ros::Duration(t),
90  rosbag::IndexEntryCompare());
91  if (iter2 == range->end) {
92  continue;
93  }
94  auto iter = iter2;
95 
96  --iter;
97  if (iter == range->end) {
98  continue;
99  }
100 
101  *start = (iter->time - getBeginTime()).toSec();
102  *duration = (iter2->time - getBeginTime()).toSec() - *start;
103  return true;
104  }
105  return false;
106  }
107 
108  class Iterator {
109  RosBagView *_view = nullptr;
110  std::multimap<ros::Time,
111  std::pair<std::multiset<rosbag::IndexEntry>::const_iterator,
112  rosbag::MessageRange *>>
113  _iterators;
114 
115  public:
116  Iterator() {}
117  Iterator(RosBagView &view) : _view(&view) {
118  for (auto *range : _view->ranges_) {
119  _iterators.emplace(range->begin->time,
120  std::make_pair(range->begin, range));
121  }
122  }
123  Iterator(RosBagView &view, double time_from_start) : _view(&view) {
124  for (auto *range : _view->ranges_) {
125  auto iter = std::lower_bound(range->begin, range->end,
126  _view->getBeginTime() +
127  ros::Duration(time_from_start),
128  rosbag::IndexEntryCompare());
129  if (iter != range->end) {
130  _iterators.emplace(range->begin->time, std::make_pair(iter, range));
131  }
132  }
133  }
134  bool operator==(const Iterator &other) {
135  return _iterators == other._iterators;
136  }
137  bool operator!=(const Iterator &other) {
138  return _iterators != other._iterators;
139  }
140  Iterator &operator++() {
141  auto iter = _iterators.begin();
142  auto pair = iter->second;
143  _iterators.erase(iter);
144  ++pair.first;
145  if (pair.first != pair.second->end) {
146  _iterators.emplace(pair.first->time, pair);
147  }
148  return *this;
149  }
150  std::pair<std::string, std::shared_ptr<const Message>> operator*() const {
151  auto &pair = _iterators.begin()->second;
152  return std::make_pair(
153  _view->_topic_bag_to_viz[pair.second->connection_info->topic],
154  _view->loadMessage(pair.second->connection_info, *pair.first,
155  *(pair.second->bag_query->bag)));
156  }
157  };
158 };
159 
160 bool BagPlayer::findMessageTimeSpan(const std::string &topic, double time,
161  double *start, double *duration) const {
162  std::unique_lock<std::mutex> lock(_view_mutex);
163  return _view->findTimeSpan(topic, time, start, duration);
164 }
165 
166 void BagPlayer::publish(const std::string &topic,
167  const std::shared_ptr<const Message> &message) {
168  if (message) {
169  if (0) {
170  LOG_DEBUG("bag message topic:"
171  << topic
172  << " time:" << (message->time() - _bag_start_time).toSec());
173  }
174  _topics[topic]->publish(message);
175  }
176 }
177 
178 /*
179 std::vector<std::shared_ptr<const Message>>
180 BagPlayer::readMessageSamples(const std::string &topic, double start,
181  double stop) {
182  std::unique_lock<std::mutex> lock(_view_mutex);
183  std::vector<std::shared_ptr<const Message>> ret;
184  readMessageSamples(topic, start, stop,
185  [&](const std::shared_ptr<const Message> &msg) {
186  ret.push_back(msg);
187  return true;
188  });
189  return ret;
190 }
191 */
192 
193 void BagPlayer::readMessageSamples(
194  const std::string &topic, double start, double stop,
195  const std::function<bool(const std::shared_ptr<const Message> &)>
196  &callback) {
198  RosBagView::Iterator it_end;
199  {
200  std::unique_lock<std::mutex> lock(_view_mutex);
201  it = RosBagView::Iterator(*_view, start);
202  }
203  while (it != it_end) {
204  std::pair<std::string, std::shared_ptr<const Message>> p;
205  {
206  std::unique_lock<std::mutex> lock(_view_mutex);
207  p = *it;
208  if ((p.second->time() - _bag_start_time).toSec() > stop) {
209  break;
210  }
211  }
212  bool ok = callback(p.second);
213  if (!ok) {
214  break;
215  }
216  {
217  std::unique_lock<std::mutex> lock(_view_mutex);
218  ++it;
219  }
220  }
221  {
222  std::unique_lock<std::mutex> lock(_view_mutex);
223  it = RosBagView::Iterator();
224  }
225 }
226 
227 BagPlayer::BagPlayer(const std::string &path) : _path(path) {
228  LOG_DEBUG("opening bag " << path);
229  _file_name = path;
230  if (auto *n = std::strrchr(path.c_str(), '/')) {
231  _file_name = n + 1;
232  }
233  _time_function = []() { return 0.0; };
234  LOG_DEBUG("creating bag view");
235  _view = std::make_shared<RosBagView>(path);
236  LOG_DEBUG("listing topics");
237  for (auto &topic : _view->topics()) {
238  _topic_type_name_list.push_back(topic);
239  _topic_names.push_back(topic.second);
240  _topics[topic.second] = Topic::instance(topic.second);
241  }
242  LOG_DEBUG("done listing topics");
243  _message_playback_scope =
244  std::make_shared<MessagePlaybackScope>(_topic_names);
245  _duration = (_view->endTime() - _view->startTime()).toSec();
246  _bag_start_time = _view->startTime();
247  LOG_DEBUG("rosbag opened path:" << path << " duration:" << _duration);
248  _thread = std::thread([this]() {
249  LOG_DEBUG("bag thread started");
250  while (true) {
251  std::function<void()> action;
252  {
253  std::unique_lock<std::mutex> lock(_action_mutex);
254  while (true) {
255  if (_exit) {
256  LOG_DEBUG("exiting bag player thread");
257  return;
258  }
259  if (_pending_action) {
260  LOG_DEBUG("new bag player action");
261  action = _pending_action;
262  _pending_action = nullptr;
263  break;
264  }
265  _action_condition.wait(lock);
266  }
267  }
268  LOG_DEBUG("executing bag player action");
269  action();
270  LOG_DEBUG("bag player action finished");
271  }
272  LOG_DEBUG("exiting bag thread");
273  });
274  rewind();
275  LOG_DEBUG("bag opened");
276 }
277 
278 double BagPlayer::time() {
279  std::unique_lock<std::mutex> lock(_action_mutex);
280  return _time_function();
281 }
282 
283 void BagPlayer::startAction(const std::function<void()> &action) {
284  std::unique_lock<std::mutex> lock(_action_mutex);
285  _pending_action = action;
286  _action_condition.notify_one();
287 }
288 
289 void BagPlayer::stop() {
290  LOG_DEBUG("bag stop");
291  startAction([this]() { changed(); });
292 }
293 
294 void BagPlayer::play(const std::vector<double> &notification_times) {
295  LOG_DEBUG("bag play");
296  startAction([this, notification_times]() {
297  _is_playing = true;
298  double time_from_start = this->time();
299  auto it_notify = notification_times.begin();
300  while (it_notify != notification_times.end() &&
301  *it_notify < time_from_start) {
302  ++it_notify;
303  }
304  if (time_from_start + 1e-6 >= _duration) {
305  time_from_start = 0.0;
306  }
307  auto playback_start_time = std::chrono::steady_clock::now();
308  int64_t view_start_time =
309  (_bag_start_time + ros::Duration(time_from_start)).toNSec();
310  {
311  std::unique_lock<std::mutex> lock(_action_mutex);
312  double duration = this->duration();
313  _time_function = [duration, playback_start_time, time_from_start]() {
314  return std::min(duration, std::chrono::duration<double>(
315  std::chrono::steady_clock::now() -
316  playback_start_time)
317  .count() +
318  time_from_start);
319  };
320  }
322  RosBagView::Iterator it_end;
323  {
324  std::unique_lock<std::mutex> lock(_view_mutex);
325  it = RosBagView::Iterator(*_view, time_from_start);
326  }
327  while (it != it_end) {
328  std::pair<std::string, std::shared_ptr<const Message>> msg;
329  {
330  std::lock_guard<std::mutex> lock(_view_mutex);
331  msg = *it;
332  }
333  int64_t message_time = msg.second->time().toNSec();
334  auto send_time = playback_start_time +
335  std::chrono::nanoseconds(message_time - view_start_time);
336  bool interrupted = false;
337  while (true) {
338  changed();
339  {
340  std::unique_lock<std::mutex> lock(_action_mutex);
341  while (true) {
342  auto current_time = std::chrono::steady_clock::now();
343  if (interrupted_nolock()) {
344  interrupted = true;
345  break;
346  }
347  if (current_time >= send_time) {
348  break;
349  }
350  if (it_notify != notification_times.end()) {
351  if (current_time >= playback_start_time +
352  std::chrono::nanoseconds(
353  int64_t(*it_notify * 1000000000.0))) {
354 
355  it_notify++;
356  break;
357  }
358  }
359  auto wakeup_time = send_time;
360  if (it_notify != notification_times.end()) {
361  wakeup_time =
362  std::min(send_time, playback_start_time +
363  std::chrono::nanoseconds(int64_t(
364  *it_notify * 1000000000.0)));
365  }
366  _action_condition.wait_until(lock, wakeup_time);
367  }
368  }
369  if (interrupted) {
370  break;
371  }
372  if (std::chrono::steady_clock::now() >= send_time) {
373  break;
374  }
375  }
376  if (interrupted) {
377  break;
378  }
379  publish(msg.first, msg.second);
380  changed();
381  {
382  std::lock_guard<std::mutex> lock(_view_mutex);
383  ++it;
384  }
385  }
386  {
387  std::lock_guard<std::mutex> lock(_view_mutex);
388  it = RosBagView::Iterator();
389  it_end = RosBagView::Iterator();
390  }
391  {
392  std::unique_lock<std::mutex> lock(_action_mutex);
393  double t = _time_function();
394  _time_function = [t]() { return t; };
395  }
396  _is_playing = false;
397  changed();
398  });
399 }
400 
401 void BagPlayer::seek(double time_from_start) {
402  LOG_DEBUG("bag seek T0+" << time_from_start);
403  startAction([this, time_from_start]() {
404  {
405  std::unique_lock<std::mutex> lock(_action_mutex);
406  _time_function = [time_from_start]() { return time_from_start; };
407  }
408  std::vector<std::pair<std::string, std::shared_ptr<const Message>>>
409  messages;
410  {
411  std::lock_guard<std::mutex> lock(_view_mutex);
412  auto seek_time = _bag_start_time + ros::Duration(time_from_start);
413  _view->findMessages(seek_time,
414  [&](const std::string &topic,
415  const std::shared_ptr<const Message> &message) {
416  messages.emplace_back(topic, message);
417  });
418  }
419  for (auto &m : messages) {
420  publish(m.first, m.second);
421  }
422  changed();
423  });
424 }
425 
426 void BagPlayer::rewind() {
427  LOG_DEBUG("bag rewind");
428  seek(0.0);
429 }
430 
431 BagPlayer::~BagPlayer() {
432  LOG_DEBUG("stopping bag player " << _path);
433  {
434  std::unique_lock<std::mutex> lock(_action_mutex);
435  _exit = true;
436  _action_condition.notify_one();
437  }
438  _thread.join();
439  LOG_DEBUG("rosbag closed " << _path);
440 }
441 
442 std::vector<std::string> BagPlayer::listTopics(const std::string &type_name) {
443  std::vector<std::string> ret;
444  for (auto &p : _topic_type_name_list) {
445  if (p.first == type_name) {
446  ret.push_back(p.second);
447  }
448  }
449  return ret;
450 }