TAMSVIZ
Visualization and annotation tool for ROS
All Classes Pages
timeseries.cpp
1 // TAMSVIZ
2 // (c) 2020 Philipp Ruppel
3 
4 #include "timeseries.h"
5 
6 TimeSeriesSubscriber::Impl::Impl() {
7  _duration = 0;
8  _thread = std::thread([this]() {
9  while (true) {
10  std::vector<std::shared_ptr<TimeSeriesListener>> listeners;
11  double duration = 0.0;
12  std::shared_ptr<BagPlayer> player;
13  {
14  std::unique_lock<std::mutex> lock(_mutex);
15  while (true) {
16  if (_stop_flag) {
17  return;
18  }
19  if (_refresh) {
20  _refresh = false;
21  duration = _duration;
22  player = _player;
23  for (auto it = _listeners.begin(); it < _listeners.end();) {
24  if (auto ptr = it->lock()) {
25  listeners.push_back(ptr);
26  ++it;
27  } else {
28  it = _listeners.erase(it);
29  }
30  }
31  break;
32  }
33  _condition.wait(lock);
34  }
35  }
36  if (player && _topic->isFromBag()) {
37  double bag_time = player->time();
38  player->readMessageSamples(
39  _topic->name(), bag_time - duration, bag_time,
40  [&](const std::shared_ptr<const Message> &message) {
41  double playback_time = player->time();
42  for (auto &listener : listeners) {
43  listener->push(message,
44  player->startTime().toNSec() +
45  (playback_time - duration) * 1000000000.0,
46  player->startTime().toNSec() +
47  playback_time * 1000000000.0);
48  }
49  return !_stop_flag;
50  });
51  if (_stop_flag) {
52  return;
53  }
54  for (auto &listener : listeners) {
55  listener->commit();
56  }
57  }
58  GlobalEvents::instance()->redraw();
59  }
60  });
61 }
62 
63 TimeSeriesSubscriber::Impl::~Impl() {
64  {
65  std::unique_lock<std::mutex> lock(_mutex);
66  _stop_flag = true;
67  _condition.notify_all();
68  }
69  _thread.join();
70 }
71 
72 void TimeSeriesSubscriber::Impl::handleMessage(
73  const std::shared_ptr<const Message> &message) {
74  PROFILER("TimeSeriesSubscriber");
75  std::unique_lock<std::mutex> lock(_mutex);
76  if (_player && _topic->isFromBag() && !_player->isPlaying()) {
77  _refresh = true;
78  _condition.notify_one();
79  } else {
80  if (0) {
81  for (auto it = _listeners.begin(); it < _listeners.end();) {
82  if (auto l = it->lock()) {
83  l->push(message, message->time().toNSec() - _duration * 1000000000.0,
84  message->time().toNSec());
85  l->commit();
86  ++it;
87  } else {
88  it = _listeners.erase(it);
89  }
90  }
91  }
92  if (1) {
93  std::vector<std::shared_ptr<TimeSeriesListener>> listeners;
94  for (auto it = _listeners.begin(); it < _listeners.end();) {
95  if (auto ptr = it->lock()) {
96  listeners.push_back(ptr);
97  ++it;
98  } else {
99  it = _listeners.erase(it);
100  }
101  }
102  lock.unlock();
103  for (auto l : listeners) {
104  l->push(message, message->time().toNSec() - _duration * 1000000000.0,
105  message->time().toNSec());
106  l->commit();
107  }
108  }
109  }
110 }
111 
112 TimeSeriesSubscriber::TimeSeriesSubscriber(const std::string &topic) {
113  _impl->_topic = Topic::instance(topic);
114  auto *impl = _impl.get();
115  {
116  auto player = LockScope()->player;
117  std::unique_lock<std::mutex> lock(impl->_mutex);
118  impl->_player = player;
119  }
120  _subscriber = std::make_shared<Subscriber<Message>>(
121  topic, _impl, [impl](const std::shared_ptr<const Message> &message) {
122  impl->handleMessage(message);
123  });
124  LockScope()->modified.connect(_impl, [impl]() {
125  auto player = LockScope()->player;
126  std::unique_lock<std::mutex> lock(impl->_mutex);
127  if (impl->_player != player) {
128  if (player && impl->_topic->isFromBag()) {
129  impl->_refresh = true;
130  }
131  impl->_player = player;
132  }
133  });
134 }
135 
136 const std::string &TimeSeriesSubscriber::topic() const {
137  return _impl->_topic->name();
138 }
139 
140 const double TimeSeriesSubscriber::duration() const {
141  std::unique_lock<std::mutex> lock(_impl->_mutex);
142  return _impl->_duration;
143 }
144 
145 void TimeSeriesSubscriber::duration(double duration) {
146  std::unique_lock<std::mutex> lock(_impl->_mutex);
147  _impl->_duration = duration;
148 }
149 
150 void TimeSeriesSubscriber::addListener(
151  const std::shared_ptr<TimeSeriesListener> &listener) {
152  std::unique_lock<std::mutex> lock(_impl->_mutex);
153  _impl->_listeners.emplace_back(listener);
154 }
155 
156 const std::shared_ptr<Subscriber<Message>> &
157 TimeSeriesSubscriber::subscriber() const {
158  return _subscriber;
159 }
160 
161 bool TimeSeriesQuery::transform(const std::shared_ptr<const Message> &message,
162  std::pair<int64_t, double> &output) {
163  PROFILER("TimeSeriesQuery");
164  MessageParser parser(message);
165  auto value_result = _query(parser);
166  if (!value_result.isNull()) {
167  output.second = value_result.toDouble();
168  auto stamp_result = _stamp_query(parser);
169  if (stamp_result.isTime()) {
170  output.first = stamp_result.toInteger();
171  } else {
172  output.first = 0;
173  }
174  if (output.first == 0) {
175  output.first = message->time().toNSec();
176  }
177  return true;
178  } else {
179  return false;
180  }
181 }