#include <pthread.h>
#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>
#include <list>

class Event {
private:
    class Observer {
    public:
        Observer() : signaled(false), signaler(NULL) {
            pthread_mutex_init(&mutex, NULL);
            pthread_cond_init(&cond, NULL);
        }

        ~Observer() {
            pthread_mutex_destroy(&mutex);
            pthread_cond_destroy(&cond);
        }

        // Return true if this waiter was woken by this particular call. This
        // is required since a waiter could be waiting on many events at once,
        // i.e. when wait_any is called. In this case, it's possible for an
        // already woken waiter to be present in the wait list of some event.
        bool wake(Event* signaling_event) {
            pthread_mutex_lock(&mutex);

            if (signaled) {
                pthread_mutex_unlock(&mutex);
                return false;   // This observer was already signaled by
                                // someone else, so it rejects the signal.
            }

            signaled = true;
            signaler = signaling_event;

            pthread_mutex_unlock(&mutex);

            pthread_cond_signal(&cond);

            return true;
        }

        void lock() {
            pthread_mutex_lock(&mutex);
        }

        void unlock() {
            pthread_mutex_unlock(&mutex);
        }

        void cond_wait() {
            pthread_cond_wait(&cond, &mutex);
        }

        bool is_signaled() const {
            return signaled;
        }

        Event* signaling_event() const {
            return signaler;
        }

    private:
        bool signaled;
        Event* signaler;
        pthread_cond_t cond;
        pthread_mutex_t mutex;
    };

protected:
    Event(bool manual, bool signaled) :
        manual(manual), signaled(signaled) {
        pthread_mutex_init(&lock, NULL);
    }


    ~Event() {
        pthread_mutex_destroy(&lock);
    }

private:
    void wake_all() {
        typedef std::list<Observer*>::iterator observer_iterator;
        for (observer_iterator it = observers.begin(); it != observers.end(); ++it)
        	(*it)->wake(this);

        observers.clear();
    }

    // Wake one waiter in the wait list. Return true if someone got the signal,
    // and false otherwise. This return value is required since an auto-reset
    // event need to know if someone successfully consumed the signal.
    bool wake_one() {
        while (!observers.empty()) {
            Observer* observer = observers.front();
            observers.pop_front();

            if (observer->wake(this))
                return true;    // Someone got the signal.
        }

        return false;   // Nobody got the signal.
    }

    void remove_one(Observer* observer) {
        observers.remove(observer);
    }

    void enqueue_one(Observer* observer) {
        observers.push_front(observer);
    }

    // Recursive procedure to insert an observer in the wait list of several
    // events. If one event was signaled by the time of enqueue_many was
    // called, the return value will be the event index plus one and all
    // events will be unlocked.
    //
    //                  1       2       3       4
    //              +-------+-------+-------+-------+
    //              |  ev0  |  ev1  |  ev2  |  ev3  |
    //              +-------+-------+-------+-------+
    //
    // For example, if ev0 was signaled by the time enqueue_many was called,
    // the return value would be 1. In a similar fashion, if ev2 was signaled
    // by the time enqueue_many was called, the return value would be 3.
    //
    // If no event was signaled by the time enqueue_many was called, the
    // return value will be zero and the event locks will be held.
    //
    // To avoid deadlocks, the event list should be sorted before calling this
    // function. See wait_any for the usage context.
    static size_t enqueue_many(std::pair<Event*, size_t> events[],
                               size_t count, Observer* waiter) {
    	if (count == 0)
    		return 0;

		pthread_mutex_lock(&events[0].first->lock);

		if (events[0].first->signaled) {
		    if (!events[0].first->manual) {
		        // Auto-reset events must consume the signal.
		        events[0].first->signaled = false;
		    }

	        pthread_mutex_unlock(&events[0].first->lock);
			return count;
		}

		int stop_count = enqueue_many(events + 1, count - 1, waiter);

        // If enqueue many returns an integer greater than 0, it means that
		// an event in the remaining part of the "events" list was already
		// signaled. In this case, we simply return the stop count after having
		// released the event lock, otherwise the waiter is enqueued in the
		// event waiter list.
		if (stop_count == 0) {
		    events[0].first->enqueue_one(waiter);
		} else {
		    pthread_mutex_unlock(&events[0].first->lock);
		}

    	return stop_count;
    }

