9 #include <rosbag/view.h> 16 RosBagViewBase(
const std::string &filename) { _bag.open(filename); }
21 std::vector<std::pair<std::string, std::string>> _viz_topic_list;
22 std::unordered_map<std::string, std::string> _topic_bag_to_viz;
23 std::unordered_map<std::string, std::string> _topic_viz_to_bag;
25 std::shared_ptr<const Message> loadMessage(
const rosbag::ConnectionInfo *conn,
26 const rosbag::IndexEntry &index,
27 const rosbag::Bag &bag) {
28 auto *msg = newMessageInstance(conn, index, bag);
29 boost::shared_ptr<Message> inst = msg->instantiate<
Message>();
30 auto m = std::make_shared<Message>(*inst);
31 m->time(msg->getTime());
39 for (
auto &conn : getConnections()) {
40 std::string viz_name = conn->topic;
41 if (viz_name.empty() || viz_name[0] !=
'/') {
42 viz_name =
"/" + viz_name;
44 _topic_bag_to_viz[conn->topic] = viz_name;
45 _topic_viz_to_bag[viz_name] = conn->topic;
46 _viz_topic_list.emplace_back(conn->datatype, viz_name);
50 ros::Time startTime() {
return getBeginTime(); }
51 ros::Time endTime() {
return getEndTime(); }
53 std::vector<std::pair<std::string, std::string>> topics() {
54 return _viz_topic_list;
58 findMessages(
const ros::Time &t,
59 const std::function<
void(
const std::string &,
60 const std::shared_ptr<const Message> &)>
62 for (
auto *range : ranges_) {
63 if (range->begin != range->end) {
64 auto iter = std::upper_bound(range->begin, range->end, t,
65 rosbag::IndexEntryCompare());
66 if (iter != range->begin) {
69 if (iter == range->end) {
74 _topic_bag_to_viz[range->connection_info->topic],
75 loadMessage(range->connection_info, *iter, *range->bag_query->bag));
81 bool findTimeSpan(
const std::string &viz_topic,
double t,
double *start,
83 auto &bag_topic = _topic_viz_to_bag[viz_topic];
84 for (
auto *range : ranges_) {
85 if (range->connection_info->topic != bag_topic) {
88 auto iter2 = std::upper_bound(range->begin, range->end,
89 getBeginTime() + ros::Duration(t),
90 rosbag::IndexEntryCompare());
91 if (iter2 == range->end) {
97 if (iter == range->end) {
101 *start = (iter->time - getBeginTime()).toSec();
102 *duration = (iter2->time - getBeginTime()).toSec() - *start;
110 std::multimap<ros::Time,
111 std::pair<std::multiset<rosbag::IndexEntry>::const_iterator,
112 rosbag::MessageRange *>>
118 for (
auto *range : _view->ranges_) {
119 _iterators.emplace(range->begin->time,
120 std::make_pair(range->begin, range));
124 for (
auto *range : _view->ranges_) {
125 auto iter = std::lower_bound(range->begin, range->end,
126 _view->getBeginTime() +
127 ros::Duration(time_from_start),
128 rosbag::IndexEntryCompare());
129 if (iter != range->end) {
130 _iterators.emplace(range->begin->time, std::make_pair(iter, range));
134 bool operator==(
const Iterator &other) {
135 return _iterators == other._iterators;
137 bool operator!=(
const Iterator &other) {
138 return _iterators != other._iterators;
141 auto iter = _iterators.begin();
142 auto pair = iter->second;
143 _iterators.erase(iter);
145 if (pair.first != pair.second->end) {
146 _iterators.emplace(pair.first->time, pair);
150 std::pair<std::string, std::shared_ptr<const Message>> operator*()
const {
151 auto &pair = _iterators.begin()->second;
152 return std::make_pair(
153 _view->_topic_bag_to_viz[pair.second->connection_info->topic],
154 _view->loadMessage(pair.second->connection_info, *pair.first,
155 *(pair.second->bag_query->bag)));
160 bool BagPlayer::findMessageTimeSpan(
const std::string &topic,
double time,
161 double *start,
double *duration)
const {
162 std::unique_lock<std::mutex> lock(_view_mutex);
163 return _view->findTimeSpan(topic, time, start, duration);
166 void BagPlayer::publish(
const std::string &topic,
167 const std::shared_ptr<const Message> &message) {
170 LOG_DEBUG(
"bag message topic:" 172 <<
" time:" << (message->time() - _bag_start_time).toSec());
174 _topics[topic]->publish(message);
193 void BagPlayer::readMessageSamples(
194 const std::string &topic,
double start,
double stop,
195 const std::function<
bool(
const std::shared_ptr<const Message> &)>
200 std::unique_lock<std::mutex> lock(_view_mutex);
203 while (it != it_end) {
204 std::pair<std::string, std::shared_ptr<const Message>> p;
206 std::unique_lock<std::mutex> lock(_view_mutex);
208 if ((p.second->time() - _bag_start_time).toSec() > stop) {
212 bool ok = callback(p.second);
217 std::unique_lock<std::mutex> lock(_view_mutex);
222 std::unique_lock<std::mutex> lock(_view_mutex);
227 BagPlayer::BagPlayer(
const std::string &path) : _path(path) {
228 LOG_DEBUG(
"opening bag " << path);
230 if (
auto *n = std::strrchr(path.c_str(),
'/')) {
233 _time_function = []() {
return 0.0; };
234 LOG_DEBUG(
"creating bag view");
235 _view = std::make_shared<RosBagView>(path);
236 LOG_DEBUG(
"listing topics");
237 for (
auto &topic : _view->topics()) {
238 _topic_type_name_list.push_back(topic);
239 _topic_names.push_back(topic.second);
240 _topics[topic.second] = Topic::instance(topic.second);
242 LOG_DEBUG(
"done listing topics");
243 _message_playback_scope =
244 std::make_shared<MessagePlaybackScope>(_topic_names);
245 _duration = (_view->endTime() - _view->startTime()).toSec();
246 _bag_start_time = _view->startTime();
247 LOG_DEBUG(
"rosbag opened path:" << path <<
" duration:" << _duration);
248 _thread = std::thread([
this]() {
249 LOG_DEBUG(
"bag thread started");
251 std::function<void()> action;
253 std::unique_lock<std::mutex> lock(_action_mutex);
256 LOG_DEBUG(
"exiting bag player thread");
259 if (_pending_action) {
260 LOG_DEBUG(
"new bag player action");
261 action = _pending_action;
262 _pending_action =
nullptr;
265 _action_condition.wait(lock);
268 LOG_DEBUG(
"executing bag player action");
270 LOG_DEBUG(
"bag player action finished");
272 LOG_DEBUG(
"exiting bag thread");
275 LOG_DEBUG(
"bag opened");
278 double BagPlayer::time() {
279 std::unique_lock<std::mutex> lock(_action_mutex);
280 return _time_function();
283 void BagPlayer::startAction(
const std::function<
void()> &action) {
284 std::unique_lock<std::mutex> lock(_action_mutex);
285 _pending_action = action;
286 _action_condition.notify_one();
289 void BagPlayer::stop() {
290 LOG_DEBUG(
"bag stop");
291 startAction([
this]() { changed(); });
294 void BagPlayer::play(
const std::vector<double> ¬ification_times) {
295 LOG_DEBUG(
"bag play");
296 startAction([
this, notification_times]() {
298 double time_from_start = this->time();
299 auto it_notify = notification_times.begin();
300 while (it_notify != notification_times.end() &&
301 *it_notify < time_from_start) {
304 if (time_from_start + 1e-6 >= _duration) {
305 time_from_start = 0.0;
307 auto playback_start_time = std::chrono::steady_clock::now();
308 int64_t view_start_time =
309 (_bag_start_time + ros::Duration(time_from_start)).toNSec();
311 std::unique_lock<std::mutex> lock(_action_mutex);
312 double duration = this->duration();
313 _time_function = [duration, playback_start_time, time_from_start]() {
314 return std::min(duration, std::chrono::duration<double>(
315 std::chrono::steady_clock::now() -
324 std::unique_lock<std::mutex> lock(_view_mutex);
327 while (it != it_end) {
328 std::pair<std::string, std::shared_ptr<const Message>> msg;
330 std::lock_guard<std::mutex> lock(_view_mutex);
333 int64_t message_time = msg.second->time().toNSec();
334 auto send_time = playback_start_time +
335 std::chrono::nanoseconds(message_time - view_start_time);
336 bool interrupted =
false;
340 std::unique_lock<std::mutex> lock(_action_mutex);
342 auto current_time = std::chrono::steady_clock::now();
343 if (interrupted_nolock()) {
347 if (current_time >= send_time) {
350 if (it_notify != notification_times.end()) {
351 if (current_time >= playback_start_time +
352 std::chrono::nanoseconds(
353 int64_t(*it_notify * 1000000000.0))) {
359 auto wakeup_time = send_time;
360 if (it_notify != notification_times.end()) {
362 std::min(send_time, playback_start_time +
363 std::chrono::nanoseconds(int64_t(
364 *it_notify * 1000000000.0)));
366 _action_condition.wait_until(lock, wakeup_time);
372 if (std::chrono::steady_clock::now() >= send_time) {
379 publish(msg.first, msg.second);
382 std::lock_guard<std::mutex> lock(_view_mutex);
387 std::lock_guard<std::mutex> lock(_view_mutex);
392 std::unique_lock<std::mutex> lock(_action_mutex);
393 double t = _time_function();
394 _time_function = [t]() {
return t; };
401 void BagPlayer::seek(
double time_from_start) {
402 LOG_DEBUG(
"bag seek T0+" << time_from_start);
403 startAction([
this, time_from_start]() {
405 std::unique_lock<std::mutex> lock(_action_mutex);
406 _time_function = [time_from_start]() {
return time_from_start; };
408 std::vector<std::pair<std::string, std::shared_ptr<const Message>>>
411 std::lock_guard<std::mutex> lock(_view_mutex);
412 auto seek_time = _bag_start_time + ros::Duration(time_from_start);
413 _view->findMessages(seek_time,
414 [&](
const std::string &topic,
415 const std::shared_ptr<const Message> &message) {
416 messages.emplace_back(topic, message);
419 for (
auto &m : messages) {
420 publish(m.first, m.second);
426 void BagPlayer::rewind() {
427 LOG_DEBUG(
"bag rewind");
431 BagPlayer::~BagPlayer() {
432 LOG_DEBUG(
"stopping bag player " << _path);
434 std::unique_lock<std::mutex> lock(_action_mutex);
436 _action_condition.notify_one();
439 LOG_DEBUG(
"rosbag closed " << _path);
442 std::vector<std::string> BagPlayer::listTopics(
const std::string &type_name) {
443 std::vector<std::string> ret;
444 for (
auto &p : _topic_type_name_list) {
445 if (p.first == type_name) {
446 ret.push_back(p.second);