Asynchronous Programming

Asynchronous programming is the ability to run functions an computations simultaneously while being able to communicate the results between different functions and execution paths.

Futures and Promises

A promise; represented by the class std::promise, is a object that stores a value or exception that is retrieved by a std::future object. Semantically, this means that a; generally asynchronous function has promised a value to the caller of the asynchronous function but however, does not yet have the value. The future of a promised value is obtained directly from the std::promise object. The caller of the promised value can query, wait for or extract the value from the corresponding std::future object however, these may block if the result is not yet ready. The timeout methods will return a std::future_status object which is an enum indicating if the wait timed out, if the future is deferred (lazy loading) or ready. The value of a promise is communicated via a shared memory state between the std::future and std::promise objects. This shared state cannot be shared between different threads meaning that only one std::future can be used to obtain the result of a promised value. To enforce this std::future is move-only. A shared future can be obtained by called std::future::share to create a std::shared_future which claims ownership of the shared state such that the future can be copied between different threads. Promises and futures are found in the <future> header.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <utility>

using namespace std::literals;

auto job = [](std::promise<int>&& p, auto a, auto b)
{
    std::this_thread::sleep_for(3s);
    auto r = a + b;
    p.set_value(r);  
    std::this_thread::sleep_for(3s);
};


auto main() -> int
{
    auto p = std::promise<int>{};
    auto f = p.get_future();

    auto th = std::thread(job, std::move(p), 4, 5);

    auto start = std::chrono::high_resolution_clock::now();
    std::cout << "Waiting for job...\n";
    auto r          = f.get();
    auto finish     = std::chrono::high_resolution_clock::now();
    auto duration   = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count();

    std::cout << "Result: " << r << std::endl;
    std::cout << "Took: " << duration << " ms" << std::endl;

    th.join();

    return 0;
}
$ bpt build -t build.yaml -o build

# ...

$ ./build/futures
Waiting for job...
Result: 9
Took: 3000 ms

Example

Async

Another way to create asynchronous functions without having to deal with threads directly is to use std::async. This function will create an asynchronous section that will run according to a launch policy. The standard only defines two policies, this being std::launch::async; which will run a function a separate thread, or std::launch::deferred which will run the function lazily on the calling thread the first time the value is requested. std::async returns a std::future object that is used to query, wait for or extract the result of the asynchronous function. std::async is found in the <future> header.

#include <algorithm>
#include <chrono>
#include <concepts>
#include <execution>
#include <future>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <numeric>
#include <utility>
#include <vector>

template <typename time_t = std::chrono::microseconds>
struct measure
{
    template <typename F, typename... Args>
    static auto execution(F func, Args&&... args) 
        -> std::pair<typename time_t::rep, std::invoke_result_t<F, Args...>>
    {
        auto start = std::chrono::system_clock::now();
        auto result = std::invoke(func, std::forward<Args>(args)...);
        auto duration = std::chrono::duration_cast<time_t>(std::chrono::system_clock::now() - start);
        return std::pair<typename time_t::rep, std::invoke_result_t<F, Args...>>{ duration.count(), result };
    }
};

template<std::random_access_iterator I, std::sentinel_for<I> S, std::movable A>
auto parallel_sum(I first, S last, A init) -> A
{
    auto middle = first + ((last - first) / 2);
    
    /// Launch async sum on last half of the values 
    auto future = std::async(std::launch::async, parallel_sum<I, S, A>, middle, last, A{});
    
    /// Sum first half of the range locally.
    auto result = parallel_sum(first, middle, init);

    /// Obtain the future and sum with the result
    return result + future.get();
}

