C++ 多线程08:std::future

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

C++ 多线程std::future

文章目录

概念

我们前面介绍的std::thread 是C++11中提供异步创建多线程的工具只能是异步运行任务却无法获取任务执行的结果一般都是依靠全局对象全局对象在多线程下是及其不安全的为此标准库提供了std::future类模板来关联线程运行的函数和函数的返回结果这种获取结果的方式是异步的。

std::future 通常由某个 Provider 创建你可以把 Provider 想象成一个异步任务的提供者Provider 在某个线程中设置共享状态的值与该共享状态相关联的 std::future 对象调用 get通常在另外一个线程中 获取该值如果共享状态的标志不为 ready则调用 std::future::get 会阻塞当前的调用者直到 Provider 设置了共享状态的值此时共享状态的标志变为 readystd::future::get 返回异步任务的值或异常如果发生了异常。

一个有效(valid)的 std::future 对象通常由以下三种 Provider 创建并和某个共享状态相关联。Provider 可以是函数或者类他们分别是

  • std::async 函数
  • std::promise::get_futureget_future 为 promise 类的成员函数
  • std::packaged_task::get_future此时 get_future为 packaged_task 的成员函数

一个 std::future 对象只有在有效(valid)的情况下才有用(useful)由 std::future 默认构造函数创建的 future 对象不是有效的除非当前非有效的 future 对象被 move 赋值另一个有效的 future 对象。

std::future接口

template<typename ResultType>
class future
{
public:
  future() noexcept;
  future(future&&) noexcept;
  future& operator=(future&&) noexcept;
  ~future();
  
  future(future const&) = delete;
  future& operator=(future const&) = delete;


  bool valid() const noexcept;
  
  ResultType get();
  shared_future<ResultType> share();

  void wait();

  template<typename Rep,typename Period>
  future_status wait_for(
      std::chrono::duration<Rep,Period> const& relative_time);

  template<typename Clock,typename Duration>
  future_status wait_until(
      std::chrono::time_point<Clock,Duration> const& absolute_time);
};
复制代码
  1. 构造函数

    • 不带参数的默认构造函数此对象没有共享状态因此它是无效的但是可以通过移动赋值的方式将一个有效的future值赋值给它
    • 禁用拷贝构造
    • 支持移动构造。
     std::future<int> fut;           // 默认构造函数
     fut = std::async(do_some_task);   // move-赋值操作。
    复制代码
    
  2. 析构函数

    • 销毁future对象它是异常安全的。
  3. get函数

    • 当共享状态就绪时返回存储在共享状态中的值(或抛出异常)。
    • 如果共享状态尚未就绪(即提供者尚未设置其值或异常)则该函数将阻塞调用的线程直到就绪。
    • 当共享状态就绪后则该函数将取消阻塞并返回(或抛出)释放其共享状态这使得future对象不再有效因此对于每一个future共享状态该函数最多应被调用一次。
    • std::future<void>::get()(void特例化)不返回任何值但仍等待共享状态就绪并释放它。
    • 共享状态是作为原子操作(atomic operation)被访问。
    #include <iostream>       // std::cout
    #include <future>         // std::async, std::future
    #include <utility>        // std::move
    
    int do_get_value() { return 11; }
    
    int main () {
        // 由默认构造函数创建的 std::future 对象,
        // 初始化时该 std::future 对象处于为 invalid 状态.
        std::future<int> foo, bar;
        foo = std::async(do_get_value); // move 赋值, foo 变为 valid.
        bar = std::move(foo); // move 赋值, bar 变为 valid, 而 move 赋值以后 foo 变为 invalid.
    
        if (foo.valid()) {
            std::cout << "foo's value: " << foo.get() << '\n';
        } else {
            std::cout << "foo is not valid\n";
        }
    
        if (bar.valid()) {
            std::cout << "bar's value: " << bar.get() << '\n';
        } else {
            std::cout << "bar is not valid\n";
        }
        return 0;
    }
    复制代码
    
  4. operator=

    • 禁用拷贝赋值。
    • 支持移动赋值如果在调用之前此对象是有效的(即它已经访问共享状态)则将其与先前已关联的共享状态解除关联。如果它是与先前共享状态关联的唯一对象则先前的共享状态也会被销毁。
  5. share函数

    • 获取共享的future返回一个std::shared_future对象该对象获取future对象的共享状态。调用该函数之后该 std::future对象本身已经不和任何共享状态相关联因此该std::future的状态不再是 valid 的了。
  6. valid函数

    • 检查共享状态的有效性返回当前的future对象是否与共享状态关联。一个有效的std::future对象只能通过 std::async(), std::future::get_future 或者 std::packaged_task::get_future 来初始化。另外由 std::future 默认构造函数创建的 std::future 对象是无效(invalid)的当然通过 std::future 的 move 赋值后该 std::future 对象也可以变为 valid。一旦调用了std::future::get()函数再调用此函数将返回false。
  7. wait函数

    • 等待共享状态就绪。如果共享状态尚未就绪(即提供者尚未设置其值或异常)则该函数将阻塞调用的线程直到就绪。
    • 当共享状态就绪后则该函数将取消阻塞并返回。但是wait()并不读取共享状态的值或者异常。
  8. wait_for函数

    • 等待共享状态在指定的时间内(time span)准备就绪。如果共享状态尚未就绪(即提供者尚未设置其值或异常)则该函数将阻塞调用的线程直到就绪或已达到设置的时间。

    • 此函数的返回值类型为枚举类

      future_status
      

      。此枚举类有三种label

      • ready共享状态已就绪
      • timeout在指定的时间内未就绪
      • deferred共享状态包含了一个延迟函数(deferred function)。
  9. wait_until函数

    • 等待共享状态在指定的时间点(time point)准备就绪。如果共享状态尚未就绪(即提供者尚未设置其值或异常)则该函数将阻塞调用的线程直到就绪或已达到指定的时间点。
    • 此函数的返回值类型为枚举类future_status

