17 października 2015

[C++11] std::future - programowanie oparte na zadaniach

W programowaniu współbieżnym preferowanym podejściem powinno być programowanie oparte na zadnich, a nie na wątkach [1]. Kilka przydatnych linków:
std::future jest obiektem, przechowującym wyniki (także wyjątki) asynchronie działających funkcji - czyta wynik ze współdzielonego stanu. Wynik można pozyskać za pomocą metody get(). W takim scenariuszu wątek główny zostanie zablokowany do chwili aż wynik będzie dostępny, lub zwróci wynik natychmiast, jeżeli asynchroniczna operacja zakończyła się wcześniej.

Do asynchronicznego wołania funkcji służy std::async. Jest to rozwiązanie najbardziej wysokopoziomowe, gdyż sam std::async dba o to by ustawić std::future w stan ready. Wywołanie oprócz ewentualnych argumentów jakie mają być przekazane do funkcji wymaga również podania policy:
  • std::launch::async - nowy wątek zostanie uruchomiony do wykonania zadania
  • std::launch::deferred - wykonanie zadania zostaje odroczone do momentu wywołania metod get() lub wait()
  • std::launch::async | std::launch::deferred - kombinacja flag, jednak zachowanie jest zależne od implementacji
Wszystkie przykłady kompilowałem w ten sam sposób, z dodatkowo włączonym sanitizerem, który pomógł wykryć kilka błędów w kodzie.
$ clang++ -std=c++14 -fsanitize=thread -lpthread -g main.cpp
Przykład obliczania ciągu Fibonacciego z zastosowaniem std::async.
#include <iostream>
#include <future>

using namespace std;

int fibonacci(int n)
{
    std::cout << "Current n: " << n << std::endl;
    if (n == 0)
        return 0;
    else if (n == 1)
        return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}

int main()
{
    std::future<int> result = std::async(std::launch::async | std::launch::deferred,
                                         fibonacci, 3);
    std::cout << "Result: " << result.get() << std::endl;

    return 0;
}
Wynik:
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Result: 2
std::packaged_task także pozawala na tworzenie obiektów std::future. Można go porównać do std::function, a więc jako wrapper do tworzenia obiektów callable. W przeciwieństwie do std::async, nie uruchamia on jednak przekazanej funkcji automatycznie. Najważniejszą właściwością std::packaged_task jest możliwość pozyskania z niego obiektu std::future, i przekazania go do innego wątku, gdzie zostanie wykonany.

Przykład:
#include <iostream>
#include <thread>
#include <future>

using namespace std;

int fibonacci(int n)
{
    std::cout << "Current n: " << n << std::endl;
    if (n == 0)
        return 0;
    else if (n == 1)
        return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}

int main()
{
    std::packaged_task<int(int)> task{fibonacci};
    std::future<int> result = task.get_future();

    std::thread t{std::move(task), 3};
    t.join();

    std::cout << "Result: " << result.get() << std::endl;

    return 0;
}
Wynik:
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Result: 2
std::promise można rozważyć jako początek kanału komunikacyjnego, na którego końcu znajduje się std::future. Do jego zadań, należy zapisanie wyniku do współdzielonego stanu. Czasami dane, które nas interesują są już dostępne (obliczone) i nie musimy czekać na zakończenie jakiegoś wątku. Za pomocą std::promise możemy w wątku głównym wytworzyć obiekt std::future (czyli końcówka kanału - dzięki metodzie get_future()), a następnie przekazać go (std::promise) do innego wątku. W wątku pobocznym, gdy interesujące nas dane zostaną obliczone wołamy metodę set_value() (początek kanału). W ten sposób obiekt std::future zostanie ustawiony w stan ready, i dane będą mogły być odczytane w wątku głównym. W porównywaniu do std::async, jest to rozwiązaniem bardziej nisko poziomowe, lecz czasami zachodzi potrzeba aby z niego skorzystać.

std::promise ma jeszcze jedno zastosowanie, można go wykorzystać jako mechanizm do sygnalizacji pomiędzy wątkami (std::promise<void> - typ obiektu jako void). Coś na kształt std::condition_variable, z tym że taka sygnalizacja może zadziać się tylko raz. Aby odblokować pracę na współdzielonych danych należy wywołać set_value(). Aby wstrzymać pracę wątku do momentu, aż dane będą dostępne, należy wywołać metodę wait().

Przykład:
#include <iostream>
#include <thread>
#include <future>

using namespace std;

struct Fibonacci {
    std::promise<int> p;
    bool isSatisfied;
    const int index;
    int partialIndex;

    Fibonacci(std::promise<int> p_, int index_)
        : p(std::move(p_))
        , isSatisfied(false)
        , index(index_)
        , partialIndex(0)
    {
    }

    int fibonacci(int n)
    {
        std::cout << "Current n: " << n << std::endl;

        int result = 0;
        if (n == 0)
            result = 0;
        else if (n == 1)
            result = 1;
        else
            result = fibonacci(n - 1) + fibonacci(n - 2);

        if (isSatisfied == false and partialIndex == n) {
            isSatisfied = true;
            std::cout << "Passing partial result" << std::endl;

            p.set_value(result);
        }
        return result;
    }

    void operator()(int partialVal_)
    {
        partialIndex = partialVal_;
        fibonacci(index);
    }
};

int main()
{
    std::promise<int> p;
    std::future<int> result = p.get_future();

    std::thread t{ Fibonacci{ std::move(p), 5 }, 3 };

    std::cout << "Result: " << result.get() << std::endl;
    t.join();

    return 0;
}
Przykładowy wynik:
Current n: 5
Current n: 4
Current n: 3
Current n: 2
Current n: 1
Current n: 0
Current n: 1
Passing partial result
Current n: 2
Current n: 1
Current n: 0
Current n: 3
Current n: 2
Result: 2Current n: 
1
Current n: 0
Current n: 1
Bibliografia:
  • [1] Scotta Meyersa: Skuteczny nowoczesny C++. APN PROMISE SA, 2015. Rozdział 7, str. 289.
  • [2] Anthony Williams: C++ Concurency in Action. USA Manning publications Co., 2012. Rozdział 7, str. 67.

Brak komentarzy:

Prześlij komentarz