Existe-t-il une queue sans locking à plusieurs producteurs et à producteur unique pour c ++?

Plus je lisais, plus je devenais confus … J’aurais trouvé banal de trouver une queue mpsc formellement correcte implémentée en c ++.

Chaque fois que je découvre un autre problème, des recherches ultérieures semblent suggérer l’existence de problèmes tels que l’ABA ou d’autres conditions de course subtiles.

Beaucoup parlent de la nécessité d’un ramassage des ordures. C’est quelque chose que je veux éviter.

Existe-t-il une implémentation open source correcte acceptée?

Vous voudrez peut-être vérifier le disrupteur; il est disponible en C ++ ici: http://lmax-exchange.github.io/disruptor/

Vous pouvez également trouver des explications sur son fonctionnement ici sur stackoverflow. En gros, il s’agit d’un tampon circulaire sans locking, optimisé pour la transmission de messages FIFO entre des threads dans des emplacements de taille fixe.

Voici deux implémentations que j’ai trouvées utiles: File d’attente annulaire multi-producteurs multi-producteurs multi-producteurs sans locking @ NatSys Lab. Blog et
Encore une autre implémentation d’une queue de tableaux circulaires sans locking @ CodeProject

NOTE: le code ci-dessous est incorrect, je le laisse comme exemple, à quel point ces choses peuvent être délicates.

Si vous n’aimez pas la complexité de la version de Google, voici quelque chose de similaire de moi – c’est beaucoup plus simple, mais je le laisse comme un exercice au lecteur pour le faire fonctionner (cela fait partie d’un projet plus grand, pas portable pour le moment) . L’idée est de conserver une mémoire tampon cirtulaire pour les données et un petit ensemble de compteurs pour identifier les emplacements pour l’écriture / écriture et la lecture / lecture. Étant donné que chaque compteur se trouve dans sa propre ligne de cache et que (normalement) chacun n’est mis à jour de manière atomique qu’une fois dans la vie d’un message, ils peuvent tous être lus sans synchronisation. Il existe un sharepoint conflit potentiel entre les threads d’écriture dans post_done : il est requirejs pour la garantie FIFO. Les compteurs (head_, wrtn_, rdng_, tail_) ont été sélectionnés pour assurer l’exactitude et la FIFO. Par conséquent, abandonner la FIFO exigerait également un changement de compteurs (et cela pourrait être difficile à faire sans sacrifier l’exactitude). Il est possible d’améliorer légèrement les performances pour les scénarios avec un seul consommateur, mais cela ne me dérange pas: vous devez l’annuler si d’autres cas d’utilisation à plusieurs lecteurs sont détectés.

Sur ma machine, la latence ressemble à ce qui suit (centile à gauche, moyenne à l’intérieur de ce centile à droite, l’unité est la microseconde, mesurée par rdtsc):

  total=1000000 samples, avg=0.24us 50%=0.214us, avg=0.093us 90%=0.23us, avg=0.151us 99%=0.322us, avg=0.159us 99.9%=15.566us, avg=0.173us 

Ces résultats s’appliquent à un seul consommateur, c’est-à-dire que le thread de travail appelle wheel.read () en boucle étroite et vérifie s’il n’est pas vide (défilement vers le bas, par exemple). Les consommateurs en attente (utilisation du processeur beaucoup plus faible) attendraient un événement (une des fonctions acquire... ), ce qui ajoute environ 1 à 2 us à la latence moyenne due au changement de contexte.

Comme il y a très peu de conflits en lecture, les consommateurs s’adaptent très bien au nombre de threads de travail, par exemple pour 3 threads sur ma machine:

  total=1500000 samples, avg=0.07us 50%=0us, avg=0us 90%=0.155us, avg=0.016us 99%=0.361us, avg=0.038us 99.9%=8.723us, avg=0.044us 

Les patchs seront les bienvenus 🙂

 // Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki // // Dissortingbuted under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #pragma once #include  #include  #include  #include  #include  #include  namespace core { namespace wheel { struct bad_size : core::exception { template explicit bad_size(const T&, size_t m) : core::exception(std::ssortingng("Slot capacity exceeded, sizeof(") + typeid(T).name() + ") = " + boost::lexical_cast(sizeof(T)) + ", capacity = " + boost::lexical_cast(m) ) {} }; // inspired by Disruptor template  class wheel : boost::noncopyable { __declspec(align(64)) struct slot_detail { // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel) // slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel) // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate template  void done(wheel* w) { if (Writing) w->post_done(sequence); else w->read_done(); } // cache line for sequence number and header long long sequence; Header header; // there is no such thing as data type with variable size, but we need it to avoid thrashing // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element. // This is well into UB territory! Using template parameter for this is not good, since it // results in this small implementation detail leaking to all possible user interfaces. __declspec(align(8)) char data[8]; }; // use this as a storage space for slot_detail, to guarantee 64 byte alignment _declspec(align(64)) struct slot_block { long long padding[8]; }; public: // wrap slot data to outside world template  class slot { template friend class wheel; slot& operator=(const slot&); // moveable but non-assignable // may only be constructed by wheel slot(slot_detail* impl, wheel
