Windows events & pthread: WaitForMultipleObjects

Don't try to fill your user's mind with your design.

— Giles Colborne

Dans la première partie de cette série de deux articles, une implantation des évènements, basée sur pthread, a été développée. Dans le cadre de cette première implantation, les évènements ont été modélisés comme de simples objets unissant un booléen, une variable de condition et un verrou. Avec cette vue restreinte, la sémantique supportée se résumait à celle de la fonction WaitForSingleObject.

Implanter uniquement l'attente simple s'avère toutefois insuffisant dans la pratique. L'opération manquante, WaitForMultipleObjects, étant massivement utilisée par la plupart des applications développées pour Windows.

En outre, pour supporter des attentes multiples, un changement de perspective doit être opéré. L'intuition sous-jacente étant qu'une variable de condition par évènement ne suffit pas à la tâche.

Ainsi, plutôt que de contenir une variable de condition sur laquelle les fil d'exécution bloquent directement, chaque évènement contiendra une liste d'attente dans laquelle les fils d'exécution s'enregistreront.

Effectuer une attente multiple consistera simplement à enregistrer un fil d'exécution dans la liste d'attente de plusieurs évènements.

Observateur

L'observateur joue le rôle d'intermédiaire entre un évènement et un fil d'exécution effectuant une attente.

L'interface de la classe Observer reflète cette condition particulière. La méthode wake permet à un évènement de réveiller un fil d'exécution en attente alors que les autres méthodes permettent à un fil d'exécution d'enregistrer et exécuter une attente sur un évènement de manière atomique.

Une particularité de la méthode wake est de retourner un booléen indiquant si l'observateur accepte ou non le signal. Cette information est nécessaire, car un observateur peut être enregistré dans la liste d'attente de plusieurs évènements simultanément. Il est donc possible qu'un observateur ayant déjà été signalé apparaisse dans la liste d'attente d'un évènement. Sans cette valeur de retour, un évènement automatique n'aurait aucun moyen de déterminer si un observateur a consommé ou non son signal.

L'argument de la méthode wake permet quant à lui, dans le cas d'une attente multiple, de retracer l'évènement à l'origine du signal.

class Observer {
public:
    Observer() : signaled(false), signaler(NULL) {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&cond, NULL);
    }

    ~Observer() {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }

    bool wake(Event* signaling_event) {
        pthread_mutex_lock(&mutex);

        if (signaled) {
            pthread_mutex_unlock(&mutex);
            return false;
        }

        signaled = true;
        signaler = signaling_event;

        pthread_mutex_unlock(&mutex);

        pthread_cond_signal(&cond);

        return true;
    }

    void lock() {
        pthread_mutex_lock(&mutex);
    }

    void unlock() {
        pthread_mutex_unlock(&mutex);
    }

    void cond_wait() {
        pthread_cond_wait(&cond, &mutex);
    }

    bool is_signaled() const {
        return signaled;
    }

    Event* signaling_event() const {
        return signaler;
    }

private:
    bool signaled;
    Event* signaler;
    pthread_cond_t cond;
    pthread_mutex_t mutex;
};

Évènement

La classe Event peut être découpée en deux parties: la logique de signalement, c'est-à-dire les méthodes signal, wake_one et wake_all; puis la logique d'attente, c'est-à-dire les méthodes wait, wait_any, enqueue_many et enqueue_one.

Lorsqu'un évènement est signalé, selon qu'il s'agisse d'un évènement automatique ou manuel, la méthode signal transmet l'appel soit à la méthode wake_all ou wake_one. La méthode wake_one retourne un booléen qui indique à l'appelant si un observateur a été effectivement réveillé. Cette valeur de retour permet à un évènement automatique de déterminer si le signal a été consommé, c'est-à-dire si un observateur a été réveillé avec succès par cet appel.

La première manière d'effectuer une attente sur un évènement consiste à appeler directement la méthode wait sur ce dernier: il s'agit alors d'une attente simple. Dans ce cas, un observateur est instancié sur la pile et ajouté à la liste d'attente de l'évènement. Une fois l'observateur signalé par l'évènement, celui-ci est tout simplement retiré de la liste d'attente avant que la méthode ne retourne le contrôle à l'appelant.

