Resurgence (PY2022)
Codebase for the Husky Robotics 2021-2022 rover Resurgence
Loading...
Searching...
No Matches
scheduler.h
1#pragma once
2
3#include "core.h"
4
5#include <chrono>
6#include <condition_variable>
7#include <functional>
8#include <loguru.hpp>
9#include <mutex>
10#include <optional>
11#include <queue>
12#include <string>
13#include <thread>
14#include <unordered_set>
15
16namespace util {
17
18namespace impl {
19
21public:
22 virtual void notify() = 0;
23};
24
25template <typename T>
26void notifyScheduler(T& scheduler);
27} // namespace impl
28
37template <typename Clock = std::chrono::steady_clock>
39public:
44
50 explicit PeriodicScheduler(const std::optional<std::string>& name = std::nullopt)
51 : name(name), newEventAdded(false), quitting(false), nextID(0),
52 thread(std::bind(&PeriodicScheduler::threadFn, this)) {}
53
54 PeriodicScheduler(const PeriodicScheduler&) = delete;
55
60 {
61 std::unique_lock lock(scheduleMutex);
62 quitting = true;
63 // wake up the thread with the cv so it can quit
64 newEventAdded = true;
65 }
66 scheduleCV.notify_one();
67 thread.join();
68 }
69
71
77 void clear() {
78 std::unique_lock lock(scheduleMutex);
79 schedule = decltype(schedule)();
80 toRemove.clear();
81 nextID = 0;
82 }
83
93 std::unique_lock lock(scheduleMutex);
94 toRemove.insert(id);
95 }
96
108 const std::function<void()>& fn) {
109 eventid_t id;
110 {
111 std::unique_lock lock(scheduleMutex);
112 id = nextID++;
113 auto now = Clock::now();
114 schedule.push({id, now + period, period, fn});
115 newEventAdded = true;
116 }
117 scheduleCV.notify_one();
118 return id;
119 }
120
121private:
122 friend void util::impl::notifyScheduler<>(PeriodicScheduler&);
123
124 void notify() override {
125 scheduleCV.notify_all();
126 }
127
131 void threadFn() {
132 if (name.has_value()) {
133 loguru::set_thread_name(name->c_str());
134 }
135 std::unique_lock lock(scheduleMutex);
136 while (!quitting) {
137 if (schedule.empty()) {
138 scheduleCV.wait(lock, [&] { return newEventAdded; });
139 newEventAdded = false;
140 } else {
141 auto now = Clock::now();
142 schedule_t event = schedule.top();
143 if (toRemove.find(event.id) != toRemove.end()) {
144 // if we should remove this id then remove the event and don't reschedule
145 schedule.pop();
146 toRemove.erase(event.id);
147 } else if (event.nextSendTime <= now) {
148 // execute the event and reschedule it
149 schedule.pop();
150 event.fn();
151 event.nextSendTime += event.period;
152 schedule.push(event);
153 } else {
154 // wait until the event should be executed
155 scheduleCV.wait_until(lock, event.nextSendTime,
156 [&] { return newEventAdded; });
157 newEventAdded = false;
158 }
159 }
160 }
161 }
162
166 struct schedule_t {
167 eventid_t id;
168 std::chrono::time_point<Clock> nextSendTime;
170 std::function<void()> fn;
171 friend bool operator>(const PeriodicScheduler::schedule_t& t1,
172 const PeriodicScheduler::schedule_t& t2) {
173 return t1.nextSendTime > t2.nextSendTime;
174 }
175 };
176
177 std::optional<std::string> name;
178 bool newEventAdded;
179 bool quitting;
180 eventid_t nextID = 0;
181 std::thread thread;
182 // we need a min priority queue
183 std::priority_queue<schedule_t, std::vector<schedule_t>, std::greater<schedule_t>>
184 schedule;
185 std::mutex scheduleMutex;
186 std::condition_variable scheduleCV;
187 std::unordered_set<eventid_t> toRemove;
188};
189
201template <typename Clock = std::chrono::steady_clock>
202class Watchdog : private impl::Notifiable {
203public:
216 const std::function<void()>& callback, bool keepCallingOnDeath = false)
217 : name(name), duration(duration), callback(callback),
218 keepCallingOnDeath(keepCallingOnDeath), fed(false), quitting(false),
219 thread(std::bind(&Watchdog::threadFn, this)) {}
220
231 Watchdog(std::chrono::milliseconds duration, const std::function<void()>& callback,
232 bool keepCallingOnDeath = false)
233 : name(std::nullopt), duration(duration), callback(callback),
234 keepCallingOnDeath(keepCallingOnDeath), fed(false), quitting(false),
235 thread(std::bind(&Watchdog::threadFn, this)) {}
236
237 Watchdog(const Watchdog&) = delete;
238
239 ~Watchdog() {
240 {
241 std::lock_guard lock(mutex);
242 quitting = true;
243 }
244 cv.notify_all();
245 if (thread.joinable()) {
246 thread.join();
247 }
248 }
249
250 Watchdog& operator=(const Watchdog&) = delete;
251
257 void feed() {
258 {
259 std::lock_guard lock(mutex);
260 fed = true;
261 }
262 cv.notify_one();
263 }
264
265private:
266 friend void util::impl::notifyScheduler<>(Watchdog&);
267
268 std::optional<std::string> name;
270 std::function<void()> callback;
271 const bool keepCallingOnDeath;
272 bool fed;
273 bool quitting;
274 std::mutex mutex;
276 std::thread thread;
277
278 void notify() override {
279 cv.notify_all();
280 }
281
282 void threadFn() {
283 if (name.has_value()) {
284 loguru::set_thread_name(name->c_str());
285 }
286 std::unique_lock lock(mutex);
287 while (!quitting) {
288 std::chrono::time_point<Clock> wakeTime = Clock::now() + duration;
289 if (cv.wait_until(lock, wakeTime, [&]() { return fed || quitting; })) {
290 if (quitting) {
291 break;
292 }
293 fed = false;
294 } else {
295 callback();
296 if (!keepCallingOnDeath) {
297 cv.wait(lock, [&]() { return fed || quitting; });
298 }
299 }
300 }
301 }
302};
303
315template <typename Clock = std::chrono::steady_clock>
316class AsyncTask : private virtual impl::Notifiable {
317public:
323 AsyncTask(const std::optional<std::string>& name = std::nullopt)
324 : name(name), running(false), quitting(false) {}
325
326 AsyncTask(const AsyncTask&) = delete;
327
328 virtual ~AsyncTask() {
329 stop();
330 if (thread.joinable()) {
331 thread.join();
332 }
333 }
334
335 AsyncTask& operator=(const AsyncTask&) = delete;
336
342 virtual void start() {
343 std::lock_guard lock(mutex);
344 if (!running) {
345 std::lock_guard threadLock(threadMutex);
346 if (thread.joinable()) {
347 thread.join();
348 }
349 running = true;
350 quitting = false;
351 thread = std::thread(&AsyncTask::run, this);
352 }
353 }
354
360 virtual void stop() {
361 bool isRunning = false;
362 {
363 std::lock_guard lock(mutex);
364 if (isRunningInternal()) {
365 quitting = true;
366 isRunning = true;
367 }
368 }
369 if (isRunning) {
370 cv.notify_one();
371 std::lock_guard threadLock(threadMutex);
372 if (thread.joinable()) {
373 thread.join();
374 }
375 }
376 }
377
383 bool isRunning() {
384 std::lock_guard lock(mutex);
385 return running;
386 }
387
388protected:
398
407 return running;
408 }
409
419 return cv.wait_until(lock, tp, [&]() { return quitting; });
420 }
421
429 template <typename Rep, typename Period>
432 return wait_until(lock, Clock::now() + dur);
433 }
434
441 return cv.wait(lock, [&]() { return quitting; });
442 }
443
447 void notify() override {
448 cv.notify_all();
449 }
450
451private:
452 std::optional<std::string> name;
453 bool running;
454 bool quitting;
456 std::thread thread;
457
458 // If acquiring both mutexes, acquire mutex first then threadMutex.
459 std::mutex mutex; // protects everything except thread
460 std::mutex threadMutex; // protects only thread
461
462 friend void util::impl::notifyScheduler<>(AsyncTask&);
463
464 void run() {
465 if (name.has_value()) {
466 loguru::set_thread_name(name->c_str());
467 }
469 // clear flags when exiting
470 RAIIHelper r([&]() { running = quitting = false; });
471 task(lock);
472 }
473};
474
483template <typename Clock = std::chrono::steady_clock>
484class PeriodicTask : public AsyncTask<Clock>, private virtual impl::Notifiable {
485public:
492 PeriodicTask(const std::chrono::milliseconds& period, const std::function<void()>& f)
493 : AsyncTask<Clock>(), period(period), f(f) {}
494
495protected:
497 auto event = scheduler.scheduleEvent(period, f);
499 scheduler.removeEvent(event);
500 }
501
502 void notify() override {
503 impl::notifyScheduler(scheduler);
505 }
506
507private:
509 std::function<void()> f;
510
511 inline static PeriodicScheduler<Clock> scheduler =
512 PeriodicScheduler<Clock>("PeriodicTask_Scheduler");
513
514 friend void util::impl::notifyScheduler<>(PeriodicTask&);
515};
516
517namespace impl {
518
530template <typename T>
531void notifyScheduler(T& scheduler) {
532 Notifiable& n = scheduler;
533 n.notify();
534}
535
536} // namespace impl
537
538} // namespace util
_GLIBCXX_END_NAMESPACE_CXX11 typedef basic_string< char > string
_Bind_helper< __is_socketlike< _Func >::value, _Func, _BoundArgs... >::type bind(_Func &&__f, _BoundArgs &&... __args)
constexpr nullopt_t nullopt
void lock(_L1 &__l1, _L2 &__l2, _L3 &... __l3)
duration< int64_t, milli > milliseconds
An abstract class that can be overridden to run long-running tasks and encapsulate task-related data.
Definition scheduler.h:316
virtual void task(std::unique_lock< std::mutex > &lock)=0
The long-running task, overridden by client code.
bool isRunningInternal()
Version of AsyncTask::isRunning() that does no synchronization.
Definition scheduler.h:406
bool wait_for(std::unique_lock< std::mutex > &lock, const std::chrono::duration< Rep, Period > &dur)
Wait for a given duration, or until the task has been stopped.
Definition scheduler.h:430
virtual void stop()
Stop the task and wait for it to finish.
Definition scheduler.h:360
bool isRunning()
Check if the task is running.
Definition scheduler.h:383
void wait_until_done(std::unique_lock< std::mutex > &lock)
Wait until the task has been stopped.
Definition scheduler.h:440
AsyncTask(const std::optional< std::string > &name=std::nullopt)
Construct a new task.
Definition scheduler.h:323
void notify() override
Not for use by client code.
Definition scheduler.h:447
bool wait_until(std::unique_lock< std::mutex > &lock, const std::chrono::time_point< Clock > &tp)
Wait until the specified time point, or until the task has been stopped.
Definition scheduler.h:417
virtual void start()
Start the task.
Definition scheduler.h:342
Uses a single thread to periodically invoke callbacks at a given frequency.
Definition scheduler.h:38
PeriodicScheduler(const std::optional< std::string > &name=std::nullopt)
Create a new PeriodicScheduler.
Definition scheduler.h:50
uint64_t eventid_t
The type of event ids.
Definition scheduler.h:43
eventid_t scheduleEvent(std::chrono::milliseconds period, const std::function< void()> &fn)
Schedule a new event to be executed periodically.
Definition scheduler.h:107
void clear()
Clears all currently scheduled recurring events.
Definition scheduler.h:77
void removeEvent(eventid_t id)
Remove an event from the schedule.
Definition scheduler.h:92
~PeriodicScheduler()
Join the thread and destruct.
Definition scheduler.h:59
Implements a task that executes a function periodically.
Definition scheduler.h:484
void task(std::unique_lock< std::mutex > &lock) override
The long-running task, overridden by client code.
Definition scheduler.h:496
PeriodicTask(const std::chrono::milliseconds &period, const std::function< void()> &f)
Construct a new periodic task.
Definition scheduler.h:492
void notify() override
Not for use by client code.
Definition scheduler.h:502
Implements a thread-safe watchdog.
Definition scheduler.h:202
void feed()
Feed the watchdog.
Definition scheduler.h:257
Watchdog(std::chrono::milliseconds duration, const std::function< void()> &callback, bool keepCallingOnDeath=false)
Construct a new Watchdog.
Definition scheduler.h:231
Watchdog(const std::string &name, std::chrono::milliseconds duration, const std::function< void()> &callback, bool keepCallingOnDeath=false)
Construct a new Watchdog.
Definition scheduler.h:215
Definition scheduler.h:20
softfloat & operator=(const softfloat &c)
::uint64_t uint64_t
A collection of utility functions and classes with common use-cases.
Definition SwerveController.cpp:145