* w, size_t c) : slot_(impl) , wheel_(w) , capacity_(c) {} public: slot(slot&& s) : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_) { s.slot_ = NULL; } ~slot() { if (slot_) { slot_->done(wheel_); } } // slot accessors - use Header to store information on what type is actually stored in data bool empty() const { return !slot_; } long long sequence() const { return slot_->sequence; } Header& header() { return slot_->header; } char* data() { return slot_->data; } template T& cast() { static_assert(boost::is_pod::value, "Data type must be POD"); if (sizeof(T) > capacity_) throw bad_size(T(), capacity_); if (empty()) throw no_data(); return *((T*) data()); } private: slot_detail* slot_; wheel
* wheel_; const size_t capacity_; }; private: // dynamic size of slot, with extra capacity, expressed in 64 byte blocks static size_t sizeof_slot(size_t s) { size_t m = sizeof(slot_detail); // add capacity less 8 bytes already within sizeof(slot_detail) m += max(8, s) - 8; // round up to 64 bytes, ie alignment of slot_detail size_t r = m & ~(unsigned int)63; if (r < m) r += 64; r /= 64; return r; } // calculate actual slot capacity back from number of 64 byte blocks static size_t slot_capacity(size_t s) { return s*64 - sizeof(slot_detail) + 8; } // round up to power of 2 static size_t round_size(size_t s) { // enfore minimum size if (s <= min_size) return min_size; // find rounded value --s; size_t r = 1; while (s) { s >>= 1; r <<= 1; }; return r; } slot_detail& at(long long sequence) { // find index from sequence number and return slot at found index of the wheel return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]); } public: wheel(size_t capacity, size_t size) : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_() , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size)) { static_assert(boost::is_pod
::value, "Header type must be POD"); static_assert(sizeof(slot_block) == 64, "This was unexpected"); wheel_ = new slot_block[size_ * blocks_]; // all slots must be initialised to 0 memset(wheel_, 0, size_ * 64 * blocks_); active_ = 1; } ~wheel() { stop(); delete[] wheel_; } // all accessors needed size_t capacity() const { return capacity_; } // capacity of a single slot size_t size() const { return size_; } // number of slots available size_t queue() const { return (size_t)head_ - (size_t)tail_; } bool active() const { return active_ == 1; } // enough to call it just once, to fine tune slot capacity template void check() const { static_assert(boost::is_pod::value, "Data type must be POD"); if (sizeof(T) > capacity_) throw bad_size(T(), capacity_); } // stop the wheel - safe to execute many times size_t stop() { InterlockedExchange(&active_, 0); // must wait for current read to complete while (rdng_ != tail_) Sleep(10); return size_t(head_ - tail_); } // return first available slot for write slot post() { if (!active_) throw stopped(); // the only memory barrier on head seq. number we need, if not overflowing long long h = InterlockedIncrement64(&head_); while(h - (long long) size_ > tail_) { if (InterlockedDecrement64(&head_) == h - 1) throw overflowing(); // protection against case of race condition when we are overflowing // and two or more threads try to post and two or more messages are read, // all at the same time. If this happens we must re-try, otherwise we // could have skipped a sequence number - causing infinite wait in post_done Sleep(0); h = InterlockedIncrement64(&head_); } slot_detail& r = at(h); r.sequence = h; // wrap in writeable slot return slot(&r, this, capacity_); } // return first available slot for write, nothrow variant slot post(std::nothrow_t) { if (!active_) return slot(NULL, this, capacity_); // the only memory barrier on head seq. number we need, if not overflowing long long h = InterlockedIncrement64(&head_); while(h - (long long) size_ > tail_) { if (InterlockedDecrement64(&head_) == h - 1) return slot(NULL, this, capacity_); // must retry if race condition described above Sleep(0); h = InterlockedIncrement64(&head_); } slot_detail& r = at(h); r.sequence = h; // wrap in writeable slot return slot(&r, this, capacity_); } // read first available slot for read slot read() { slot_detail* r = NULL; // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier if (active_ && rdng_ < wrtn_) { // the only memory barrier on reading seq. number we need const long long h = InterlockedIncrement64(&rdng_); // check if this slot has been written, step back if not if (h > wrtn_) InterlockedDecrement64(&rdng_); else r = &at(h); } // wrap in readable slot return slot(r , this, capacity_); } // waiting for new post, to be used by non-polling clients void acquire() { event_.acquire(); } bool try_acquire() { return event_.try_acquire(); } bool try_acquire(unsigned long timeout) { return event_.try_acquire(timeout); } void release() {} private: void post_done(long long sequence) { const long long t = sequence - 1; // the only memory barrier on written seq. number we need while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t) Sleep(0); // this is outside of critical path for polling clients event_.set(); } void read_done() { // the only memory barrier on tail seq. number we need InterlockedIncrement64(&tail_); } // each in its own cache line // head_ - wrtn_ = no. of messages being written at this moment // rdng_ - tail_ = no. of messages being read at the moment // head_ - tail_ = no. of messages to read (including those being written and read) // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read) __declspec(align(64)) volatile long long head_; // currently writing or written __declspec(align(64)) volatile long long wrtn_; // written __declspec(align(64)) volatile long long rdng_; // currently reading or read __declspec(align(64)) volatile long long tail_; // read __declspec(align(64)) volatile long active_; // flag switched to 0 when stopped __declspec(align(64)) api::event event_; // set when new message is posted const size_t blocks_; // number of 64-byte blocks in a single slot_detail const size_t capacity_; // capacity of data() section per single slot. Initialisation depends on blocks_ const size_t size_; // number of slots available, always power of 2 slot_block* wheel_; }; }}

