00001 // -*- C++ -*- 00002 00003 // Copyright (C) 2007, 2008 Free Software Foundation, Inc. 00004 // 00005 // This file is part of the GNU ISO C++ Library. This library is free 00006 // software; you can redistribute it and/or modify it under the terms 00007 // of the GNU General Public License as published by the Free Software 00008 // Foundation; either version 2, or (at your option) any later 00009 // version. 00010 00011 // This library is distributed in the hope that it will be useful, but 00012 // WITHOUT ANY WARRANTY; without even the implied warranty of 00013 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00014 // General Public License for more details. 00015 00016 // You should have received a copy of the GNU General Public License 00017 // along with this library; see the file COPYING. If not, write to 00018 // the Free Software Foundation, 59 Temple Place - Suite 330, Boston, 00019 // MA 02111-1307, USA. 00020 00021 // As a special exception, you may use this file as part of a free 00022 // software library without restriction. Specifically, if other files 00023 // instantiate templates or use macros or inline functions from this 00024 // file, or you compile this file and link it with other files to 00025 // produce an executable, this file does not by itself cause the 00026 // resulting executable to be covered by the GNU General Public 00027 // License. This exception does not however invalidate any other 00028 // reasons why the executable file might be covered by the GNU General 00029 // Public License. 00030 00031 /** @file parallel/queue.h 00032 * @brief Lock-free double-ended queue. 00033 * This file is a GNU parallel extension to the Standard C++ Library. 00034 */ 00035 00036 // Written by Johannes Singler. 00037 00038 #ifndef _GLIBCXX_PARALLEL_QUEUE_H 00039 #define _GLIBCXX_PARALLEL_QUEUE_H 1 00040 00041 #include <parallel/types.h> 00042 #include <parallel/base.h> 00043 #include <parallel/compatibility.h> 00044 00045 /** @brief Decide whether to declare certain variable volatile in this file. */ 00046 #define _GLIBCXX_VOLATILE volatile 00047 00048 namespace __gnu_parallel 00049 { 00050 /**@brief Double-ended queue of bounded size, allowing lock-free 00051 * atomic access. push_front() and pop_front() must not be called 00052 * concurrently to each other, while pop_back() can be called 00053 * concurrently at all times. 00054 * @c empty(), @c size(), and @c top() are intentionally not provided. 00055 * Calling them would not make sense in a concurrent setting. 00056 * @param T Contained element type. */ 00057 template<typename T> 00058 class RestrictedBoundedConcurrentQueue 00059 { 00060 private: 00061 /** @brief Array of elements, seen as cyclic buffer. */ 00062 T* base; 00063 00064 /** @brief Maximal number of elements contained at the same time. */ 00065 sequence_index_t max_size; 00066 00067 /** @brief Cyclic begin and end pointers contained in one 00068 atomically changeable value. */ 00069 _GLIBCXX_VOLATILE lcas_t borders; 00070 00071 public: 00072 /** @brief Constructor. Not to be called concurrent, of course. 00073 * @param max_size Maximal number of elements to be contained. */ 00074 RestrictedBoundedConcurrentQueue(sequence_index_t max_size) 00075 { 00076 this->max_size = max_size; 00077 base = new T[max_size]; 00078 borders = encode2(0, 0); 00079 #pragma omp flush 00080 } 00081 00082 /** @brief Destructor. Not to be called concurrent, of course. */ 00083 ~RestrictedBoundedConcurrentQueue() 00084 { delete[] base; } 00085 00086 /** @brief Pushes one element into the queue at the front end. 00087 * Must not be called concurrently with pop_front(). */ 00088 void 00089 push_front(const T& t) 00090 { 00091 lcas_t former_borders = borders; 00092 int former_front, former_back; 00093 decode2(former_borders, former_front, former_back); 00094 *(base + former_front % max_size) = t; 00095 #if _GLIBCXX_ASSERTIONS 00096 // Otherwise: front - back > max_size eventually. 00097 _GLIBCXX_PARALLEL_ASSERT(((former_front + 1) - former_back) 00098 <= max_size); 00099 #endif 00100 fetch_and_add(&borders, encode2(1, 0)); 00101 } 00102 00103 /** @brief Pops one element from the queue at the front end. 00104 * Must not be called concurrently with pop_front(). */ 00105 bool 00106 pop_front(T& t) 00107 { 00108 int former_front, former_back; 00109 #pragma omp flush 00110 decode2(borders, former_front, former_back); 00111 while (former_front > former_back) 00112 { 00113 // Chance. 00114 lcas_t former_borders = encode2(former_front, former_back); 00115 lcas_t new_borders = encode2(former_front - 1, former_back); 00116 if (compare_and_swap(&borders, former_borders, new_borders)) 00117 { 00118 t = *(base + (former_front - 1) % max_size); 00119 return true; 00120 } 00121 #pragma omp flush 00122 decode2(borders, former_front, former_back); 00123 } 00124 return false; 00125 } 00126 00127 /** @brief Pops one element from the queue at the front end. 00128 * Must not be called concurrently with pop_front(). */ 00129 bool 00130 pop_back(T& t) //queue behavior 00131 { 00132 int former_front, former_back; 00133 #pragma omp flush 00134 decode2(borders, former_front, former_back); 00135 while (former_front > former_back) 00136 { 00137 // Chance. 00138 lcas_t former_borders = encode2(former_front, former_back); 00139 lcas_t new_borders = encode2(former_front, former_back + 1); 00140 if (compare_and_swap(&borders, former_borders, new_borders)) 00141 { 00142 t = *(base + former_back % max_size); 00143 return true; 00144 } 00145 #pragma omp flush 00146 decode2(borders, former_front, former_back); 00147 } 00148 return false; 00149 } 00150 }; 00151 } //namespace __gnu_parallel 00152 00153 #undef _GLIBCXX_VOLATILE 00154 00155 #endif