Problème de cycle de vie d’asio :: io_service et thread_group

En regardant des réponses comme celle-ci , nous pouvons faire des choses comme:

boost::asio::io_service ioService; boost::thread_group threadpool; { boost::asio::io_service::work work(ioService); threadpool.create_thread(boost::bind(&boost::asio::io_service::run, ioService)); threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); } threadpool.join_all(); 

Cependant, dans mon cas, je veux faire quelque chose comme:

 while (condition) { ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); ioService.post(boost::bind(...)); threadpool.join_all(); // DO SOMETHING WITH RESULTS } 

Cependant, la ligne boost::asio::io_service::work work(ioService) n’est pas à sa place et, autant que je boost::asio::io_service::work work(ioService) , je ne peux pas la recréer sans avoir à recréer chaque thread du pool.

Dans mon code, le temps système nécessaire à la création de threads semble négligeable (et de meilleures performances que le code précédent basé sur le mutex), mais existe-t-il une méthode plus propre pour le faire?

 while (condition) { //... stuff threadpool.join_all(); //... } 

Cela n’a aucun sens, car vous ne pouvez rejoindre qu’une seule fois les discussions. Une fois réunis, ils sont partis. Vous ne voulez pas démarrer de nouveaux threads tout le temps (utilisez un pool de threads + une queue¹).

Puisque vous ne voulez pas réellement arrêter les threads, vous ne voulez probablement pas détruire le travail. Si vous insistez, un shared_ptr ou un optional fonctionne bien (juste my_work.reset() it)

¹ Suggestion de mise à jour :

  • thread_pool simple avec queue de tâches: (dans le thread renforcé, exception générant “thread_resource_error: ressource temporairement indisponible” )
  • Une file d’attente basée sur io_service (utilisant work ) des files d’attente de travail c ++ avec blocage

METTRE À JOUR

Une simple extension à “SOLUTION N ° 2” permettrait d’attendre que toutes les tâches soient terminées, sans rejoindre les ouvriers ni détruire le pool:

  void drain() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::empty(phx::ref(_queue))); } 

Notez que pour un fonctionnement fiable, il est également nécessaire de signaler la variable de condition sur la queue:

  cv.notify_all(); // in order to signal drain 

CAVEATS

  1. C’est une interface invitant les conditions de concurrence (la queue peut accepter les tâches de nombreux threads, donc une fois que drain() retourné, un autre thread aurait déjà posté une nouvelle tâche).

  2. Cela indique que la queue est vide et non pas lorsque la tâche est terminée. La queue ne peut pas en savoir plus. Si vous en avez besoin, utilisez une barrière / the_work une condition dans la tâche ( the_work dans cet exemple). Le mécanisme de mise en queue / planification n’est pas pertinent ici.

DEMO

Live On Coliru

 #include  #include  #include  using namespace boost; using namespace boost::phoenix::arg_names; class thread_pool { private: mutex mx; condition_variable cv; typedef function job_t; std::deque _queue; thread_group pool; boost::atomic_bool shutdown; static void worker_thread(thread_pool& q) { while (auto job = q.dequeue()) (*job)(); } public: thread_pool() : shutdown(false) { for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i) pool.create_thread(bind(worker_thread, ref(*this))); } void enqueue(job_t job) { lock_guard lk(mx); _queue.push_back(std::move(job)); cv.notify_one(); } void drain() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::empty(phx::ref(_queue))); } optional dequeue() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue))); if (_queue.empty()) return none; auto job = std::move(_queue.front()); _queue.pop_front(); cv.notify_all(); // in order to signal drain return std::move(job); } ~thread_pool() { shutdown = true; { lock_guard lk(mx); cv.notify_all(); } pool.join_all(); } }; void the_work(int id) { std::cout << "worker " << id << " entered\n"; // no more synchronization; the pool size determines max concurrency std::cout << "worker " << id << " start work\n"; this_thread::sleep_for(chrono::milliseconds(2)); std::cout << "worker " << id << " done\n"; } int main() { thread_pool pool; // uses 1 thread per core for (auto i = 0ull; i < 20; ++i) { for (int i = 0; i < 10; ++i) pool.enqueue(bind(the_work, i)); pool.drain(); // make the queue empty, leave the threads std::cout << "Queue empty\n"; } // destructing pool joins the worker threads }