Thread pool for ensemble learning

Huang Kevin
3 min read · Jul 17, 2023

How can C++ concurrency be used in a machine learning task?

What is a thread pool?

A thread pool is a software design pattern that manages and reuses a pool of worker threads for executing concurrent tasks. Instead of creating and destroying threads for each task, a thread pool maintains a pool of pre-initialized threads, allowing for more efficient thread management and reducing the overhead of thread creation. [1]

How does a thread pool work?

The ThreadPool class is built around two member functions: the constructor and enqueueTask.

  • Constructor of thread pool:

It creates multiple threads, each of which monitors the taskQueue for any pending tasks and executes them.

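explicit ThreadPool(size_t numThreads)
    : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        // Each worker loops forever: wait for a task, run it, repeat.
        threads.emplace_back([this]() {
            while (true) {
                std::packaged_task<int()> task;
                {
                    std::unique_lock<std::mutex> lock(queueMutex);
                    // Sleep until there is work or the pool is shutting down.
                    taskAvailable.wait(lock, [this]() { return stop || !taskQueue.empty(); });
                    // Exit only once the queue has been drained.
                    if (stop && taskQueue.empty())
                        return;
                    task = std::move(taskQueue.front());
                    taskQueue.pop();
                }
                // Run the task outside the lock so other workers can dequeue.
                task();
            }
        });
    }
}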
  • enqueueTask:

It receives a task and places it in taskQueue. Three standard library facilities cooperate here: std::packaged_task, std::future and std::condition_variable. First, the function and its arguments are bound together and wrapped in a std::packaged_task, whose std::future is kept so the caller can retrieve the result later. The task is then pushed onto taskQueue while the mutex is held. Finally, std::condition_variable::notify_one wakes one waiting worker thread to execute it.

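template<typename F, typename... Args>
auto enqueueTask(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Bind the callable to its arguments so the task can be invoked with no parameters.
    // Note: the worker loop above dequeues std::packaged_task<int()>, so as written
    // only callables that return int fit the queue.
    std::packaged_task<decltype(f(args...))()> task(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
    // Take the future before the task is moved into the queue.
    std::future<decltype(f(args...))> result = task.get_future();

    {
        std::lock_guard<std::mutex> lock(queueMutex);
        taskQueue.emplace(std::move(task));
    }
    // Wake one idle worker; the lock is already released at this point.
    taskAvailable.notify_one();

    return result;
}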
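
The two snippets above leave out the data members and the shutdown path, so here is a minimal sketch of how the whole class might fit together. The member names are taken from the snippets; everything else is my assumption rather than the original implementation. In particular, the queue stores std::function<void()> instead of std::packaged_task<int()>, so tasks with any return type can share one queue, and the destructor that sets stop and joins the workers is a common pattern, not something shown in the article.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class ThreadPool {
public:
    explicit ThreadPool(std::size_t numThreads) : stop(false) {
        assert(numThreads > 0 && "a pool needs at least one worker");
        for (std::size_t i = 0; i < numThreads; ++i) {
            threads.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        taskAvailable.wait(lock, [this]() { return stop || !taskQueue.empty(); });
                        if (stop && taskQueue.empty())
                            return;
                        task = std::move(taskQueue.front());
                        taskQueue.pop();
                    }
                    task();
                }
            });
        }
    }

    template <typename F, typename... Args>
    auto enqueueTask(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using R = decltype(f(args...));
        // std::packaged_task is move-only but std::function needs a copyable
        // callable, so the task is shared through a std::shared_ptr (assumption:
        // this type erasure is not part of the article's code).
        auto task = std::make_shared<std::packaged_task<R()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            taskQueue.emplace([task]() { (*task)(); });
        }
        taskAvailable.notify_one();
        return result;
    }

    // Assumed shutdown path: let workers drain the queue, then join them.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            stop = true;
        }
        taskAvailable.notify_all();
        for (std::thread& t : threads)
            t.join();
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> taskQueue;
    std::mutex queueMutex;
    std::condition_variable taskAvailable;
    bool stop;
};
```

With this sketch, a call such as pool.enqueueTask([]() { return 6 * 7; }) yields a std::future<int>, and a callable returning double yields a std::future<double> from the same pool. Because stop is only read and written while queueMutex is held, a plain bool is enough here.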

How can a thread pool accelerate the training and inference of a model ensemble?

Two loops make use of the thread pool. In the first loop, each call to the train or predict method is wrapped in a lambda that captures the necessary variables and is then submitted to the thread pool; enqueueTask returns a std::future for each task. In the second loop, get() is called on each std::future, which blocks until that task has finished, and the returned value is stored in a results vector.

void train(const std::vector<std::vector<double>>& data, const std::vector<int>& labels) {
    std::vector<std::future<int>> future_results(base_models.size());

    // First loop: submit one training task per base model.
    for (size_t i = 0; i < base_models.size(); ++i) {
        // data and labels are captured by reference to avoid one copy per task;
        // the second loop waits on every future before train() returns normally.
        future_results[i] = thread_pool.enqueueTask([this, i, &data, &labels]() -> int {
            train_base_model(i, data, labels);
            return 0;
        });
    }

    // Second loop: block until every training task has finished.
    std::vector<int> results(base_models.size());
    for (size_t i = 0; i < base_models.size(); ++i) {
        results[i] = future_results[i].get();
    }
}

predict method:


double predict(const std::vector<double>& data) const {
    // Because predict() is const, thread_pool must be usable from a const method
    // (for example, declared mutable). Returning double also requires a task queue
    // that is not restricted to int-returning tasks.
    std::vector<std::future<double>> predictions(base_models.size());

    // First loop: submit tasks to the thread pool
    for (size_t i = 0; i < base_models.size(); ++i) {
        predictions[i] = thread_pool.enqueueTask([this, i, &data]() -> double {
            return base_models[i]->predict(data);
        });
    }

    // Second loop: collect the predictions from each base model
    std::vector<double> base_predictions(base_models.size());
    for (size_t i = 0; i < predictions.size(); ++i) {
        base_predictions[i] = predictions[i].get();
    }

    // The meta model combines the base predictions into the final output.
    return meta_model->predict(base_predictions);
}
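
The submit-then-collect pattern can also be tried in isolation. The sketch below is a self-contained toy, not the ensemble from this post: ToyModel and ensemble_predict are hypothetical names, std::async stands in for the thread pool, and a plain mean stands in for meta_model.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical base model: scales the sum of the features by a fixed weight.
struct ToyModel {
    double weight;
    double predict(const std::vector<double>& x) const {
        return weight * std::accumulate(x.begin(), x.end(), 0.0);
    }
};

// Submits one prediction task per model, then collects the results in
// submission order and averages them (the mean is an assumed meta model).
double ensemble_predict(const std::vector<ToyModel>& models,
                        const std::vector<double>& x) {
    assert(!models.empty() && "need at least one base model");

    // First loop: launch every task before waiting on any of them.
    std::vector<std::future<double>> pending;
    pending.reserve(models.size());
    for (const ToyModel& m : models) {
        // Reference captures are fine here: all futures are consumed below,
        // before models and x can go out of scope.
        pending.push_back(std::async(std::launch::async,
                                     [&m, &x]() { return m.predict(x); }));
    }

    // Second loop: get() blocks until the corresponding task is done.
    double sum = 0.0;
    for (std::future<double>& f : pending)
        sum += f.get();
    return sum / static_cast<double>(models.size());
}
```

Launching all tasks before the first get() is what makes the work overlap. Calling get() inside the first loop would serialize the tasks again.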
  • On the same set of unit tests, the asynchronous version finishes about 2.8 times faster than the sequential version in wall-clock (real) time, while the total CPU (user) time stays roughly the same.
// Runtime of async version
real 0m7.748s
user 0m23.693s
sys 0m0.174s
// Runtime of sequential version
real 0m21.997s
user 0m21.871s
sys 0m0.124s
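
In these numbers, real is elapsed wall-clock time and user is CPU time summed over all threads, which is why real drops from about 22 s to about 7.7 s while user barely changes. A rough way to see the same effect from inside a program is to time both variants with std::chrono. The sketch below is only an illustration under my own assumptions: fake_train, time_sequential and time_concurrent are hypothetical helpers, sleeping stands in for real training work, and std::async stands in for the thread pool.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <vector>

// Hypothetical stand-in for training one base model. Sleeping only imitates
// the elapsed time of a task; it does not consume CPU like real training.
inline int fake_train(std::chrono::milliseconds cost) {
    std::this_thread::sleep_for(cost);
    return 0;
}

// Wall-clock seconds needed to run n tasks one after another.
inline double time_sequential(int n, std::chrono::milliseconds cost) {
    assert(n > 0);
    auto start = std::chrono::steady_clock::now();
    for (int i = 0; i < n; ++i)
        fake_train(cost);
    auto end = std::chrono::steady_clock::now();
    return std::chrono::duration<double>(end - start).count();
}

// Wall-clock seconds needed when all n tasks are launched first and then awaited.
inline double time_concurrent(int n, std::chrono::milliseconds cost) {
    assert(n > 0);
    auto start = std::chrono::steady_clock::now();
    std::vector<std::future<int>> pending;
    for (int i = 0; i < n; ++i)
        pending.push_back(std::async(std::launch::async, fake_train, cost));
    for (std::future<int>& f : pending)
        f.get();
    auto end = std::chrono::steady_clock::now();
    return std::chrono::duration<double>(end - start).count();
}
```

With CPU-bound tasks the achievable speedup is limited by the number of available cores and by how evenly the base models split the work, so the ratio measured above should not be expected to carry over to other machines or ensembles.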

Conclusion:

Nowadays, numerous AI applications have expanded to edge devices with limited computing resources. Consequently, leveraging C++ for performance enhancement has become increasingly crucial. While there are many articles discussing concurrency in Python, there are relatively few that address concurrency issues in C++, particularly in its application to machine learning. In this post, I have provided a simple example to illustrate the benefits of C++ concurrency in machine learning tasks, specifically ensemble learning. In my next post, I plan to focus on thread synchronization between data acquisition and model inference.

[1] https://en.wikipedia.org/wiki/Thread_pool

Huang Kevin

Algorithm engineer at semiconductor company with background in physics