下面来看看详细的代码

#include <iostream>
#include <future>
#include <chrono>
#include <utility>
#include <thread>
  
int test_future_1()
{
    { // constructor/get/operator=
        auto get_value = []() { return 10; };
        std::future<int> foo; // default-constructed
        // move-constructed
        std::future<int> bar = std::async(get_value); 
        
        int x = bar.get();
        std::cout << "value: " << x << '\n'; // 10
        
        std::future<int> foo2(std::async(get_value));
        std::cout << "value: " << foo2.get() << '\n'; // 10
    }
 
    { // share
        std::future<int> fut = std::async([]() { return 10; });
        std::shared_future<int> shfut = fut.share();
        
        //std::cout << "value: " << fut.get() << '\n'; 
        // crash, 执行完fut.share()后fut对象将变得无效
        std::cout << "fut valid: " << fut.valid() << '\n';// 0
        
        // shared futures can be accessed multiple times:
        std::cout << "value: " << shfut.get() << '\n'; // 10
        // 20, 对于std::shared_future对象get函数可以被多次访问
        std::cout << "its double: " << shfut.get() * 2 << '\n'; 
    }
 
    { // valid
        std::future<int> foo, bar;
        foo = std::async([]() { return 10; });
        bar = std::move(foo);
        
        if (foo.valid()) {std::cout << "foo's value: " << foo.get() << '\n';}
        else {std::cout << "foo is not valid\n";}
        
        if (bar.valid()) {std::cout << "bar's value: " << bar.get() << '\n';}
        else {std::cout << "bar is not valid\n";}
    }
 
    { // wait
        auto is_prime = [](int x) {
            for (int i = 2; i < x; ++i) if (x%i == 0) return false;
            return true;
        };
 
        // call function asynchronously:
        std::future<bool> fut = std::async(is_prime, 194232491);
        
        std::cout << "checking...\n";
        fut.wait();
        
        std::cout << "\n194232491 ";
        // guaranteed to be ready (and not block) after wait returns
        if (fut.get()) {
        	std::cout << "is prime.\n";
        } else {
        	std::cout << "is not prime.\n";
        }
    }
 
    { // wait_for
        auto is_prime = [](int x) {
            for (int i = 2; i < x; ++i) if (x%i == 0) return false;
            return true;
        };
    
        // call function asynchronously:
        std::future<bool> fut = std::async(is_prime, 700020007);
        
        // do something while waiting for function to set future:
        std::cout << "checking, please wait";
        std::chrono::milliseconds span(100);
        // 可能多次调用std::future::wait_for函数
        while (fut.wait_for(span) == std::future_status::timeout) 
        	std::cout << '.';
        
        bool x = fut.get(); // retrieve return value
        std::cout << "\n700020007 " << (x ? "is" : "is not") << " prime.\n";
    }
 
    return 0;
}
 