La seconde manière d'effectuer une attente sur un évènement consiste à appeler la méthode statique wait_any avec comme argument une liste d'évènements: il s'agit alors d'une attente multiple. Dans ce cas, la logique est plus complexe, mais demeure tout de même relativement compréhensible. Ainsi, tout comme lors d'une attente unique, un observateur est instancié sur la pile. Maintenant, il s'agit d'ajouter ce dernier à la liste d'attente de tous les évènements passés en argument de manière atomique. Pour ce faire, le verrou de chaque évènement doit être acquis. Comme plusieurs attentes multiples peuvent être entreprises sur des listes d'évènements qui s'intersectent, l'acquisition des verrous doit être effectuée dans un ordre globalement consistant. Pour y arriver, il est nécessaire de trier la liste d'évènements en utilisant comme critère de comparaison l'adresse de ces derniers. Une fois les évènements triés, la méthode récursive enqueue_many se charge d'insérer atomiquement l'observateur dans la liste d'attente de tous les évènements. Si un évènement est signalé avant que la méthode enqueue_many n'ait pu acquérir le verrou de tous les évènements, cette dernière relâche tous les verrous acquis en cours de route et retourne l'index de l'évènement signalé. Si, au contraire, la méthode enqueue_many parvient à acquérir le verrou de tous les évènements et à ajouter l'observateur dans la liste d'attente de chacun d'eux, la valeur retournée sera 0. Après avoir acquis le verrou de l'observateur, le verrou des évènements est relâché en ordre inverse d'acquisition. Une fois l'observateur signalé, avant de retourner le contrôle à l'appelant, ce dernier est retiré de la liste d'attente des évènements sur lesquels l'attente multiple était effectuée. Lors de cette opération, l'index de l'évènement ayant signalé l'observateur est récupéré pour ensuite être retourné à l'appelant.

class Event {
protected:
    Event(bool manual, bool signaled) :
        manual(manual), signaled(signaled) {
        pthread_mutex_init(&lock, NULL);
    }

    ~Event() {
        pthread_mutex_destroy(&lock);
    }

private:
    void wake_all() {
        typedef std::list<Observer*>::iterator iterator;
        for (iterator it = observers.begin(); it != observers.end(); ++it)
            (*it)->wake(this);

        observers.clear();
    }

    bool wake_one() {
        while (!observers.empty()) {
            Observer* observer = observers.front();
            observers.pop_front();

            if (observer->wake(this))
                return true;    // Someone got the signal.
        }

        return false;   // Nobody got the signal.
    }

    void remove_one(Observer* observer) {
        observers.remove(observer);
    }

    void enqueue_one(Observer* observer) {
        observers.push_front(observer);
    }

    static size_t enqueue_many(std::pair<Event*, size_t> events[],
                               size_t count, Observer* waiter) {
        if (count == 0)
            return 0;

        pthread_mutex_lock(&events[0].first->lock);

        if (events[0].first->signaled) {
            if (!events[0].first->manual) {
                // Auto-reset events must consume the signal.
                events[0].first->signaled = false;
            }

            pthread_mutex_unlock(&events[0].first->lock);
            return count;
        }

        int stop_count = enqueue_many(events + 1, count - 1, waiter);

        if (stop_count == 0) {
            events[0].first->enqueue_one(waiter);
        } else {
            pthread_mutex_unlock(&events[0].first->lock);
        }

        return stop_count;
    }

    // Sorting function used in wait_any.
    static bool by_address(const std::pair<Event*, size_t>& a,
                           const std::pair<Event*, size_t>& b) {
        return a.first < b.first;   //  Strict Weak Ordering.
    }

public:
    void signal() {
        pthread_mutex_lock(&lock);

        if (signaled) {
            pthread_mutex_unlock(&lock);
            return;
        }

        if (manual) {
            wake_all();
            signaled = true;
        } else if (!wake_one()) {
            signaled = true;
        }

        pthread_mutex_unlock(&lock);
    }

    void wait() {
        pthread_mutex_lock(&lock);

        if (signaled) {
            if (!manual) {
                signaled = false;   // Consume the event.
            }

            pthread_mutex_unlock(&lock);
            return;
        }

        Observer observer;

        observer.lock();

        enqueue_one(&observer);

        pthread_mutex_unlock(&lock);

        while (!observer.is_signaled())
            observer.cond_wait();

        pthread_mutex_lock(&lock);
        remove_one(&observer);
        pthread_mutex_unlock(&lock);
    }