    // Sorting function used in wait_any.
    static bool by_address(const std::pair<Event*, size_t>& a,
                           const std::pair<Event*, size_t>& b) {
        return a.first < b.first;   //  Strict Weak Ordering.
    }

public:
    void signal() {
        pthread_mutex_lock(&lock);

        if (signaled) {
            pthread_mutex_unlock(&lock);
            return;
        }

        // When a manual-reset event is signaled, all threads waiting on the
        // event become schedulable. When an auto-reset event is signaled,
        // one thread waiting on the event becomes schedulable. Also, an auto-
        // reset event is automatically reset to the non-signaled state when
        // a thread successfully waits on it.
        if (manual) {
            wake_all();
            signaled = true;
        } else if (!wake_one()) {
            signaled = true;
        }

        pthread_mutex_unlock(&lock);
    }

    void wait() {
        pthread_mutex_lock(&lock);

        if (signaled) {
            if (!manual) {
                signaled = false;   // Auto-reset, need to consume the event.
            }

            pthread_mutex_unlock(&lock);
            return;
        }

        Observer observer;

        // Acquire the lock on the observer object so it doesn't get signaled
        // before the cond_wait is performed, i.e. close the gap between the
        // call to enqueue_one and cond_wait.
        observer.lock();

        enqueue_one(&observer);

        // The observer is enqueued in the event wait list. Since the observer
        // lock is held, we can safely release the event lock without risking
        // of loosing any signal.
        pthread_mutex_unlock(&lock);

        while (!observer.is_signaled())
            observer.cond_wait();

        pthread_mutex_lock(&lock);
        remove_one(&observer);
        pthread_mutex_unlock(&lock);
    }

    static size_t wait_any(Event* events[], size_t size) {

        // Map each event to its current index position so we can
        // return the original event index after sorting.
        std::vector<std::pair<Event*, size_t> > waitables;
        waitables.reserve(size);

        for (size_t i = 0; i < size; i++)
            waitables.push_back(std::make_pair(events[i], i));

        // In order to acquire the locks in a globally consistent order.
        std::sort(waitables.begin(), waitables.end(), by_address);

        Observer observer;

        size_t index = enqueue_many(&waitables[0], size, &observer);

        if (index != 0) {
            // All waitables are unlocked. Waiter is not queued anywhere.
            // Simply return the signaled event index.
            return waitables[size - index].second;
        }

        // Lock is held on all waitables and the observer has been successfully
        // enqueued in each wait list. Before, releasing the waitable locks,
        // make sure to lock the observer so we don't loose any signal, i.e.
        // close the gap between calls to enqueue_many and observer::cond_wait.
        observer.lock();

        // Release the locks in reverse order.
        for (size_t i = 0; i < waitables.size(); i++)
            pthread_mutex_unlock(&waitables[size - (1 + i)].first->lock);

        while (!observer.is_signaled())
            observer.cond_wait();

        observer.unlock();

        for (size_t i = 0; i < size; i++) {
            if (events[i] != observer.signaling_event()) {
                pthread_mutex_lock(&events[i]->lock);
                events[i]->remove_one(&observer);
                pthread_mutex_unlock(&events[i]->lock);
            } else {
                index = i;
            }
        }

        return index;
    }

    bool reset() {
    	pthread_mutex_lock(&lock);
    	bool old = signaled;
    	signaled = false;
    	pthread_mutex_unlock(&lock);
    	return old;
    }

private:
    bool manual;
    bool signaled;
    pthread_mutex_t lock;
    std::list<Observer*> observers;
};

class AutoEvent : public Event {
public:
    AutoEvent(bool signaled = false) :
        Event(false, signaled) {}
};

