En regardant des réponses comme celle-ci , nous pouvons faire des choses comme:
boost::asio::io_service ioService; boost::thread_group threadpool; { boost::asio::io_service::work work(ioService); threadpool.create_thread(boost::bind(&boost::asio::io_service::run, ioService)); threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); } threadpool.join_all();
Cependant, dans mon cas, je veux faire quelque chose comme:
while (condition) { ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); threadpool.join_all(); // DO SOMETHING WITH RESULTS }
Cependant, la ligne boost::asio::io_service::work work(ioService)
n’est pas à sa place et, autant que je boost::asio::io_service::work work(ioService)
, je ne peux pas la recréer sans avoir à recréer chaque thread du pool.
Dans mon code, le temps système nécessaire à la création de threads semble négligeable (et de meilleures performances que le code précédent basé sur le mutex), mais existe-t-il une méthode plus propre pour le faire?
while (condition) { //... stuff threadpool.join_all(); //... }
Cela n’a aucun sens, car vous ne pouvez rejoindre qu’une seule fois les discussions. Une fois réunis, ils sont partis. Vous ne voulez pas démarrer de nouveaux threads tout le temps (utilisez un pool de threads + une queue¹).
Puisque vous ne voulez pas réellement arrêter les threads, vous ne voulez probablement pas détruire le travail. Si vous insistez, un shared_ptr
ou un optional
fonctionne bien (juste my_work.reset()
it)
¹ Suggestion de mise à jour :
thread_pool
simple avec queue de tâches: (dans le thread renforcé, exception générant “thread_resource_error: ressource temporairement indisponible” ) io_service
(utilisant work
) des files d’attente de travail c ++ avec blocage METTRE À JOUR
Une simple extension à “SOLUTION N ° 2” permettrait d’attendre que toutes les tâches soient terminées, sans rejoindre les ouvriers ni détruire le pool:
void drain() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::empty(phx::ref(_queue))); }
Notez que pour un fonctionnement fiable, il est également nécessaire de signaler la variable de condition sur la queue:
cv.notify_all(); // in order to signal drain
C’est une interface invitant les conditions de concurrence (la queue peut accepter les tâches de nombreux threads, donc une fois que drain()
retourné, un autre thread aurait déjà posté une nouvelle tâche).
Cela indique que la queue est vide et non pas lorsque la tâche est terminée. La queue ne peut pas en savoir plus. Si vous en avez besoin, utilisez une barrière / the_work
une condition dans la tâche ( the_work
dans cet exemple). Le mécanisme de mise en queue / planification n’est pas pertinent ici.
Live On Coliru
#include #include #include using namespace boost; using namespace boost::phoenix::arg_names; class thread_pool { private: mutex mx; condition_variable cv; typedef function job_t; std::deque _queue; thread_group pool; boost::atomic_bool shutdown; static void worker_thread(thread_pool& q) { while (auto job = q.dequeue()) (*job)(); } public: thread_pool() : shutdown(false) { for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i) pool.create_thread(bind(worker_thread, ref(*this))); } void enqueue(job_t job) { lock_guard lk(mx); _queue.push_back(std::move(job)); cv.notify_one(); } void drain() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::empty(phx::ref(_queue))); } optional dequeue() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue))); if (_queue.empty()) return none; auto job = std::move(_queue.front()); _queue.pop_front(); cv.notify_all(); // in order to signal drain return std::move(job); } ~thread_pool() { shutdown = true; { lock_guard lk(mx); cv.notify_all(); } pool.join_all(); } }; void the_work(int id) { std::cout << "worker " << id << " entered\n"; // no more synchronization; the pool size determines max concurrency std::cout << "worker " << id << " start work\n"; this_thread::sleep_for(chrono::milliseconds(2)); std::cout << "worker " << id << " done\n"; } int main() { thread_pool pool; // uses 1 thread per core for (auto i = 0ull; i < 20; ++i) { for (int i = 0; i < 10; ++i) pool.enqueue(bind(the_work, i)); pool.drain(); // make the queue empty, leave the threads std::cout << "Queue empty\n"; } // destructing pool joins the worker threads }