This is the mail archive of the libstdc++@gcc.gnu.org mailing list for the libstdc++ project.



Re: [patch] [WIP] Optimize synchronization in std::future if futexes are available.


CCing libstdc++@, as required for all libstdc++ patches.

Original patch at
https://gcc.gnu.org/ml/gcc-patches/2014-11/msg02004.html

This WORK-IN-PROGRESS patch uses an atomic unsigned and futex operations
to optimize the synchronization code in std::future.  The current code
uses a mutex/condvar combination, which is both slower (e.g., due to
mutex contention, stronger ordering requirements for condvars, using an
additional condvar-internal mutex, ...) and makes std::future fairly
large.

It introduces an __atomic_futex_unsigned type, which provides basic
atomic operations (load and store) on an atomic<unsigned> and
additionally provides load_when_[not_]equal operations that do blocking
waits on the atomic -- pretty much what futexes do.  Such an
__atomic_futex_unsigned is then

... ?

(At a guess, you were going to say it would be useful for implementing
the proposed std::synchronic? :-)

There are a few bits missing in this patch:
* A fallback implementation for platforms that don't provide futexes, in
the form of a different implementation of __atomic_futex_unsigned.  A
mutex+condvar combination is what I'm aiming at; for std::future, this
would lead to similar code and sizeof(std::future) as currently.
* More documentation of how the synchronization works.  Jonathan has a
patch in flight for that, so this should get merged.

Yup.

* Integration with the on_thread_exit patch that Jonathan posted, which
uses the current, lock-based synchronization scheme and thus needs to
get adapted.

Aside: I think I'm going to apply that _at_thread_exit() patch, so we
have a complete implementation committed, even if we actually end up
replacing it with an atomic one - just so the patch isn't lost.
I'll add some comments to the code at the same time.

* Testing.

The testsuite isn't that extensive, but covers some of the tricky
cases I was trying to handle.
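
To make the fallback idea concrete, here is a minimal sketch of what a mutex+condvar version of such a type could look like. The class name fallback_futex_unsigned and its members are hypothetical and are not taken from the patch; only the operation names (load, load_when_equal, load_when_equal_for, store_notify_all) mirror the ones used in the diff below. The memory-order arguments are left out because the mutex already provides the ordering.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical fallback for platforms without futexes; illustrative only.
class fallback_futex_unsigned
{
  unsigned value_;
  std::mutex mutex_;
  std::condition_variable cond_;

public:
  explicit fallback_futex_unsigned(unsigned v = 0) : value_(v) { }

  unsigned load()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return value_;
  }

  // Block until the stored value equals `expected`.
  void load_when_equal(unsigned expected)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [&] { return value_ == expected; });
  }

  // Timed variant: returns true if the value matched before the timeout.
  template<typename Rep, typename Period>
    bool load_when_equal_for(unsigned expected,
                             const std::chrono::duration<Rep, Period>& rel)
    {
      std::unique_lock<std::mutex> lock(mutex_);
      return cond_.wait_for(lock, rel, [&] { return value_ == expected; });
    }

  // Store the new value, then wake every waiter.
  void store_notify_all(unsigned v)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      value_ = v;
    }
    cond_.notify_all();
  }
};

// Returns true if the waiter sees the payload written before the store.
inline bool fallback_futex_demo()
{
  fallback_futex_unsigned status(0);
  int payload = 0;
  std::thread producer([&] { payload = 42; status.store_notify_all(1); });
  status.load_when_equal(1);
  bool ok = (payload == 42);
  producer.join();
  return ok;
}
```

A type like this carries a mutex and a condvar, so a future built on it would be about as large as the current one, as noted above.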

There are ways to optimize further I suppose, for example by letting the
__atomic_futex_unsigned take care of all current uses of call_once too.
Let me know if I should do that.  This would reduce the number of atomic
ops a little in some cases, as well as reduce space required for futures
a little.

Comments?

I like it, assuming it passes testing :-)

Inline below ...

diff --git a/libstdc++-v3/include/std/future b/libstdc++-v3/include/std/future
index 8989474..392c13f 100644
--- a/libstdc++-v3/include/std/future
+++ b/libstdc++-v3/include/std/future
@@ -291,14 +292,19 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
    {
      typedef _Ptr<_Result_base> _Ptr_type;

+      enum _Status {
+	not_ready,
+	ready

These names need to be uglified; _S_not_ready and _S_ready would be
the usual convention.

+      };
+
      _Ptr_type			_M_result;
-      mutex               	_M_mutex;
-      condition_variable  	_M_cond;
+      __atomic_futex_unsigned<>	_M_status;
      atomic_flag         	_M_retrieved;
      once_flag			_M_once;

    public:
-      _State_baseV2() noexcept : _M_result(), _M_retrieved(ATOMIC_FLAG_INIT)
+      _State_baseV2() noexcept : _M_result(), _M_retrieved(ATOMIC_FLAG_INIT),
+	  _M_status(_Status::not_ready)
	{ }
      _State_baseV2(const _State_baseV2&) = delete;
      _State_baseV2& operator=(const _State_baseV2&) = delete;
@@ -308,8 +314,9 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
      wait()
      {
	_M_complete_async();
-	unique_lock<mutex> __lock(_M_mutex);
-	_M_cond.wait(__lock, [&] { return _M_ready(); });
+	// Acquire MO makes sure this synchronizes with the thread that made
+	// the future ready.
+	_M_status.load_when_equal(_Status::ready, memory_order_acquire);

Do we need another _M_complete_async() here?

Consider Thread A calling wait_for() on a future associated with an
async thread C, and Thread B calling wait() on the same future. The
state is made ready, and both A and B wake up. B returns immediately,
but A calls _M_complete_async() which joins thread C.  This fails the
requirement that completion of the async thread (i.e. joining C)
synchronizes with the first thread to detect the ready state.

So I think we want another call to _M_complete_async() which will wait
until C has been joined.

The current implementation ensures this requirement is met by using
the mutex so that A and B cannot wake up at the same time, so one of
them will perform the join on C and return before the other can
proceed.
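
As a rough illustration of the second call, the sketch below uses std::call_once for the join, so a waiter that loses the race blocks until the join has finished. All names here (async_state_sketch, complete_async, both_waiters_see_join) are hypothetical, and the spin on an atomic flag merely stands in for the futex wait; this is not the patch's code.

```cpp
#include <atomic>
#include <mutex>
#include <thread>

// Hypothetical stand-in for an async shared state; illustrative only.
struct async_state_sketch
{
  int result = 0;
  std::atomic<bool> ready{false};
  std::once_flag joined;
  std::thread worker;  // declared last so it starts after the other members

  async_state_sketch()
  : worker([this] {
      result = 7;
      ready.store(true, std::memory_order_release);
    })
  { }

  ~async_state_sketch() { complete_async(); }

  // call_once makes concurrent callers wait until the join has completed.
  void complete_async()
  { std::call_once(joined, [this] { worker.join(); }); }

  int wait()
  {
    // Stand-in for the blocking wait on the status word.
    while (!ready.load(std::memory_order_acquire))
      std::this_thread::yield();
    complete_async();  // the extra call: do not return before the join
    return result;
  }
};

// Returns true if both waiters observe the worker as already joined.
inline bool both_waiters_see_join()
{
  async_state_sketch state;
  bool a = false, b = false;
  std::thread ta([&] { state.wait(); a = !state.worker.joinable(); });
  std::thread tb([&] { state.wait(); b = !state.worker.joinable(); });
  ta.join();
  tb.join();
  return a && b;
}
```

With the extra call in wait(), neither waiter can return while the other is still in the middle of joining the worker.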

	return *_M_result;
      }

@@ -317,15 +324,23 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
        future_status
        wait_for(const chrono::duration<_Rep, _Period>& __rel)
        {
-	  unique_lock<mutex> __lock(_M_mutex);
-	  if (_M_ready())
+	  _Status _s = _M_status.load(memory_order_acquire);
+	  if (_s == _Status::ready)
	    return future_status::ready;
-	  if (_M_has_deferred())
+	  if (_M_is_deferred_future())
	    return future_status::deferred;
-	  if (_M_cond.wait_for(__lock, __rel, [&] { return _M_ready(); }))
+	  if (_M_status.load_when_equal_for(_Status::ready,
+	      memory_order_acquire, __rel))
	    {
	      // _GLIBCXX_RESOLVE_LIB_DEFECTS
	      // 2100.  timed waiting functions must also join
+	      // This call is a no-op by default except on an async future,
+	      // in which case the async thread is joined.  It's also not a
+	      // no-op for a deferred future, but such a future will never
+	      // reach this point because it returns future_status::deferred
+	      // instead of waiting for the future to become ready (see
+	      // above).  Async futures synchronize in this call, so we need
+	      // no further synchronization here.
	      _M_complete_async();
	      return future_status::ready;
	    }

This is a better comment than the one I was going to add, thanks.


@@ -354,14 +372,12 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
      void
      _M_set_result(function<_Ptr_type()> __res, bool __ignore_failure = false)
      {
-	unique_lock<mutex> __lock(_M_mutex, defer_lock);
+	bool __did_set = false;
        // all calls to this function are serialized,
        // side-effects of invoking __res only happen once
	call_once(_M_once, &_State_baseV2::_M_do_set, this,
-		  std::__addressof(__res), std::__addressof(__lock));
-	if (__lock.owns_lock())
-	  _M_cond.notify_all();
-	else if (!__ignore_failure)
+		  std::__addressof(__res), std::__addressof(__did_set));
+	if (!__did_set && !__ignore_failure)
          __throw_future_error(int(future_errc::promise_already_satisfied));
      }


I was going to add this comment above _M_set_result:

     // Provide a result to the shared state and make it ready.
     // Atomically performs:
     //   if (!_M_ready) {
     //     _M_result = __res();
     //     _M_ready = true;
     //   }

And then the new _M_set_result_aside would be just:

     // Provide a result to the shared state without making it ready.
     // Atomically performs:
     //   if (!_M_ready) {
     //     _M_result = __res();
     //   }


@@ -372,11 +388,9 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
	  {
	    error_code __ec(make_error_code(future_errc::broken_promise));
	    __res->_M_error = make_exception_ptr(future_error(__ec));
-	    {
-	      lock_guard<mutex> __lock(_M_mutex);
-	      _M_result.swap(__res);
-	    }
-	    _M_cond.notify_all();
+	    // Must not be called concurrently with set_result.
+	    _M_result.swap(__res);
+	    _M_status.store_notify_all(_Status::ready, memory_order_release);
	  }
      }


After our discussion about this _M_break_promise() function I realised
that the new thread started by the _Async_state_impl constructor is
the only thing that can ever call _M_set_result on that shared state,
because no providers are exposed to the user, so it doesn't need to
use _M_set_result() (which calls std::call_once) and can just have
similar code to _M_break_promise(), modifying _M_result directly.

Similarly, I think the _Async_state_impl destructor can call

 if (_M_thread.joinable())
   _M_thread.join()

directly instead of using _M_join() (which calls std::call_once)
because once the state is being destroyed it implies there can be no
more waiting functions being called on the state, so we don't have to
worry about races to join the thread. It will either have been joined
already (so joinable() is false) or we are dropping the last reference
to the state and need to join, and it's undefined behaviour for
another thread to call a waiting function on the future while it's
being destroyed.

I think both those optimisations are relevant even with your atomic
implementation.
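
For the destructor point, a minimal sketch of the direct join, assuming nothing else can touch the state once it is being destroyed. The type async_owner_sketch and the helper run_and_destroy are hypothetical and only illustrate the joinable() check.

```cpp
#include <atomic>
#include <thread>

// Hypothetical owner of an async thread; illustrative only.
struct async_owner_sketch
{
  std::thread worker;

  explicit async_owner_sketch(std::atomic<int>& counter)
  : worker([&counter] { counter.fetch_add(1); })
  { }

  ~async_owner_sketch()
  {
    // No call_once needed: no other thread may use the state by now.
    if (worker.joinable())
      worker.join();
  }
};

// Runs two owners: one joined by its destructor, one joined beforehand.
inline int run_and_destroy()
{
  std::atomic<int> counter{0};
  { async_owner_sketch s(counter); }                   // destructor joins
  { async_owner_sketch s(counter); s.worker.join(); }  // already joined
  return counter.load();
}
```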

@@ -466,21 +480,22 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION

    private:
      void
-      _M_do_set(function<_Ptr_type()>* __f, unique_lock<mutex>* __lock)
+      _M_do_set(function<_Ptr_type()>* __f, bool* __did_set)
      {
-        _Ptr_type __res = (*__f)(); // do not hold lock while running setter
-	__lock->lock();
+        _Ptr_type __res = (*__f)();
+        // Notify the caller that we did try to set; if we do not throw an
+        // exception, the caller will be aware that it did set (e.g., see
+        // _M_set_result).
+	*__did_set = true;
        _M_result.swap(__res);
+        _M_status.store_notify_all(_Status::ready, memory_order_release);

Doing the store+notify here means this function can't be used for the
set_value_at_thread_exit case. If this function just sets *__did_set
to say whether it happened then the caller can decide whether to make
the state ready or not.
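
Roughly what I have in mind, as a sketch only: the call_once'd function stores the result and reports that it ran, and each caller decides whether to publish readiness. The names state_sketch, set_result_aside and make_ready are hypothetical, the result is a plain int for brevity, and a real version would also wake any waiters when making the state ready.

```cpp
#include <atomic>
#include <functional>
#include <mutex>

// Hypothetical simplified shared state; illustrative only.
struct state_sketch
{
  int result = 0;
  std::atomic<unsigned> status{0};  // 0 = not ready, 1 = ready
  std::once_flag once;

  // Stores the result and records that it ran; does not touch status.
  void do_set(std::function<int()>* f, bool* did_set)
  {
    int res = (*f)();
    *did_set = true;
    result = res;
  }

  // Set the result and make the state ready immediately.
  bool set_result(std::function<int()> f)
  {
    bool did_set = false;
    std::call_once(once, &state_sketch::do_set, this, &f, &did_set);
    if (did_set)
      status.store(1, std::memory_order_release);
    return did_set;
  }

  // Set the result but leave readiness for later (the at-thread-exit case).
  bool set_result_aside(std::function<int()> f)
  {
    bool did_set = false;
    std::call_once(once, &state_sketch::do_set, this, &f, &did_set);
    return did_set;
  }

  void make_ready()
  { status.store(1, std::memory_order_release); }
};
```

A second attempt to set the result then reports failure without changing either the stored value or the status.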

