// Thread pool holding a queue of std::function<void()> tasks (tasks_) and a
// set of owned std::thread objects (threads_).
// NOTE(review): this view of the class is incomplete (access specifiers,
// the mutex member and several bodies are not visible); comments below only
// describe what the visible lines show.
34class OPENDHT_PUBLIC ThreadPool {
// Static accessors, each returning a reference to a ThreadPool.
// Presumably process-wide shared pools for CPU-bound and I/O-bound work
// respectively -- confirm against the implementation file.
36 static ThreadPool& computation();
37 static ThreadPool& io();
// Constructs a pool from a minimum and a maximum thread count.
// maxThreads defaults to 0; NOTE(review): the meaning of 0 is decided in the
// out-of-view definition (possibly "same as minThreads") -- confirm.
40 ThreadPool(
unsigned minThreads,
unsigned maxThreads = 0);
// Accepts a callback by rvalue reference. Presumably enqueues it into
// tasks_ for execution on a pool thread; the definition is not visible here.
43 void run(std::function<
void()>&& cb);
// Wraps `cb` so that its outcome is observable through a std::future<T>:
// a std::promise<T> is heap-allocated in a shared_ptr, the callback and the
// promise are captured by a lambda handed to run(), and the promise's
// future is returned to the caller.
// NOTE(review): the template header and the part of the lambda that invokes
// `cb` and sets the value are not visible in this view.
46 std::future<T> get(std::function<T()>&& cb) {
47 auto ret = std::make_shared<std::promise<T>>();
// `mutable` lets the lambda call/modify its captured copy of `cb`.
48 run([cb = std::move(cb), ret]()
mutable {
// Forwards the in-flight exception to the future's consumer.
53 ret->set_exception(std::current_exception());
57 return ret->get_future();
// Same as get(), but returns a std::shared_future<T>; the std::future<T>
// returned by get() is implicitly converted.
60 std::shared_future<T> getShared(std::function<T()>&& cb) {
61 return get(std::move(cb));
// Stops the pool. `wait` defaults to true; presumably it selects between
// joining the worker threads and not waiting for them -- confirm in the
// implementation.
64 void stop(
bool wait =
true);
// Condition variable; presumably used to wake idle workers when tasks_
// changes (its users are not visible here).
70 std::condition_variable cv_ {};
// Queue of callbacks, in FIFO order (std::queue).
71 std::queue<std::function<void()>> tasks_ {};
// Threads owned by the pool.
72 std::vector<std::unique_ptr<std::thread>> threads_;
// Counter initialised to 0; presumably the number of idle threads currently
// waiting for work -- TODO confirm.
73 unsigned readyThreads_ {0};
// Upper bound on thread count, fixed at construction (const).
77 const unsigned maxThreads_;
// Defaults to 5 minutes. Presumably the idle time after which an extra
// thread exits -- confirm. NOTE(review): unlike the sibling members, this
// name has no trailing underscore.
78 std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)};
// Defaults to 2. NOTE(review): its use is not visible here; presumably a
// multiplier applied to the expiration delay -- confirm.
79 double threadDelayRatio_ {2};
// Called with a reference to a std::thread; presumably removes a finished
// worker from threads_ -- definition not visible.
81 void threadEnded(std::thread&);
// Runs tasks on a ThreadPool it references, keeping its own task queue
// (tasks_) and a running-task counter (current_) alongside a concurrency
// limit (maxConcurrent_).
// Inherits std::enable_shared_from_this, so instances are expected to be
// owned by a std::shared_ptr when shared_from_this() is used.
// NOTE(review): the method definitions and any mutex member are not visible
// in this view.
84class OPENDHT_PUBLIC Executor :
public std::enable_shared_from_this<Executor> {
// Stores a reference to `pool` and the concurrency limit.
// maxConcurrent defaults to 1.
86 Executor(
ThreadPool& pool,
unsigned maxConcurrent = 1)
87 : threadPool_(pool), maxConcurrent_(maxConcurrent)
// Accepts a task by rvalue reference. Presumably runs it immediately when
// fewer than maxConcurrent_ tasks are active and queues it in tasks_
// otherwise -- definition not visible, confirm.
90 void run(std::function<
void()>&& task);
// Non-owning reference to the pool; the pool must outlive this object.
93 std::reference_wrapper<ThreadPool> threadPool_;
// Concurrency limit; const, set by the constructor (in-class default 1).
94 const unsigned maxConcurrent_ {1};
// Counter initialised to 0; presumably the number of tasks currently
// dispatched to the pool -- TODO confirm.
96 unsigned current_ {0};
// Queue of callbacks held by this executor.
97 std::queue<std::function<void()>> tasks_ {};
// Internal helper taking a task by rvalue reference; presumably performs
// the actual dispatch to threadPool_ -- definition not visible.
99 void run_(std::function<
void()>&& task);
// Submits tasks to a ThreadPool while tracking them through a shared,
// reference-counted state object (state_), so that the state outlives the
// context for as long as a submitted lambda still holds a copy of it.
// NOTE(review): this view is incomplete -- the constructor signature, the
// SharedState declaration line, the task lambda body and several branches
// are not visible. Comments describe only what the visible lines show.
103class OPENDHT_PUBLIC ExecutionContext {
// Constructor initialiser list: keeps a reference to the pool and allocates
// a fresh SharedState.
106 : threadPool_(pool), state_(std::make_shared<SharedState>())
// Destructor: among the visible statements, calls destroy() on the shared
// state with wait == false.
109 ~ExecutionContext() {
115 state_->destroy(
false);
// Submits `task` to the thread pool.
// Under state_->mtx: returns without doing anything if shutdown_ is set,
// otherwise increments pendingTasks and hands the pool a lambda that owns
// the task and a copy of the shared state pointer.
118 void run(std::function<
void()>&& task) {
119 std::lock_guard<std::mutex> lock(state_->mtx);
120 if (state_->shutdown_)
return;
121 state_->pendingTasks++;
// Capturing `state` by value keeps the SharedState alive until the lambda
// is destroyed. The lambda body is not visible in this view.
122 threadPool_.get().run([task = std::move(task), state = state_] {
// The members below are accessed through state_ above (mtx, shutdown_,
// pendingTasks), so they presumably belong to SharedState, whose
// declaration line is not visible here.
// Condition variable waited on in destroy().
130 std::condition_variable cv {};
// Incremented by ExecutionContext::run() when a task is submitted.
131 unsigned pendingTasks {0};
// Presumably the number of tasks currently executing -- its updates are
// not visible here.
132 unsigned ongoingTasks {0};
// When set, ExecutionContext::run() rejects new tasks.
134 bool shutdown_ {
false};
// Atomic flag checked by destroy() and run() below.
136 std::atomic_bool destroyed {
false};
// Tears down the state. `wait` defaults to true.
// Takes mtx, returns early if `destroyed` is already set, then blocks on cv.
// Two waits are visible: one until no task is pending or ongoing, one until
// no task is ongoing. NOTE(review): the branch choosing between them is not
// visible; presumably it depends on `wait` -- confirm.
138 void destroy(
bool wait =
true) {
139 std::unique_lock<std::mutex> lock(mtx);
140 if (destroyed)
return;
142 cv.wait(lock, [
this] {
return pendingTasks == 0 && ongoingTasks == 0; });
146 cv.wait(lock, [
this] {
return ongoingTasks == 0; });
// Runs `task` with bookkeeping under mtx.
// mtx is locked at two separate points (presumably in separate scopes, since
// both guards are named `lock`), with a `destroyed` check that returns
// early. NOTE(review): the counter updates and the call to `task` are not
// visible here.
151 void run(
const std::function<
void()>& task) {
153 std::lock_guard<std::mutex> lock(mtx);
157 if (destroyed)
return;
160 std::lock_guard<std::mutex> lock(mtx);
// Non-owning reference to the pool that executes submitted tasks.
166 std::reference_wrapper<ThreadPool> threadPool_;
// Shared with every submitted lambda (see run()).
167 std::shared_ptr<SharedState> state_;