4 #include "timeseries.h" 6 TimeSeriesSubscriber::Impl::Impl() {
// Constructor: starts the worker thread stored in `_thread`. The thread body
// is the lambda below, which captures `this` and works on the members
// `_mutex`, `_condition`, `_listeners` and `_topic`.
// NOTE(review): several lines of this function are not visible in this view
// (loop headers, else-branches, closing braces, and the assignments to the
// locals `player` and `duration`), so the comments below describe only the
// statements that are shown.
8 _thread = std::thread([
this]() {
// Thread-local working set: strong references to the listeners, plus the
// local `duration` and `player` that the bag-replay branch below reads.
10 std::vector<std::shared_ptr<TimeSeriesListener>> listeners;
11 double duration = 0.0;
12 std::shared_ptr<BagPlayer> player;
// `_mutex` is acquired here; the same lock object is later handed to
// `_condition.wait`.
14 std::unique_lock<std::mutex> lock(_mutex);
// Walk the registered listeners: entries whose `lock()` yields a non-null
// pointer are copied into the local `listeners` vector. The `erase` call
// presumably belongs to an else-branch that drops expired entries -- confirm.
// NOTE(review): `it < end()` only compiles for random-access iterators;
// `it != end()` is the conventional form.
23 for (
auto it = _listeners.begin(); it < _listeners.end();) {
24 if (
auto ptr = it->lock()) {
25 listeners.push_back(ptr);
28 it = _listeners.erase(it);
// Blocks until the condition variable is notified (see `handleMessage` and
// the destructor). No predicate is passed, so any re-check of state after a
// wakeup has to happen in the surrounding code, which is not visible here.
33 _condition.wait(lock);
// Bag replay: only when a player is set and the topic comes from a bag.
36 if (player && _topic->isFromBag()) {
37 double bag_time = player->time();
// Request the samples of this topic in [bag_time - duration, bag_time] and
// hand each one to the callback below.
38 player->readMessageSamples(
39 _topic->name(), bag_time - duration, bag_time,
40 [&](
const std::shared_ptr<const Message> &message) {
// The player time is re-read for every sample, so the pushed window follows
// the current playback position rather than `bag_time`.
41 double playback_time = player->time();
42 for (
auto &listener : listeners) {
// Window passed to the listener: start time of the player in nanoseconds
// plus (playback_time - duration) and playback_time, each scaled by 1e9
// (presumably seconds -> nanoseconds -- confirm units of `time()`).
43 listener->push(message,
44 player->startTime().toNSec() +
45 (playback_time - duration) * 1000000000.0,
46 player->startTime().toNSec() +
47 playback_time * 1000000000.0);
// Second pass over the collected listeners; the loop body is not visible in
// this view.
54 for (
auto &listener : listeners) {
// Ask the global event hub for a redraw after the listeners were updated.
58 GlobalEvents::instance()->redraw();
// Destructor: takes `_mutex` and wakes every thread waiting on `_condition`.
// NOTE(review): setting a stop flag and joining `_thread` are not visible in
// this view -- confirm they happen in the lines that are not shown.
63 TimeSeriesSubscriber::Impl::~Impl() {
65 std::unique_lock<std::mutex> lock(_mutex);
67 _condition.notify_all();
// Handles one incoming message for the subscribed topic.
// `message` is the received sample; its `time()` is used as the end of the
// window pushed to the listeners.
// NOTE(review): lines are missing from this view (else-branches, closing
// braces, possibly preprocessor guards), so how the three sections below are
// selected between cannot be determined from here.
72 void TimeSeriesSubscriber::Impl::handleMessage(
73 const std::shared_ptr<const Message> &message) {
74 PROFILER(
"TimeSeriesSubscriber");
// All member state below is accessed with `_mutex` held.
75 std::unique_lock<std::mutex> lock(_mutex);
// A player is set, the topic is bag-backed and the player is not playing:
// wake one thread waiting on `_condition` (the worker started in the
// constructor waits on it).
76 if (_player && _topic->isFromBag() && !_player->isPlaying()) {
78 _condition.notify_one();
// Direct delivery: push the message to each listener that can still be
// locked, with the window [time - _duration * 1e9, time] in nanoseconds.
// The `erase` presumably removes expired entries in an else-branch -- confirm.
// NOTE(review): `it < end()` only compiles for random-access iterators;
// `it != end()` is the conventional form.
81 for (
auto it = _listeners.begin(); it < _listeners.end();) {
82 if (
auto l = it->lock()) {
83 l->push(message, message->time().toNSec() - _duration * 1000000000.0,
84 message->time().toNSec());
88 it = _listeners.erase(it);
// Two-step delivery: first collect strong references to the live listeners
// into a local vector ...
93 std::vector<std::shared_ptr<TimeSeriesListener>> listeners;
94 for (
auto it = _listeners.begin(); it < _listeners.end();) {
95 if (
auto ptr = it->lock()) {
96 listeners.push_back(ptr);
99 it = _listeners.erase(it);
// ... then push the same window to each collected listener. `auto l` copies
// the shared_ptr on every iteration.
103 for (
auto l : listeners) {
104 l->push(message, message->time().toNSec() - _duration * 1000000000.0,
105 message->time().toNSec());
// Constructs a subscriber for `topic`: resolves the topic object, records the
// current player, subscribes to messages, and tracks later player changes.
// NOTE(review): the declarations of `player` used below are not visible in
// this view -- confirm where they come from.
112 TimeSeriesSubscriber::TimeSeriesSubscriber(
const std::string &topic) {
113 _impl->_topic = Topic::instance(topic);
// Raw pointer to the implementation object, captured by the callbacks below.
114 auto *impl = _impl.get();
// Store the player under the implementation's mutex.
117 std::unique_lock<std::mutex> lock(impl->_mutex);
118 impl->_player = player;
// Subscribe to the topic; every received message is forwarded to
// `Impl::handleMessage`. `_impl` is passed alongside the callback, presumably
// so the callback's lifetime is tied to the implementation object -- confirm.
120 _subscriber = std::make_shared<Subscriber<Message>>(
121 topic, _impl, [impl](
const std::shared_ptr<const Message> &message) {
122 impl->handleMessage(message);
// Runs whenever `modified` fires. The lambda captures only `impl`, so the
// `player` it compares against must be declared in a line not shown here.
124 LockScope()->modified.connect(_impl, [impl]() {
126 std::unique_lock<std::mutex> lock(impl->_mutex);
// On a player change: set `_refresh` when the new player is non-null and the
// topic is bag-backed, then remember the new player.
127 if (impl->_player != player) {
128 if (player && impl->_topic->isFromBag()) {
129 impl->_refresh =
true;
131 impl->_player = player;
// Returns the name of the subscribed topic, as reported by the topic object
// held by the implementation. No lock is taken in the visible code.
136 const std::string &TimeSeriesSubscriber::topic()
const {
137 return _impl->_topic->name();
// Returns the stored `_duration`, read with the implementation's mutex held.
// NOTE(review): the top-level `const` on a by-value `double` return has no
// effect for callers.
140 const double TimeSeriesSubscriber::duration()
const {
141 std::unique_lock<std::mutex> lock(_impl->_mutex);
142 return _impl->_duration;
// Stores `duration` as the new `_duration`, with the implementation's mutex
// held. The visible code neither notifies `_condition` nor sets `_refresh`.
145 void TimeSeriesSubscriber::duration(
double duration) {
146 std::unique_lock<std::mutex> lock(_impl->_mutex);
147 _impl->_duration = duration;
// Registers `listener` by appending it to the implementation's `_listeners`
// container, with the mutex held. Other functions in this file call `lock()`
// on the stored entries, so the container presumably holds weak references and
// does not keep the listener alive -- confirm against the header.
150 void TimeSeriesSubscriber::addListener(
151 const std::shared_ptr<TimeSeriesListener> &listener) {
152 std::unique_lock<std::mutex> lock(_impl->_mutex);
153 _impl->_listeners.emplace_back(listener);
// Accessor returning a const reference to a shared pointer to the
// `Subscriber<Message>`.
// NOTE(review): the function body is not visible in this view; presumably it
// returns the `_subscriber` member created in the constructor -- confirm.
156 const std::shared_ptr<Subscriber<Message>> &
157 TimeSeriesSubscriber::subscriber()
const {
161 bool TimeSeriesQuery::transform(
const std::shared_ptr<const Message> &message,
162 std::pair<int64_t, double> &output) {
163 PROFILER(
"TimeSeriesQuery");
165 auto value_result = _query(parser);
166 if (!value_result.isNull()) {
167 output.second = value_result.toDouble();
168 auto stamp_result = _stamp_query(parser);
169 if (stamp_result.isTime()) {
170 output.first = stamp_result.toInteger();
174 if (output.first == 0) {
175 output.first = message->time().toNSec();