int test_future_2()
{
    // future from a packaged_task
    std::packaged_task<int()> task([] { return 7; }); // wrap the function
    std::future<int> f1 = task.get_future(); // get a future
    std::thread t(std::move(task)); // launch on a thread
    
    // future from an async()
    std::future<int> f2 = std::async(std::launch::async, [] { return 8; });
    
    std::cout << "Waiting..." << std::flush;
    f1.wait();
    f2.wait();
    std::cout << "Done!\nResults are: " << f1.get()
        << ' ' << f2.get() << ' ' << '\n';
    t.join();
 
    return 0;
}
 

void initiazer(std::promise<int> * promObj)
{
    std::cout << "Inside Thread" << std::endl;
    promObj->set_value(35);
}
 
int test_future_3()
{
    std::promise<int> promiseObj;
    std::future<int> futureObj = promiseObj.get_future();
    std::thread th(initiazer, &promiseObj);
    std::cout << "value: " << futureObj.get() << std::endl;
    th.join();
    
    // If std::promise object is destroyed before setting the value 
    // the calling get() function on associated std::future object will throw exception.
    
    // A part from this, if you want your thread to return multiple values 
    // at different point of time then
    
    // just pass multiple std::promise objects in thread 
    // and fetch multiple return values from thier associated multiple std::future objects.
    
    return 0;
}
 
int main() {
    test_future_1();
    test_future_2();
    test_future_3();
}
复制代码

通过std::async()创建异步任务的std::futurestd::async的创建任务的传参方式和 std::thread 一样也可以使用类的成员方法进行创建

#include <future>
#include <iostream>

class A {
 public:
  int f(int i) { return i; }
};

int main() {
  A a;
  std::future<int> res = std::async(&A::f, &a, 1);
  std::cout << res.get();  // 1阻塞至线程返回结果
}
复制代码

需要注意std::future只能 get() 一次多次get会抛出异常

#include <future>
#include <iostream>

int main() {
  std::future<void> res = std::async([] {});
  res.get();
  try {
  	res.get();
  } catch (const std::future_error& e) {
    // exception: std::future_error: No associated state
    std::cout << e.what() << std::endl;
  }
}
复制代码

std::shared_future

std::shared_future类型模板是为了等待其他线程上的异步结果。其和std::promisestd::packaged_task类型模板还有std::async函数模板都是为异步结果准备的工具。与std::future唯一的区别就是多个std::shared_future实例可以引用同一个异步结果。

std::shared_future实例是CopyConstructible(拷贝构造)和CopyAssignable(拷贝赋值)。你也可以用ResultTypestd::future类型对象移动构造一个std::shared_future类型对象。

访问给定std::shared_future实例是非同步的。因此当有多个线程访问同一个std::shared_future实例且无任何外围同步操作时这样的访问是不安全的。不过访问关联状态时是同步的所以多个线程访问多个独立的std::shared_future实例且没有外围同步操作的时候是安全的。

shared_future的接口与future基本一致这里就不再详细介绍了