auto main() -> int
{
    auto v1 = std::vector<double>(999, 0.1);
    auto v2 = std::vector<double>(100'000'007, 0.1);

    std::cout << std::fixed << std::setprecision(5);

    auto [acc_time_v1, acc_result_v1] = measure<>::execution([](const auto& rng){ return std::accumulate(rng.begin(), rng.end(), 0.0); }, v1);
    std::cout << "std::accumulate : [v1 - 999 elements]\n";
    std::cout << "Time: " << acc_time_v1 << " us\n";
    std::cout << "Result: " << acc_result_v1 << std::endl;

    auto [reduce_time_v1, reduce_result_v1] = measure<>::execution([](const auto& rng){ return std::reduce(std::execution::par, rng.begin(), rng.end(), 0.0); }, v1);
    std::cout << "std::reduce(std::execution::par) : [v1 - 999 elements]\n";
    std::cout << "Time: " << reduce_time_v1 << " us\n";
    std::cout << "Result: " << reduce_result_v1 << std::endl;

    auto [par_time_v1, par_result_v1] = measure<>::execution([](const auto& rng){ return std::accumulate(rng.begin(), rng.end(), 0.0); }, v1);
    std::cout << "parallel_sum : [v1 - 999 elements]\n";
    std::cout << "Time: " << par_time_v1 << " us\n";
    std::cout << "Result: " << par_result_v1 << std::endl;

    std::cout << "------------------------------------------------------\n";

    auto [acc_time_v2, acc_result_v2] = measure<>::execution([](const auto& rng){ return std::accumulate(rng.begin(), rng.end(), 0.0); }, v2);
    std::cout << "std::accumulate : [v2 - 100'000'007 elements]\n";
    std::cout << "Time: " << acc_time_v2 << " us\n";
    std::cout << "Result: " << acc_result_v2 << std::endl;

    auto [reduce_time_v2, reduce_result_v2] = measure<>::execution([](const auto& rng){ return std::reduce(std::execution::par, rng.begin(), rng.end(), 0.0); }, v2);
    std::cout << "std::reduce(std::execution::par) : [v2 - 100'000'007 elements]\n";
    std::cout << "Time: " << reduce_time_v2 << " us\n";
    std::cout << "Result: " << reduce_result_v2 << std::endl;

    auto [par_time_v2, par_result_v2] = measure<>::execution([](const auto& rng){ return std::accumulate(rng.begin(), rng.end(), 0.0); }, v2);
    std::cout << "parallel_sum : [v2 - 100'000'007 elements]\n";
    std::cout << "Time: " << par_time_v2 << " us\n";
    std::cout << "Result: " << par_result_v2 << std::endl;

    return 0;
}
$ bpt build -t build.yaml -o build

# ...

$ ./build/async
std::accumulate : [v1 - 999 elements]
Time: 2 us
Result: 99.90000
std::reduce(std::execution::par) : [v1 - 999 elements]
Time: 750 us
Result: 99.90000
parallel_sum : [v1 - 999 elements]
Time: 2 us
Result: 99.90000
------------------------------------------------------
std::accumulate : [v2 - 100'000'007 elements]
Time: 178779 us
Result: 10000000.68113
std::reduce(std::execution::par) : [v2 - 100'000'007 elements]
Time: 19993 us
Result: 10000000.70009
parallel_sum : [v2 - 100'000'007 elements]
Time: 153180 us
Result: 10000000.68113

Example

Packaged Sections

The final way to create a future value is the use of std::packaged_task. This wraps a callable such that its result type is held in a shared state that can be accesses by a std::future object created from the section. std::packaged_task objects can be moved to std::thread and std::jthread objects so that the return value of a section can be acquired from a thread (which usually discards it). std::packaged_task is move-only and is found in the <future> header.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <utility>

using namespace std::literals;

auto job = [](auto a, auto b)
{
    std::this_thread::sleep_for(150ms);
    auto r = a + b;
    std::this_thread::sleep_for(150ms);
    return r;
};

auto main() -> int
{
    auto pkg    = std::packaged_task<int(int, int)>{ job };
    auto f      = pkg.get_future();

    auto th = std::thread(std::move(pkg), 4, 5);

    auto start = std::chrono::high_resolution_clock::now();
    std::cout << "Waiting for job...\n";
    auto r          = f.get();
    auto finish     = std::chrono::high_resolution_clock::now();
    auto duration   = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count();

    std::cout << "Result: " << r << std::endl;
    std::cout << "Took: " << duration << " ms" << std::endl;

    th.join();

    return 0;
}
$ bpt build -t build.yaml -o build

# ...

$ ./build/packaged_task
Waiting for job...
Result: 9
Took: 300 ms

Example

std::packaged_task