    static size_t wait_any(Event* events[], size_t size) {

        std::vector<std::pair<Event*, size_t> > waitables;
        waitables.reserve(size);

        for (size_t i = 0; i < size; i++)
            waitables.push_back(std::make_pair(events[i], i));

        // In order to acquire the locks in a globally consistent order.
        std::sort(waitables.begin(), waitables.end(), by_address);

        Observer observer;

        size_t index = enqueue_many(&waitables[0], size, &observer);

        if (index != 0) {
            return waitables[size - index].second;
        }

        observer.lock();

        // Release the locks in reverse order.
        for (size_t i = 0; i < waitables.size(); i++)
            pthread_mutex_unlock(&waitables[size - (1 + i)].first->lock);

        while (!observer.is_signaled())
            observer.cond_wait();

        observer.unlock();

        for (size_t i = 0; i < size; i++) {
            if (events[i] != observer.signaling_event()) {
                pthread_mutex_lock(&events[i]->lock);
                events[i]->remove_one(&observer);
                pthread_mutex_unlock(&events[i]->lock);
            } else {
                index = i;
            }
        }

        return index;
    }

    bool reset() {
        pthread_mutex_lock(&lock);
        bool old = signaled;
        signaled = false;
        pthread_mutex_unlock(&lock);
        return old;
    }

private:
    bool manual;
    bool signaled;
    pthread_mutex_t lock;
    std::list<Observer*> observers;
};

De manière à rendre plus explicite l'instantiation des évènements, le constructeur de la classe Event n'est pas publique. Un client doit donc utiliser une des classes suivantes afin d'instantier un évènement, selon qu'il soit automatique ou manuel.

class AutoEvent : public Event {
public:
    AutoEvent(bool signaled = false) :
        Event(false, signaled) {}
};

class ManualEvent : public Event {
public:
    ManualEvent(bool signaled = false) :
        Event(true, signaled) {}
};

Exemple d'exécution

Afin de comprendre le fonctionnement de cette machinerie, le plus simple demeure d'en simuler une exécution.

Considérons donc un scénario comprenant trois évènements automatiques, ev1, ev2 et ev3, désarmés, préalablement triés et dont les listes d'attente respectives sont initialement vides.

Imaginons qu'un premier fil d'exécution effectue une attente multiple, c'est-à-dire un appel à Event::wait_any, sur les évènements ev1, ev2 et ev3.

L'observateur de ce fil d'exécution, w1, devant être ajouté de manière atomique à la liste d'attente de chaque évènement, l'acquisition des verrous correspondants est entreprise.

Alors que le premier fil d'exécution a acquis avec succès le verrou du premier évènement, un second fil d'exécution entame une attente multiple sur les évènements 2 et 3. L'acquisition, par ce second fil d'exécution, du verrou de l'évènement 2 bloque alors la progression du premier fil d'exécution.

Après avoir acquis le verrou du troisième évènement avec succès, le second fil d'exécution est en mesure d'enfiler son observateur, w2, dans la liste d'attente des évènements 2 et 3.

Les verrous des évènements 2 et 3 sont par la suite relâchés en ordre inverse d'acquisition, laissant la voie libre au premier fil d'exécution.

Ayant maintenant la voie libre, le premier fil d'exécution résume sa progression et acquiert le verrou du second évènement.

Toutefois, avant que le premier fil d'exécution n'ait pu acquérir le verrou du troisième évènement, ce dernier est signalé. L'observateur du second fil d'exécution, w2, consomme alors le signal.

Le premier fil d'exécution poursuit alors sa progression en acquérant le verrou de l'évènement 3.

Il importe de remarquer que, à ce moment, le second fil d'exécution cherche à acquérir, sans succès, le verrou du second évènement afin de retirer son observateur de la liste d'attente correspondante.

Alors même que le premier fil d'exécution parvient à enfiler son observateur dans les listes d'attente respectives des évènements 1, 2 et 3, l'évènement 2 est signalé.

L'observateur du second fil d'exécution étant toujours présent dans la liste d'attente, celui-ci est signalé, mais rejette le signal.

C'est finalement l'observateur du premier fil d'exécution qui, lorsque signalé, consomme le signal.

Conclusion

La machinerie présentée dans cet article permet non seulement de reproduire la sémantique de la fonction WaitForMultipleObjects pour des évènements, mais également pour tous les objets Windows pouvant être impliqués dans une opération d'attente multiple.

Plusieurs améliorations pourraient être apportées à l'implantation des classes Event et Observer. Entre autres, l'utilisation de lock guards simplifierait sensiblement certaines parties du code en plus de rendre celui-ci étanche aux exceptions. Par ailleurs, ajouter un délai d'expiration aux méthodes Event::wait et Event::wait_any comblerait un manque évident dans l'interface de la classe Event.

Le code source peut être consulté en ligne: event.cpp.

Lexique