template<typename ResultType>
class shared_future
{
public:
  shared_future() noexcept;
  shared_future(future<ResultType>&&) noexcept;
  
  shared_future(shared_future&&) noexcept;
  shared_future(shared_future const&);
  shared_future& operator=(shared_future const&);
  shared_future& operator=(shared_future&&) noexcept;
  ~shared_future();

  bool valid() const noexcept;

  ResultType get() const;

  void wait() const;

  template<typename Rep,typename Period>
  future_status wait_for(
     std::chrono::duration<Rep,Period> const& relative_time) const;

  template<typename Clock,typename Duration>
  future_status wait_until(
     std::chrono::time_point<Clock,Duration> const& absolute_time)
    const;
};
复制代码

std::shared_future可以多次获取结果它可以通过std::future的右值构造。每一个std::shared_future对象上返回的结果不同步多线程访问std::shared_future需要加锁防止 race condition更好的方法是给每个线程拷贝一个 std::shared_future 对象这样就可以安全访问而无需加锁

#include <iostream>
#include <future>

int main() {
    std::promise<int> ps;
    std::future<int> ft = ps.get_future();
    std::shared_future<int> sf(std::move(ft));
    // 或直接 std::shared_future<int> sf{ps.get_future()};
    ps.set_value(1);
    int ret = sf.get();
    std::cout << "get1: "<< ret << std::endl;
    ret = sf.get(); //std::shared_future可以多次get
    std::cout << "get2: "<< ret << std::endl;
}
复制代码

输出

get1: 1
get2: 1
复制代码

也可以直接用std::future::share()生成std::shared_future

#include <iostream>
#include <future>

int main() {
  std::promise<int> ps;
  auto sf = ps.get_future().share();
  ps.set_value(2);
  int ret = sf.get();
  std::cout << "get1: "<< ret << std::endl;
  ret = sf.get();
  std::cout << "get2: "<< ret << std::endl;
}
复制代码

输出

get1: 2
get2: 2
复制代码

实现原理猜想

其实写这个有点超出本人目前的能力范围了因为还没能力去读它的源码。但是还是想在这里猜想一下从当前的这个工作场景来其实很像很像我们之前介绍过的条件变量C++多线程condition_variable - 掘金 (juejin.cn)。我们不防猜想一下

  • 情况1可以使用条件变量实现future对象中设置一个条件变量在异步线程结束时调用notify_one()在get()函数中调用wait()这样可以实现一个简单的异步调用,缺点是需要互斥量条件变量一个仅发生一次的过程这样不免有些浪费wait()操作更是需要加锁解锁也就是说这样一个完整的过程我们需要加锁解锁各两次还需要一个notify_one()但优点也很明显,过程简单且如果等待时间较长可以把cpu让给其他工作线程全局上节省的时间随等待时间加长而变长但等待时间短的话除了完成功能就没有丝毫优势了。
  • 情况2也可以使用自旋锁实现自旋锁(spinlock)是指当一个线程在获取锁的时候如果锁已经被其它线程获取那么该线程将循环等待然后不断的判断锁是否能够被成功获取直到获取到锁才会退出循环。future对象只需要设置一个类内的原子变量当异步线程结束后改变值然后get()成员自旋等待即可这种方法优点与缺点都是很明显的优点:比起条件变量这样笨重的东西确实轻盈了不少且在等待时间较小时不存在条件变量所需要的用户态与内核态之间的转换缺点:获取锁的线程一直处于活跃状态但是并没有执行任何有效的任务因此使用这种锁会造成busy-waiting也就是CPU不断的进行检测操作而无法处理其他任务。当等待时间较长的时候cpu空转无意义的消耗当然这是自旋锁本身的弊端。

最后查阅了一些资料证实了future最终是使用了情况2-自旋锁实现但是与标准的自旋锁有一些差异具体的细节以后有能力了再详细介绍了。

参考

C++11 并发指南四( 详解三 std::future & std::shared_future) - Haippy - 博客园 (cnblogs.com)

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: c++