TAMSVIZ
Visualization and annotation tool for ROS
timeseries.h
1 // TAMSVIZ
2 // (c) 2020 Philipp Ruppel
3 
4 #include "bagplayer.h"
5 #include "mquery.h"
6 #include "topic.h"
7 #include "workspace.h"
8 
10  virtual void push(const std::shared_ptr<const Message> &message,
11  int64_t start, int64_t end) = 0;
12  virtual void commit() {}
13 };
14 
16  struct Impl {
17  std::vector<std::weak_ptr<TimeSeriesListener>> _listeners;
18  volatile bool _stop_flag = false;
19  std::mutex _mutex;
20  bool _refresh = false;
21  std::condition_variable _condition;
22  std::shared_ptr<Topic> _topic;
23  double _duration;
24  std::thread _thread;
25  std::shared_ptr<const Message> _received_message;
26  std::shared_ptr<BagPlayer> _player;
27  Impl(const Impl &) = delete;
28  Impl &operator=(const Impl &) = delete;
29  Impl();
30  ~Impl();
31  void handleMessage(const std::shared_ptr<const Message> &message);
32  };
33  std::shared_ptr<Impl> _impl = std::make_shared<Impl>();
34  std::shared_ptr<Subscriber<Message>> _subscriber;
35 
36 public:
37  TimeSeriesSubscriber(const std::string &topic);
39  TimeSeriesSubscriber &operator=(const TimeSeriesSubscriber &) = delete;
40  const std::string &topic() const;
41  const double duration() const;
42  void duration(double duration);
43  void addListener(const std::shared_ptr<TimeSeriesListener> &listener);
44  const std::shared_ptr<Subscriber<Message>> &subscriber() const;
45 };
46 
47 template <class Output>
49  mutable std::mutex _mutex;
50  std::map<int64_t, Output> _data;
51 
52 public:
55  TimeSeriesTransformer &operator=(const TimeSeriesTransformer &) = delete;
56  virtual void push(const std::shared_ptr<const Message> &message,
57  int64_t start, int64_t end) override {
58  PROFILER("TimeSeriesTransformer");
59  if (_data.find(message->time().toNSec()) == _data.end()) {
60  Output output;
61  if (transform(message, output)) {
62  std::unique_lock<std::mutex> lock(_mutex);
63  _data.emplace(message->time().toNSec(), output);
64  }
65  }
66  while (!_data.empty() && _data.begin()->first < start) {
67  _data.erase(_data.begin());
68  }
69  while (!_data.empty() && _data.rbegin()->first > end) {
70  auto it = _data.end();
71  --it;
72  _data.erase(it);
73  }
74  }
75  std::vector<Output> data() const {
76  std::vector<Output> ret;
77  std::unique_lock<std::mutex> lock(_mutex);
78  for (auto &pair : _data) {
79  ret.push_back(pair.second);
80  }
81  return std::move(ret);
82  }
83 
84 protected:
85  virtual bool transform(const std::shared_ptr<const Message> &message,
86  Output &output) = 0;
87 };
88 
90  : public TimeSeriesTransformer<std::pair<int64_t, double>> {
91  MessageQuery _query;
92  MessageQuery _stamp_query = MessageQuery("header.stamp");
93 
94 public:
95  TimeSeriesQuery(const MessageQuery &query) : _query(query) {}
96  const MessageQuery &query() const { return _query; }
97 
98 protected:
99  virtual bool transform(const std::shared_ptr<const Message> &message,
100  std::pair<int64_t, double> &output) override;
101 };