17 października 2015

[C++11] std::future - programowanie oparte na zadaniach

W programowaniu współbieżnym preferowanym podejściem powinno być programowanie oparte na zadnich, a nie na wątkach [1]. Kilka przydatnych linków:
std::future jest obiektem, przechowującym wyniki (także wyjątki) asynchronie działających funkcji - czyta wynik ze współdzielonego stanu. Wynik można pozyskać za pomocą metody get(). W takim scenariuszu wątek główny zostanie zablokowany do chwili aż wynik będzie dostępny, lub zwróci wynik natychmiast, jeżeli asynchroniczna operacja zakończyła się wcześniej.

Do asynchronicznego wołania funkcji służy std::async. Jest to rozwiązanie najbardziej wysokopoziomowe, gdyż sam std::async dba o to by ustawić std::future w stan ready. Wywołanie oprócz ewentualnych argumentów jakie mają być przekazane do funkcji wymaga również podania policy:
  • std::launch::async - nowy wątek zostanie uruchomiony do wykonania zadania
  • std::launch::deferred - wykonanie zadania zostaje odroczone do momentu wywołania metod get() lub wait()
  • std::launch::async | std::launch::deferred - kombinacja flag, jednak zachowanie jest zależne od implementacji
Wszystkie przykłady kompilowałem w ten sam sposób, z dodatkowo włączonym sanitizerem, który pomógł wykryć kilka błędów w kodzie.
$ clang++ -std=c++14 -fsanitize=thread -lpthread -g main.cpp
Przykład obliczania ciągu Fibonacciego z zastosowaniem std::async.
#include <iostream>
#include <future>

using namespace std;

int fibonacci(int n)
{
    std::cout << "Current n: " << n << std::endl;
    if (n == 0)
        return 0;
    else if (n == 1)
        return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}

int main()
{
    std::future<int> result = std::async(std::launch::async | std::launch::deferred,
                                         fibonacci, 3);
    std::cout << "Result: " << result.get() << std::endl;

    return 0;
}
Wynik:
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Result: 2
std::packaged_task także pozawala na tworzenie obiektów std::future. Można go porównać do std::function, a więc jako wrapper do tworzenia obiektów callable. W przeciwieństwie do std::async, nie uruchamia on jednak przekazanej funkcji automatycznie. Najważniejszą właściwością std::packaged_task jest możliwość pozyskania z niego obiektu std::future, i przekazania go do innego wątku, gdzie zostanie wykonany.

Przykład:
#include <iostream>
#include <thread>
#include <future>

using namespace std;

int fibonacci(int n)
{
    std::cout << "Current n: " << n << std::endl;
    if (n == 0)
        return 0;
    else if (n == 1)
        return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}

int main()
{
    std::packaged_task<int(int)> task{fibonacci};
    std::future<int> result = task.get_future();

    std::thread t{std::move(task), 3};
    t.join();

    std::cout << "Result: " << result.get() << std::endl;

    return 0;
}
Wynik:
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Result: 2
std::promise można rozważyć jako początek kanału komunikacyjnego, na którego końcu znajduje się std::future. Do jego zadań, należy zapisanie wyniku do współdzielonego stanu. Czasami dane, które nas interesują są już dostępne (obliczone) i nie musimy czekać na zakończenie jakiegoś wątku. Za pomocą std::promise możemy w wątku głównym wytworzyć obiekt std::future (czyli końcówka kanału - dzięki metodzie get_future()), a następnie przekazać go (std::promise) do innego wątku. W wątku pobocznym, gdy interesujące nas dane zostaną obliczone wołamy metodę set_value() (początek kanału). W ten sposób obiekt std::future zostanie ustawiony w stan ready, i dane będą mogły być odczytane w wątku głównym. W porównywaniu do std::async, jest to rozwiązaniem bardziej nisko poziomowe, lecz czasami zachodzi potrzeba aby z niego skorzystać.

std::promise ma jeszcze jedno zastosowanie, można go wykorzystać jako mechanizm do sygnalizacji pomiędzy wątkami (std::promise<void> - typ obiektu jako void). Coś na kształt std::condition_variable, z tym że taka sygnalizacja może zadziać się tylko raz. Aby odblokować pracę na współdzielonych danych należy wywołać set_value(). Aby wstrzymać pracę wątku do momentu, aż dane będą dostępne, należy wywołać metodę wait().

Przykład:
#include <iostream>
#include <thread>
#include <future>

using namespace std;

struct Fibonacci {
    std::promise<int> p;
    bool isSatisfied;
    const int index;
    int partialIndex;

    Fibonacci(std::promise<int> p_, int index_)
        : p(std::move(p_))
        , isSatisfied(false)
        , index(index_)
        , partialIndex(0)
    {
    }

    int fibonacci(int n)
    {
        std::cout << "Current n: " << n << std::endl;

        int result = 0;
        if (n == 0)
            result = 0;
        else if (n == 1)
            result = 1;
        else
            result = fibonacci(n - 1) + fibonacci(n - 2);

        if (isSatisfied == false and partialIndex == n) {
            isSatisfied = true;
            std::cout << "Passing partial result" << std::endl;

            p.set_value(result);
        }
        return result;
    }

    void operator()(int partialVal_)
    {
        partialIndex = partialVal_;
        fibonacci(index);
    }
};

