10 virtual void push(
const std::shared_ptr<const Message> &message,
11 int64_t start, int64_t end) = 0;
12 virtual void commit() {}
17 std::vector<std::weak_ptr<TimeSeriesListener>> _listeners;
18 volatile bool _stop_flag =
false;
20 bool _refresh =
false;
21 std::condition_variable _condition;
22 std::shared_ptr<Topic> _topic;
25 std::shared_ptr<const Message> _received_message;
26 std::shared_ptr<BagPlayer> _player;
27 Impl(
const Impl &) =
delete;
28 Impl &operator=(
const Impl &) =
delete;
31 void handleMessage(
const std::shared_ptr<const Message> &message);
33 std::shared_ptr<Impl> _impl = std::make_shared<Impl>();
34 std::shared_ptr<Subscriber<Message>> _subscriber;
40 const std::string &topic()
const;
41 const double duration()
const;
42 void duration(
double duration);
43 void addListener(
const std::shared_ptr<TimeSeriesListener> &listener);
44 const std::shared_ptr<Subscriber<Message>> &subscriber()
const;
// Template parameter `Output`: used in this file as the mapped value type of
// `_data` and as the element type of the vector returned by `data()`.
47 template <
class Output>
49 mutable std::mutex _mutex;
50 std::map<int64_t, Output> _data;
// Listener callback: transforms an incoming message into an `Output`, stores
// it in `_data` keyed by the message timestamp, and trims `_data` against the
// [start, end] bounds.
// NOTE(review): several lines of this function are not visible in this view,
// so the comments below describe only the statements that are shown.
56 virtual void push(
const std::shared_ptr<const Message> &message,
57 int64_t start, int64_t end)
override {
58 PROFILER(
"TimeSeriesTransformer");
// Skip messages whose timestamp (toNSec) is already a key in _data.
// NOTE(review): this lookup runs before _mutex is taken below. Confirm that
// push() cannot run concurrently with anything that modifies _data.
59 if (_data.find(message->time().toNSec()) == _data.end()) {
// Store the output only when transform() reports success (returns true).
61 if (transform(message, output)) {
// The insertion itself is done while holding _mutex.
62 std::unique_lock<std::mutex> lock(_mutex);
63 _data.emplace(message->time().toNSec(), output);
// Erase entries from the front while the smallest key is less than `start`.
66 while (!_data.empty() && _data.begin()->first < start) {
67 _data.erase(_data.begin());
// Loop while the largest key is greater than `end`; presumably erases the last
// entry each iteration (the rest of the loop body is not visible here).
69 while (!_data.empty() && _data.rbegin()->first > end) {
70 auto it = _data.end();
75 std::vector<Output> data()
const {
76 std::vector<Output> ret;
77 std::unique_lock<std::mutex> lock(_mutex);
78 for (
auto &pair : _data) {
79 ret.push_back(pair.second);
81 return std::move(ret);
// Conversion hook used by push(): it is called with the message and an output
// object, and push() stores the output only when this returns true.
// NOTE(review): the rest of this declaration is not visible in this view.
85 virtual bool transform(
const std::shared_ptr<const Message> &message,
99 virtual bool transform(
const std::shared_ptr<const Message> &message,
100 std::pair<int64_t, double> &output)
override;