class ManualEvent : public Event {
public:
    ManualEvent(bool signaled = false) :
        Event(true, signaled) {}
};

void test_wait_with_manual_event() {

    ManualEvent ev(false);

    ev.signal();
    ev.wait();

    // Make sure that the event was still signaled.
    assert(ev.reset() == true);
}

void test_wait_with_auto_event() {

    AutoEvent ev(false);

    ev.signal();
    ev.wait();

    // Make sure that the wait call consumed the signal,
    // i.e. that the event isn't signaled anymore.
    assert(ev.reset() == false);
}

void test_wait_any_with_manual_events() {

    ManualEvent ev0(false);
    ManualEvent ev1(false);
    ManualEvent ev2(false);

    Event* events[] = { &ev0, &ev1, &ev2 };

    // Test two things:
    //  - that the returned index is correct.
    //  - that the event was signaled after wait_any.

    ev0.signal();
    assert(Event::wait_any(events, 3) == 0);
    assert(ev0.reset() == true);

    ev1.signal();
    assert(Event::wait_any(events, 3) == 1);
    assert(ev1.reset() == true);

    ev2.signal();
    assert(Event::wait_any(events, 3) == 2);
    assert(ev2.reset() == true);
}

void test_wait_any_with_auto_events() {

    AutoEvent ev0(false);
    AutoEvent ev1(false);
    AutoEvent ev2(false);

    Event* events[] = { &ev0, &ev1, &ev2 };

    // Test two things:
    //  - that the returned index is correct.
    //  - that the event wasn't signaled after wait_any.

    ev0.signal();
    assert(Event::wait_any(events, 3) == 0);
    assert(ev0.reset() == false);

    ev1.signal();
    assert(Event::wait_any(events, 3) == 1);
    assert(ev1.reset() == false);

    ev2.signal();
    assert(Event::wait_any(events, 3) == 2);
    assert(ev2.reset() == false);
}

struct TestContext {
    TestContext(Event** events, size_t size) :
        num_event(size), events(events) {
        pthread_barrier_init(&barrier, NULL, 2);
    }

    ~TestContext() {
        pthread_barrier_destroy(&barrier);
    }

    pthread_barrier_t barrier;
    size_t num_event;
    Event** events;
};

void* wait_thread(void* arg) {

    TestContext* ctx = reinterpret_cast<TestContext*>(arg);

    for (size_t index = 0; index < ctx->num_event; index++) {

        assert(Event::wait_any(ctx->events, ctx->num_event) == index);

        ctx->events[index]->reset(); // Required for manual events.

        pthread_barrier_wait(&ctx->barrier);
    }

    return 0;
}

void test_wait_any_multithread(TestContext& ctx) {

    pthread_t thread;
    pthread_create(&thread, NULL, &wait_thread, (void*) &ctx);

    for (size_t index = 0; index < ctx.num_event; index++) {

        pthread_yield();

        ctx.events[index]->signal();

        pthread_barrier_wait(&ctx.barrier);
    }

    pthread_join(thread, NULL);
}

void test_wait_any_with_auto_events_multithread() {

    AutoEvent ev1, ev2, ev3;
    Event* events[] = { &ev2, &ev1, &ev3 };

    TestContext ctx(events, 3);

    test_wait_any_multithread(ctx);
}

void test_wait_any_with_manual_events_multithread() {

    ManualEvent ev1, ev2, ev3;
    Event* events[] = { &ev3, &ev2, &ev1 };

    TestContext ctx(events, 3);

    test_wait_any_multithread(ctx);
}

int main() {

    test_wait_with_manual_event();
    test_wait_with_auto_event();

    test_wait_any_with_manual_events();
    test_wait_any_with_auto_events();

    // These two tests aren't intended to ensure thread-safety
    // per se, but simply to exercise the basic functionalities.
    test_wait_any_with_auto_events_multithread();
    test_wait_any_with_manual_events_multithread();

    return 0;
}