Voici à quoi peut ressembler le fil conducteur ouvrier consommateur:

  while (wheel.active()) { core::wheel::wheel::slot slot = wheel.read(); if (!slot.empty()) { Data& d = slot.cast(); // do work } // uncomment below for waiting consumer, saving CPU cycles // else // wheel.try_acquire(10); } 

Exemple de consommateur ajouté édité

L’implémentation la mieux adaptée dépend des propriétés souhaitées d’une queue. Devrait-il être illimité ou un borné va bien? Devrait-il être linéarisable ou des exigences moins ssortingctes seraient acceptables? Quelle est la force de FIFO dont vous avez besoin? Êtes-vous prêt à payer le coût de révision de la liste par le consommateur (il existe une implémentation très simple dans laquelle le consommateur prend la queue d’une liste à lien unique, obtenant ainsi immédiatement tous les articles proposés par les producteurs)? Devrait-il garantir qu’aucun fil n’est jamais bloqué, ou que de minuscules chances d’obtenir un fil bloqué sont correctes? Et etc.

Quelques liens utiles:
Plusieurs producteurs, un seul consommateur sont-ils possibles dans un environnement sans locking?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3

J’espère que cela pourra aider.

J’imagine que cela n’existe pas – et si c’est le cas, ce n’est pas portable ou open source.

Conceptuellement, vous essayez de contrôler deux pointeurs simultanément: le pointeur de tail et le pointeur de tail->next . Cela ne peut généralement pas être fait uniquement avec des primitives sans locking.

Vous trouverez ci-dessous la technique que j’ai utilisée pour ma bibliothèque coopérative multi-tâches / multi-threading (MACE) http://bytemaster.github.com/mace/ . Il présente l’avantage d’être sans locking, sauf lorsque la queue est vide.

 struct task { boost::function func; task* next; }; boost::mutex task_ready_mutex; boost::condition_variable task_ready; boost::atomic task_in_queue; // this can be called from any thread void thread::post_task( task* t ) { // atomically post the task to the queue. task* stale_head = task_in_queue.load(boost::memory_order_relaxed); do { t->next = stale_head; } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) ); // Because only one thread can post the 'first task', only that thread will attempt // to aquire the lock and therefore there should be no contention on this lock except // when *this thread is about to block on a wait condition. if( !stale_head ) { boost::unique_lock lock(task_ready_mutex); task_ready.notify_one(); } } // this is the consumer thread. void process_tasks() { while( !done ) { // this will atomically pop everything that has been posted so far. pending = task_in_queue.exchange(0,boost::memory_order_consume); // pending is a linked list in 'reverse post order', so process them // from tail to head if you want to maintain order. if( !pending ) { // lock scope boost::unique_lock lock(task_ready_mutex); // check one last time while holding the lock before blocking. if( !task_in_queue ) task_ready.wait( lock ); } }