int main()
{
    std::promise<int> p;
    std::future<int> result = p.get_future();

    std::thread t{ Fibonacci{ std::move(p), 5 }, 3 };

    std::cout << "Result: " << result.get() << std::endl;
    t.join();

    return 0;
}
Przykładowy wynik:
Current n: 5
Current n: 4
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Passing partial result
Current n: 2
Current n: 1
Current n: 0
Current n: 3
Current n: 2
Result: 2Current n: 
1
Current n: 0
Current n: 1
Bibliografia:
  • [1] Scotta Meyersa: Skuteczny nowoczesny C++. APN PROMISE SA, 2015. Rozdział 7, str. 289.
  • [2] Anthony Williams: C++ Concurency in Action. USA Manning publications Co., 2012. Rozdział 7, str. 67.

2 października 2015

[C++11] std::condition_variable i współdzielenie danych między wątkami

Praca na danych wymaga mechanizmu, dzięki któremu wątki będą mogły zablokować swoje działania do czasu, aż z danych nie będzie korzystał żaden inny wątek. Najprostszym i zarazem najmniej efektywnym rozwiązaniem może być pętla i próba założenia blokady na muteks. Biblioteka standardowa na szczęście oferuje znacznie lepsze rozwiązanie w postaci std::condition_variable.

std::condition_variable wymaga do działania muteksów i std::unique_lock (ze względu na efektywność). Pozwala to na większą elastyczność, gdyż muteks może zostać zablokowany i odblokowany w dowolnym momencie (std::lock_guard wykorzystuje RAII). Klasa posiada dwa rodzaje metod: wait_* (wait, waif_for, wait_until) oraz notify_* (notify_one, notify_all). Te pierwsze blokują działanie wątku do czasu, aż zostanie spełniony warunek który został przekazany do wait_*. Może to być lambda, punkt w czasie lub odcinek czasu. Natomiast notify_* zajmuje się wybudzaniem wątków.

Ciekawostka, o której dowiedziałem się z cppreference. Przed wołaniem notify_*, należy ręczenie zawołać unlock na obiekcie std::unique_lock (choć destruktor tego obiektu i tak by to wykonał). W ten sposób unikamy sytuacji, w której wybudzamy wątek, tylko po to by ponownie go uśpić, ponieważ warunek nie został jeszcze osiągnięty.
Przykład.
#include <condition_variable>
#include <thread>
#include <mutex>
#include <list>
#include <string>
#include <iostream>

using namespace std;

std::mutex m;
std::condition_variable cv;
std::list<string> train;

void dig_coal() {
    int resource = 4;
    while(resource >= 0) {
        std::unique_lock<std::mutex> l{m};
        cv.wait_for(l, std::chrono::milliseconds{400});

        if (resource > 0) train.push_back("coal");
        else train.push_back("empty");
        resource -= 1;
        std::cout << "Added trolley, train length: " << train.size() << std::endl;

        l.unlock();
        cv.notify_one();
    }

    std::cout << "No more coal to mining." << endl;
}

void burn_coal() {
    while(true) {
        std::unique_lock<std::mutex> l{m};
        cv.wait(l, []{ return not train.empty(); });

        const string trolley = train.front();
        train.pop_front();
        std::cout << "Coal burn, train length: " << train.size() << std::endl;

        l.unlock();
        cv.notify_one();

        if(trolley == "empty") break;
        std::this_thread::sleep_for(std::chrono::milliseconds{600});
    }

    std::cout << "All coal burn." << endl;
}

int main() {
    std::thread miner{dig_coal};
    std::thread power_station{burn_coal};

    cv.notify_one();

    miner.join();
    power_station.join();

    return 0;
}
Wynik:
Added trolley, train length: 1
Coal burn, train length: 0
Added trolley, train length: 1
Added trolley, train length: 2
Coal burn, train length: 1
Added trolley, train length: 2
Added trolley, train length: 3
No more coal to mining.
Coal burn, train length: 2
Coal burn, train length: 1
Coal burn, train length: 0
All coal burn.

1 października 2015

kompilacja clang-a i libc++

Z kolejnymi wydaniami, pojawiają się delikatne różnice w procesie instalacji, warto więc śledzić poniższe linki.
Ściągnięcie źródeł + instalacja:
git clone http://llvm.org/git/llvm.git

cd llvm/tools
git clone http://llvm.org/git/clang.git

cd llvm/projects
git clone http://llvm.org/git/compiler-rt.git

cd llvm/projects
git clone http://llvm.org/git/libcxx.git
git clone http://llvm.org/git/libcxxabi.git

cd llvm/projects
git clone http://llvm.org/git/test-suite.git

mkdir llvm_build
mkdir llvm_root
cd llvm_build

cmake -G "Unix Makefiles" DCMAKE_INSTALL_PREFIX=/home/beru/llvm_root/ ../llvm
make -j2
make install
Kompilacja prostego Hello World. Dzięki temu zlepkowi argumentów jestem w stanie używać przekompilowanej przez siebie wersji biblioteki standardowej z przekompilowaną wersją clang-a jak i tą pochodzącą z repozytorium dystrybucji.
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/beru/llvm_root/lib/

~/llvm_root/bin/clang++ main.cpp -std=c++14 -stdlib=libc++ \
    -nodefaultlibs -lc++ -lc++abi -lm -lc -lgcc_s -lgcc \
    -I/home/beru/llvm_root/include/c++/v1/ \
    -L/home/beru/llvm_root/lib/