This is the mail archive of the libstdc++@gcc.gnu.org mailing list for the libstdc++ project.
| Index Nav: | [Date Index] [Subject Index] [Author Index] [Thread Index] | |
|---|---|---|
| Message Nav: | [Date Prev] [Date Next] | [Thread Prev] [Thread Next] |
| Other format: | [Raw text] | |
Major revision of the parallel mode algorithms, so that they work
correctly even if omp_get_dynamic() is true, i.e. when the OpenMP runtime
may provide fewer threads than requested (PR 33893).
Also, the quicksort variants now work correctly if !omp_get_nested(),
i.e. when nested parallelism is disabled. However, the speedup is limited
to 2 in this case. Removing this limitation would be quite involved, since
OpenMP 2.5 does not support barriers among a subset of a team. Things may
improve with OpenMP 3.0; I still have to look into this.
Because the process included a lot of formatting cleanup, I'm also
attaching a version of the patch ignoring white space, for better
readability.
Please comment and/or approve.
Tested on x86_64-unknown-linux-gnu: no regressions.
2007-11-16 Johannes Singler <singler@ira.uka.de>
*include/parallel/multiway_merge.h: made omp_dynamic-safe
*include/parallel/workstealing.h: made omp_dynamic-safe
*include/parallel/base.h: infrastructure, cleanup
*include/parallel/par_loop.h: made omp_dynamic-safe
*include/parallel/features.h: activate loser tree variant
*include/parallel/quicksort.h: made omp_dynamic-safe
*include/parallel/compiletime_settings.h: settings overridable
*include/parallel/equally_split.h: made omp_dynamic-safe
*include/parallel/omp_loop_static.h: made omp_dynamic-safe
*include/parallel/random_shuffle.h: made omp_dynamic-safe
*include/parallel/balanced_quicksort.h: made omp_dynamic-safe
*include/parallel/set_operations.h: made omp_dynamic-safe
*include/parallel/unique_copy.h: made omp_dynamic-safe
*include/parallel/multiway_mergesort.h: made omp_dynamic-safe
*include/parallel/search.h: made omp_dynamic-safe
*include/parallel/partition.h: made omp_dynamic-safe
*include/parallel/partial_sum.h: made omp_dynamic-safe
*include/parallel/find.h: made omp_dynamic-safe
*include/parallel/omp_loop.h: made omp_dynamic-safe
Johannes
Index: include/parallel/multiway_merge.h
===================================================================
--- include/parallel/multiway_merge.h (revision 130225)
+++ include/parallel/multiway_merge.h (working copy)
@@ -1343,11 +1343,6 @@
typedef typename std::iterator_traits<RandomAccessIterator1>::value_type
value_type;
-#if _GLIBCXX_ASSERTIONS
- for (RandomAccessIteratorIterator rii = seqs_begin; rii != seqs_end; rii++)
- _GLIBCXX_PARALLEL_ASSERT(is_sorted((*rii).first, (*rii).second, comp));
-#endif
-
// k sequences.
int k = static_cast<int>(seqs_end - seqs_begin);
@@ -1360,12 +1355,19 @@
if (total_length == 0 || k == 0)
return target;
+ bool tight = (total_length == length);
+
+ std::vector<std::pair<difference_type, difference_type> >* pieces;
+
thread_index_t num_threads = static_cast<thread_index_t>(std::min(static_cast<difference_type>(get_max_threads()), total_length));
- bool tight = (total_length == length);
-
+ #pragma omp parallel num_threads (num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
// Thread t will have to merge pieces[iam][0..k - 1]
- std::vector<std::pair<difference_type, difference_type> >* pieces = new std::vector<std::pair<difference_type, difference_type> >[num_threads];
+ pieces = new std::vector<std::pair<difference_type, difference_type> >[num_threads];
for (int s = 0; s < num_threads; s++)
pieces[s].resize(k);
@@ -1414,7 +1416,7 @@
copy(seqs_begin, seqs_end, se.begin());
- difference_type* borders = static_cast<difference_type*>(__builtin_alloca(sizeof(difference_type) * (num_threads + 1)));
+ difference_type* borders = new difference_type[num_threads + 1];
equally_split(length, num_threads, borders);
for (int s = 0; s < (num_threads - 1); s++)
@@ -1427,8 +1429,8 @@
if (!tight)
{
offsets[num_threads - 1].resize(k);
- multiseq_partition(se.begin(), se.end(),
- difference_type(length),
+ multiseq_partition(se.begin(), se.end(),
+ difference_type(length),
offsets[num_threads - 1].begin(), comp);
}
}
@@ -1458,9 +1460,8 @@
}
delete[] offsets;
}
+ } //single
-# pragma omp parallel num_threads(num_threads)
- {
thread_index_t iam = omp_get_thread_num();
difference_type target_position = 0;
@@ -1496,11 +1497,11 @@
(pieces[iam][0].second - pieces[iam][0].first) + (pieces[iam][1].second - pieces[iam][1].first),
comp);
}
- }
+ } //parallel
-#if _GLIBCXX_ASSERTIONS
+ #if _GLIBCXX_ASSERTIONS
_GLIBCXX_PARALLEL_ASSERT(is_sorted(target, target + length, comp));
-#endif
+ #endif
// Update ends of sequences.
for (int s = 0; s < k; s++)
Index: include/parallel/workstealing.h
===================================================================
--- include/parallel/workstealing.h (revision 130225)
+++ include/parallel/workstealing.h (working copy)
@@ -98,7 +98,8 @@
*/
template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
Op
- for_each_template_random_access_workstealing(RandomAccessIterator begin,
+ for_each_template_random_access_workstealing(
+ RandomAccessIterator begin,
RandomAccessIterator end,
Op op, Fu& f, Red r,
Result base, Result& output,
@@ -120,24 +121,30 @@
// Total number of threads currently working.
thread_index_t busy = 0;
- thread_index_t num_threads = get_max_threads();
- difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
+ Job<difference_type> *job;
+
omp_lock_t output_lock;
omp_init_lock(&output_lock);
+ // Write base value to output.
+ output = base;
+
// No more threads than jobs, at least one thread.
- difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
- num_threads = static_cast<thread_index_t>(num_threads_max);
+ thread_index_t num_threads =
+ __gnu_parallel::max<thread_index_t>(1, __gnu_parallel::min<difference_type>(length, get_max_threads()));
+ #pragma omp parallel shared(busy) num_threads(num_threads)
+ {
+
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+
// Create job description array.
- Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
+ job = new Job<difference_type>[num_threads * stride];
+ }
- // Write base value to output.
- output = base;
-
-#pragma omp parallel shared(busy) num_threads(num_threads)
- {
// Initialization phase.
// Flags for every thread if it is doing productive work.
@@ -161,8 +168,8 @@
// Every thread has its own random number generator (modulo num_threads).
random_number rand_gen(iam, num_threads);
-#pragma omp atomic
// This thread is currently working.
+ #pragma omp atomic
busy++;
iam_working = true;
@@ -170,7 +177,8 @@
// How many jobs per thread? last thread gets the rest.
my_job.first = static_cast<difference_type>(iam * (length / num_threads));
- my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
+ my_job.last = (iam == (num_threads - 1)) ?
+ (length - 1) : ((iam + 1) * (length / num_threads) - 1);
my_job.load = my_job.last - my_job.first + 1;
// Init result with first value (to have a base value for reduction).
@@ -185,26 +193,29 @@
RandomAccessIterator current;
-#pragma omp barrier
+ #pragma omp barrier
// Actual work phase
// Work on own or stolen start
while (busy > 0)
{
// Work until no productive thread left.
-#pragma omp flush(busy)
+ #pragma omp flush(busy)
// Thread has own work to do
while (my_job.first <= my_job.last)
{
// fetch-and-add call
// Reserve current job block (size chunk_size) in my queue.
- difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
+ difference_type current_job =
+ fetch_and_add<difference_type>(&(my_job.first), chunk_size);
// Update load, to make the three values consistent,
// first might have been changed in the meantime
my_job.load = my_job.last - my_job.first + 1;
- for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
+ for (difference_type job_counter = 0;
+ job_counter < chunk_size && current_job <= my_job.last;
+ job_counter++)
{
// Yes: process it!
current = begin + current_job;
@@ -214,15 +225,14 @@
result = r(result, f(op, current));
}
-#pragma omp flush(busy)
-
+ #pragma omp flush(busy)
}
// After reaching this point, a thread's job list is empty.
if (iam_working)
{
-#pragma omp atomic
// This thread no longer has work.
+ #pragma omp atomic
busy--;
iam_working = false;
@@ -233,7 +243,7 @@
{
// Find random nonempty deque (not own) and do consistency check.
yield();
-#pragma omp flush(busy)
+ #pragma omp flush(busy)
victim = rand_gen();
supposed_first = job[victim * stride].first;
supposed_last = job[victim * stride].last;
@@ -262,29 +272,24 @@
// omp_unset_lock(&(job[victim * stride].lock));
my_job.first = stolen_first;
-
- // Avoid std::min dependencies.
- my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
-
+ my_job.last = __gnu_parallel::min(stolen_try, supposed_last);
my_job.load = my_job.last - my_job.first + 1;
//omp_unset_lock(&(my_job.lock));
-#pragma omp atomic
// Has potential work again.
+ #pragma omp atomic
busy++;
iam_working = true;
-#pragma omp flush(busy)
+ #pragma omp flush(busy)
}
-#pragma omp flush(busy)
+ #pragma omp flush(busy)
} // end while busy > 0
// Add accumulated result to output.
omp_set_lock(&output_lock);
output = r(output, result);
omp_unset_lock(&output_lock);
-
- //omp_destroy_lock(&(my_job.lock));
}
delete[] job;
Index: include/parallel/base.h
===================================================================
--- include/parallel/base.h (revision 130225)
+++ include/parallel/base.h (working copy)
@@ -92,6 +92,20 @@
b = (int)((x >> 0 ) & lcas_t_mask);
}
+ /** @brief Equivalent to std::min. */
+ template<typename T>
+ const T& min(const T& a, const T& b)
+ {
+ return (a < b) ? a : b;
+ };
+
+ /** @brief Equivalent to std::max. */
+ template<typename T>
+ const T& max(const T& a, const T& b)
+ {
+ return (a > b) ? a : b;
+ };
+
/** @brief Constructs predicate for equality from strict weak
* ordering predicate
*/
Index: include/parallel/par_loop.h
===================================================================
--- include/parallel/par_loop.h (revision 130225)
+++ include/parallel/par_loop.h (working copy)
@@ -41,6 +41,7 @@
#include <omp.h>
#include <parallel/settings.h>
+#include <parallel/base.h>
namespace __gnu_parallel
{
@@ -65,45 +66,47 @@
*/
template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
Op
- for_each_template_random_access_ed(RandomAccessIterator begin,
- RandomAccessIterator end, Op o, Fu& f,
- Red r, Result base, Result& output,
+ for_each_template_random_access_ed(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op o, Fu& f, Red r, Result base, Result& output,
typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::difference_type difference_type;
const difference_type length = end - begin;
- const difference_type settings_threads = static_cast<difference_type>(get_max_threads());
- const difference_type dmin = settings_threads < length ? settings_threads : length;
- const difference_type dmax = dmin > 1 ? dmin : 1;
+ Result *thread_results;
- thread_index_t num_threads = static_cast<thread_index_t>(dmax);
+ thread_index_t num_threads = __gnu_parallel::min<difference_type>(get_max_threads(), length);
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ thread_results = new Result[num_threads];
+ }
- Result *thread_results = new Result[num_threads];
+ thread_index_t iam = omp_get_thread_num();
-#pragma omp parallel num_threads(num_threads)
- {
// Neutral element.
Result reduct = Result();
- thread_index_t p = num_threads;
- thread_index_t iam = omp_get_thread_num();
- difference_type start = iam * length / p;
- difference_type limit = (iam == p - 1) ? length : (iam + 1) * length / p;
+ difference_type start = equally_split_point(length, num_threads, iam),
+ stop = equally_split_point(length, num_threads, iam + 1);
- if (start < limit)
+ if (start < stop)
{
reduct = f(o, begin + start);
- start++;
+ ++start;
}
- for (; start < limit; start++)
+ for (; start < stop; ++start)
reduct = r(reduct, f(o, begin + start));
thread_results[iam] = reduct;
- }
+ } //parallel
for (thread_index_t i = 0; i < num_threads; i++)
output = r(output, thread_results[i]);
Index: include/parallel/features.h
===================================================================
--- include/parallel/features.h (revision 130225)
+++ include/parallel/features.h (working copy)
@@ -66,7 +66,7 @@
* @brief Include guarded (sequences may run empty) loser tree,
* moving objects.
* @see __gnu_parallel::Settings multiway_merge_algorithm */
-#define _GLIBCXX_LOSER_TREE 0
+#define _GLIBCXX_LOSER_TREE 1
#endif
#ifndef _GLIBCXX_LOSER_TREE_EXPLICIT
Index: include/parallel/quicksort.h
===================================================================
--- include/parallel/quicksort.h (revision 130225)
+++ include/parallel/quicksort.h (working copy)
@@ -65,13 +65,15 @@
difference_type n = end - begin;
num_samples = std::min(num_samples, n);
- value_type* samples = static_cast<value_type*>(__builtin_alloca(sizeof(value_type) * num_samples));
+ // Allocate uninitialized, to avoid default constructor.
+ value_type* samples = static_cast<value_type*>(operator new(num_samples * sizeof(value_type)));
+
for (difference_type s = 0; s < num_samples; s++)
{
- const unsigned long long index = static_cast<unsigned long long>(s)
+ const unsigned long long index = static_cast<unsigned long long>(s)
* n / num_samples;
- samples[s] = begin[index];
+ new(samples + s) value_type(begin[index]);
}
__gnu_sequential::sort(samples, samples + num_samples, comp);
@@ -110,14 +112,14 @@
if (n <= 1)
return;
- thread_index_t num_processors_left;
+ thread_index_t num_threads_left;
if ((num_threads % 2) == 1)
- num_processors_left = num_threads / 2 + 1;
+ num_threads_left = num_threads / 2 + 1;
else
- num_processors_left = num_threads / 2;
+ num_threads_left = num_threads / 2;
- pivot_rank = n * num_processors_left / num_threads;
+ pivot_rank = n * num_threads_left / num_threads;
difference_type split = parallel_sort_qs_divide(begin, end, comp, pivot_rank,
Settings::sort_qs_num_samples_preset, num_threads);
@@ -125,9 +127,9 @@
#pragma omp parallel sections
{
#pragma omp section
- parallel_sort_qs_conquer(begin, begin + split, comp, num_processors_left);
+ parallel_sort_qs_conquer(begin, begin + split, comp, num_threads_left);
#pragma omp section
- parallel_sort_qs_conquer(begin + split, end, comp, num_threads - num_processors_left);
+ parallel_sort_qs_conquer(begin + split, end, comp, num_threads - num_threads_left);
}
}
@@ -165,10 +167,7 @@
// Hard to avoid.
omp_set_num_threads(num_threads);
- bool old_nested = (omp_get_nested() != 0);
- omp_set_nested(true);
parallel_sort_qs_conquer(begin, begin + n, comp, num_threads);
- omp_set_nested(old_nested);
}
} //namespace __gnu_parallel
Index: include/parallel/compiletime_settings.h
===================================================================
--- include/parallel/compiletime_settings.h (revision 130225)
+++ include/parallel/compiletime_settings.h (working copy)
@@ -53,24 +53,33 @@
#define _GLIBCXX_CALL(n) printf(" %s:\niam = %d, n = %ld, num_threads = %d\n", __PRETTY_FUNCTION__, omp_get_thread_num(), (n), get_max_threads());
#endif
+#ifndef _GLIBCXX_SCALE_DOWN_FPU
/** @brief Use floating-point scaling instead of modulo for mapping
* random numbers to a range. This can be faster on certain CPUs. */
#define _GLIBCXX_SCALE_DOWN_FPU 0
+#endif
+#ifndef _GLIBCXX_ASSERTIONS
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Should be switched on only locally. */
#define _GLIBCXX_ASSERTIONS 0
+#endif
+#ifndef _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_L1
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Consider the size of the L1 cache for __gnu_parallel::parallel_random_shuffle(). */
#define _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_L1 0
+#endif
+#ifndef _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Consider the size of the TLB for __gnu_parallel::parallel_random_shuffle(). */
#define _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB 0
+#endif
+#ifndef _GLIBCXX_MULTIWAY_MERGESORT_COPY_LAST
/** @brief First copy the data, sort it locally, and merge it back
* (0); or copy it back after everything is done (1).
*
* Recommendation: 0 */
#define _GLIBCXX_MULTIWAY_MERGESORT_COPY_LAST 0
-
+#endif
Index: include/parallel/equally_split.h
===================================================================
--- include/parallel/equally_split.h (revision 130225)
+++ include/parallel/equally_split.h (working copy)
@@ -39,30 +39,51 @@
namespace __gnu_parallel
{
- /** @brief Function to split a sequence into parts of almost equal size.
+ /** @brief Function to split a sequence into parts of almost equal size.
*
- * The resulting sequence s of length p+1 contains the splitting
+ * The resulting sequence s of length num_threads+1 contains the splitting
* positions when splitting the range [0,n) into parts of almost
* equal size (plus minus 1). The first entry is 0, the last one
* n. There may result empty parts.
* @param n Number of elements
- * @param p Number of parts
+ * @param num_threads Number of parts
* @param s Splitters
- * @returns End of splitter sequence, i. e. @c s+p+1 */
+ * @returns End of splitter sequence, i. e. @c s+num_threads+1 */
template<typename _DifferenceTp, typename OutputIterator>
OutputIterator
- equally_split(_DifferenceTp n, thread_index_t p, OutputIterator s)
+ equally_split(_DifferenceTp n, thread_index_t num_threads, OutputIterator s)
{
typedef _DifferenceTp difference_type;
- difference_type chunk_length = n / p, split = n % p, start = 0;
- for (int i = 0; i < p; i++)
+ difference_type chunk_length = n / num_threads, num_longer_chunks = n % num_threads, pos = 0;
+ for (thread_index_t i = 0; i < num_threads; ++i)
{
- *s++ = start;
- start += (difference_type(i) < split) ? (chunk_length + 1) : chunk_length;
+ *s++ = pos;
+ pos += (i < num_longer_chunks) ? (chunk_length + 1) : chunk_length;
}
*s++ = n;
return s;
}
+
+
+ /** @brief Function to split a sequence into parts of almost equal size.
+ *
+ * Returns the position of the splitting point between
+ * thread number thread_no (included) and
+ * thread number thread_no+1 (excluded).
+ * @param n Number of elements
+ * @param num_threads Number of parts
+ * @returns Splitting point */
+ template<typename _DifferenceTp>
+ _DifferenceTp
+ equally_split_point(_DifferenceTp n, thread_index_t num_threads, thread_index_t thread_no)
+ {
+ typedef _DifferenceTp difference_type;
+ difference_type chunk_length = n / num_threads, num_longer_chunks = n % num_threads;
+ if(thread_no < num_longer_chunks)
+ return thread_no * (chunk_length + 1);
+ else
+ return num_longer_chunks * (chunk_length + 1) + (thread_no - num_longer_chunks) * chunk_length;
+ }
}
#endif
Index: include/parallel/omp_loop_static.h
===================================================================
--- include/parallel/omp_loop_static.h (revision 130225)
+++ include/parallel/omp_loop_static.h (working copy)
@@ -64,39 +64,47 @@
* std::count_n()).
* @return User-supplied functor (that may contain a part of the result).
*/
- template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
+ template<typename RandomAccessIterator,
+ typename Op,
+ typename Fu,
+ typename Red,
+ typename Result>
Op
- for_each_template_random_access_omp_loop_static(RandomAccessIterator begin,
+ for_each_template_random_access_omp_loop_static(
+ RandomAccessIterator begin,
RandomAccessIterator end,
- Op o, Fu& f, Red r,
- Result base, Result& output,
+ Op o, Fu& f, Red r, Result base, Result& output,
typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
- typedef std::iterator_traits<RandomAccessIterator> traits_type;
- typedef typename traits_type::difference_type difference_type;
+ typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
+ difference_type;
- thread_index_t num_threads = (get_max_threads() < (end - begin)) ? get_max_threads() : (end - begin);
- Result *thread_results = new Result[num_threads];
difference_type length = end - begin;
+ thread_index_t num_threads = std::min<difference_type>(get_max_threads(), length);
- for (thread_index_t i = 0; i < num_threads; i++)
- {
- thread_results[i] = r(thread_results[i], f(o, begin+i));
- }
+ Result *thread_results;
-#pragma omp parallel num_threads(num_threads)
+ #pragma omp parallel num_threads(num_threads)
{
-#pragma omp for schedule(static, Settings::workstealing_chunk_size)
- for (difference_type pos = 0; pos < length; pos++)
+ #pragma omp single
{
- thread_results[omp_get_thread_num()] = r(thread_results[omp_get_thread_num()], f(o, begin+pos));
+ num_threads = omp_get_num_threads();
+ thread_results = new Result[num_threads];
+
+ for (thread_index_t i = 0; i < num_threads; i++)
+ thread_results[i] = Result();
}
- }
+ thread_index_t iam = omp_get_thread_num();
+
+ #pragma omp for schedule(static, Settings::workstealing_chunk_size)
+ for (difference_type pos = 0; pos < length; pos++)
+ thread_results[iam] =
+ r(thread_results[iam], f(o, begin+pos));
+ } //parallel
+
for (thread_index_t i = 0; i < num_threads; i++)
- {
output = r(output, thread_results[i]);
- }
delete [] thread_results;
@@ -106,6 +114,7 @@
return o;
}
+
} // end namespace
#endif
Index: include/parallel/random_shuffle.h
===================================================================
--- include/parallel/random_shuffle.h (revision 130225)
+++ include/parallel/random_shuffle.h (working copy)
@@ -99,9 +99,6 @@
/** @brief Number of threads participating in total. */
int num_threads;
- /** @brief Number of owning thread. */
- int iam;
-
/** @brief Begin index for bins taken care of by this thread. */
bin_index bins_begin;
@@ -135,9 +132,9 @@
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
- DRSSorterPU<RandomAccessIterator, RandomNumberGenerator>* d = &pus[omp_get_thread_num()];
+ thread_index_t iam = omp_get_thread_num();
+ DRSSorterPU<RandomAccessIterator, RandomNumberGenerator>* d = &pus[iam];
DRandomShufflingGlobalData<RandomAccessIterator>* sd = d->sd;
- thread_index_t iam = d->iam;
// Indexing: dist[bin][processor]
difference_type length = sd->starts[iam + 1] - sd->starts[iam];
@@ -258,7 +255,7 @@
*/
template<typename RandomAccessIterator, typename RandomNumberGenerator>
inline void
- parallel_random_shuffle_drs(RandomAccessIterator begin, RandomAccessIterator end, typename std::iterator_traits<RandomAccessIterator>::difference_type n, int num_threads, RandomNumberGenerator& rng)
+ parallel_random_shuffle_drs(RandomAccessIterator begin, RandomAccessIterator end, typename std::iterator_traits<RandomAccessIterator>::difference_type n, thread_index_t num_threads, RandomNumberGenerator& rng)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -280,11 +277,11 @@
// No more buckets than TLB entries, power of 2
// Power of 2 and at least one element per bin, at most the TLB size.
- num_bins = std::min(n, (difference_type)num_bins_cache);
+ num_bins = std::min<difference_type>(n, num_bins_cache);
#if _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB
// 2 TLB entries needed per bin.
- num_bins = std::min((difference_type)Settings::TLB_size / 2, num_bins);
+ num_bins = std::min<difference_type>(Settings::TLB_size / 2, num_bins);
#endif
num_bins = round_up_to_pow2(num_bins);
@@ -308,14 +305,20 @@
}
#endif
- num_threads = std::min((bin_index)num_threads, (bin_index)num_bins);
+ num_threads = std::min<bin_index>(num_threads, num_bins);
if (num_threads <= 1)
return sequential_random_shuffle(begin, end, rng);
DRandomShufflingGlobalData<RandomAccessIterator> sd(begin);
+ DRSSorterPU<RandomAccessIterator, random_number >* pus;
+ difference_type* starts;
- DRSSorterPU<RandomAccessIterator, random_number >* pus = new DRSSorterPU<RandomAccessIterator, random_number >[num_threads];
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ pus = new DRSSorterPU<RandomAccessIterator, random_number >[num_threads];
sd.temporaries = new value_type*[num_threads];
//sd.oracles = new bin_index[n];
@@ -328,7 +331,7 @@
sd.dist[0][0] = 0;
sd.dist[b][0] = 0;
}
- difference_type* starts = sd.starts = new difference_type[num_threads + 1];
+ starts = sd.starts = new difference_type[num_threads + 1];
int bin_cursor = 0;
sd.num_bins = num_bins;
sd.num_bits = log2(num_bins);
@@ -347,15 +350,14 @@
for (; j < bin_cursor; j++)
sd.bin_proc[j] = i;
pus[i].num_threads = num_threads;
- pus[i].iam = i;
pus[i].seed = rng(std::numeric_limits<uint32>::max());
pus[i].sd = &sd;
}
starts[num_threads] = start;
-
+ } //single
// Now shuffle in parallel.
-#pragma omp parallel num_threads(num_threads)
parallel_random_shuffle_drs_pu(pus);
+ }
delete[] starts;
delete[] sd.bin_proc;
Index: include/parallel/balanced_quicksort.h
===================================================================
--- include/parallel/balanced_quicksort.h (revision 130225)
+++ include/parallel/balanced_quicksort.h (working copy)
@@ -94,18 +94,6 @@
QSBThreadLocal(int queue_size) : leftover_parts(queue_size) { }
};
- /** @brief Initialize the thread local storage.
- * @param tls Array of thread-local storages.
- * @param queue_size Size of the work-stealing queue. */
- template<typename RandomAccessIterator>
- inline void
- qsb_initialize(QSBThreadLocal<RandomAccessIterator>** tls, int queue_size)
- {
- int iam = omp_get_thread_num();
- tls[iam] = new QSBThreadLocal<RandomAccessIterator>(queue_size);
- }
-
-
/** @brief Balanced quicksort divide step.
* @param begin Begin iterator of subsequence.
* @param end End iterator of subsequence.
@@ -116,7 +104,7 @@
template<typename RandomAccessIterator, typename Comparator>
inline typename std::iterator_traits<RandomAccessIterator>::difference_type
qsb_divide(RandomAccessIterator begin, RandomAccessIterator end,
- Comparator comp, int num_threads)
+ Comparator comp, thread_index_t num_threads)
{
_GLIBCXX_PARALLEL_ASSERT(num_threads > 0);
@@ -174,7 +162,9 @@
inline void
qsb_conquer(QSBThreadLocal<RandomAccessIterator>** tls,
RandomAccessIterator begin, RandomAccessIterator end,
- Comparator comp, thread_index_t iam, thread_index_t num_threads)
+ Comparator comp,
+ thread_index_t iam, thread_index_t num_threads,
+ bool parent_wait)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -182,12 +172,12 @@
difference_type n = end - begin;
- if (num_threads <= 1 || n < 2)
+ if (num_threads <= 1 || n <= 1)
{
tls[iam]->initial.first = begin;
tls[iam]->initial.second = end;
- qsb_local_sort_with_helping(tls, comp, iam);
+ qsb_local_sort_with_helping(tls, comp, iam, parent_wait);
return;
}
@@ -201,22 +191,37 @@
thread_index_t num_threads_leftside = std::max<thread_index_t>(1, std::min<thread_index_t>(num_threads - 1, split_pos * num_threads / n));
-#pragma omp atomic
+ #pragma omp atomic
*tls[iam]->elements_leftover -= (difference_type)1;
// Conquer step.
-#pragma omp parallel sections num_threads(2)
+ #pragma omp parallel num_threads(2)
{
-#pragma omp section
- qsb_conquer(tls, begin, begin + split_pos, comp, iam, num_threads_leftside);
+ bool wait;
+ if(omp_get_num_threads() < 2)
+ wait = false;
+ else
+ wait = parent_wait;
+
+ #pragma omp sections
+ {
+ #pragma omp section
+ {
+ qsb_conquer(tls, begin, begin + split_pos, comp, iam, num_threads_leftside, wait);
+ wait = parent_wait;
+ }
// The pivot_pos is left in place, to ensure termination.
-#pragma omp section
+ #pragma omp section
+ {
qsb_conquer(tls, begin + split_pos + 1, end, comp,
- iam + num_threads_leftside, num_threads - num_threads_leftside);
+ iam + num_threads_leftside, num_threads - num_threads_leftside, wait);
+ wait = parent_wait;
}
}
+ }
+ }
- /**
+ /**
* @brief Quicksort step doing load-balanced local sort.
* @param tls Array of thread-local storages.
* @param comp Comparator.
@@ -225,7 +230,7 @@
template<typename RandomAccessIterator, typename Comparator>
inline void
qsb_local_sort_with_helping(QSBThreadLocal<RandomAccessIterator>** tls,
- Comparator& comp, int iam)
+ Comparator& comp, int iam, bool wait)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -292,10 +297,8 @@
split_pos2 = __gnu_sequential::partition(split_pos1 + 1, end, pred);
}
else
- {
// Only skip the pivot.
split_pos2 = split_pos1 + 1;
- }
// Elements equal to pivot are done.
elements_done += (split_pos2 - split_pos1);
@@ -345,8 +348,8 @@
#endif
// Look for new work.
- bool success = false;
- while (*tl.elements_leftover > 0 && !success
+ bool successfully_stolen = false;
+ while (wait && *tl.elements_leftover > 0 && !successfully_stolen
#if _GLIBCXX_ASSERTIONS
// Possible dead-lock.
&& (omp_get_wtime() < (search_start + 1.0))
@@ -357,11 +360,11 @@
victim = rng(num_threads);
// Large pieces.
- success = (victim != iam) && tls[victim]->leftover_parts.pop_back(current);
- if (!success)
+ successfully_stolen = (victim != iam) && tls[victim]->leftover_parts.pop_back(current);
+ if (!successfully_stolen)
yield();
#if !defined(__ICC) && !defined(__ECC)
-#pragma omp flush
+ #pragma omp flush
#endif
}
@@ -372,7 +375,7 @@
_GLIBCXX_PARALLEL_ASSERT(omp_get_wtime() < (search_start + 1.0));
}
#endif
- if (!success)
+ if (!successfully_stolen)
{
#if _GLIBCXX_ASSERTIONS
_GLIBCXX_PARALLEL_ASSERT(*tl.elements_leftover == 0);
@@ -395,7 +398,8 @@
inline void
parallel_sort_qsb(RandomAccessIterator begin, RandomAccessIterator end,
Comparator comp,
- typename std::iterator_traits<RandomAccessIterator>::difference_type n, int num_threads)
+ typename std::iterator_traits<RandomAccessIterator>::difference_type n,
+ thread_index_t num_threads)
{
_GLIBCXX_CALL(end - begin)
@@ -413,12 +417,12 @@
if (num_threads > n)
num_threads = static_cast<thread_index_t>(n);
+ // Initialize thread local storage
tls_type** tls = new tls_type*[num_threads];
+ difference_type queue_size = num_threads * (thread_index_t)(log2(n) + 1);
+ for (thread_index_t t = 0; t < num_threads; ++t)
+ tls[t] = new QSBThreadLocal<RandomAccessIterator>(queue_size);
-#pragma omp parallel num_threads(num_threads)
- // Initialize variables per processor.
- qsb_initialize(tls, num_threads * (thread_index_t)(log2(n) + 1));
-
// There can never be more than ceil(log2(n)) ranges on the stack, because
// 1. Only one processor pushes onto the stack
// 2. The largest range has at most length n
@@ -435,13 +439,13 @@
}
// Initial splitting, recursively.
- int old_nested = omp_get_nested();
- omp_set_nested(true);
+// int old_nested = omp_get_nested();
+// omp_set_nested(true);
// Main recursion call.
- qsb_conquer(tls, begin, begin + n, comp, 0, num_threads);
+ qsb_conquer(tls, begin, begin + n, comp, 0, num_threads, true);
- omp_set_nested(old_nested);
+// omp_set_nested(old_nested);
#if _GLIBCXX_ASSERTIONS
// All stack must be empty.
Index: include/parallel/set_operations.h
===================================================================
--- include/parallel/set_operations.h (revision 130225)
+++ include/parallel/set_operations.h (working copy)
@@ -355,7 +355,6 @@
typedef typename traits_type::difference_type difference_type;
typedef typename std::pair<InputIterator, InputIterator> iterator_pair;
-
if (begin1 == end1)
return op.first_empty(begin2, end2, result);
@@ -364,27 +363,33 @@
const difference_type size = (end1 - begin1) + (end2 - begin2);
+ const iterator_pair sequence[ 2 ] =
+ { std::make_pair(begin1, end1), std::make_pair(begin2, end2) } ;
+ OutputIterator return_value = result;
+ difference_type *borders;
+ iterator_pair *block_begins;
+ difference_type* lengths;
+
thread_index_t num_threads = std::min<difference_type>(std::min(end1 - begin1, end2 - begin2), get_max_threads());
- difference_type borders[num_threads + 2];
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+
+ borders = new difference_type[num_threads + 2];
equally_split(size, num_threads + 1, borders);
-
- const iterator_pair sequence[ 2 ] = { std::make_pair(begin1, end1), std::make_pair(begin2, end2) } ;
-
- iterator_pair block_begins[num_threads + 1];
-
+ block_begins = new iterator_pair[num_threads + 1];
// Very start.
block_begins[0] = std::make_pair(begin1, begin2);
- difference_type length[num_threads];
+ lengths = new difference_type[num_threads];
+ } //single
- OutputIterator return_value = result;
+ thread_index_t iam = omp_get_thread_num();
-#pragma omp parallel num_threads(num_threads)
- {
// Result from multiseq_partition.
InputIterator offset[2];
- const int iam = omp_get_thread_num();
-
const difference_type rank = borders[iam + 1];
multiseq_partition(sequence, sequence + 2, rank, offset, op.comp);
@@ -404,7 +409,7 @@
iterator_pair block_end = block_begins[ iam + 1 ] = iterator_pair(offset[ 0 ], offset[ 1 ]);
// Make sure all threads have their block_begin result written out.
-#pragma omp barrier
+ #pragma omp barrier
iterator_pair block_begin = block_begins[ iam ];
@@ -413,16 +418,16 @@
if (iam == 0)
{
// The first thread can copy already.
- length[ iam ] = op.invoke(block_begin.first, block_end.first, block_begin.second, block_end.second, result) - result;
+ lengths[ iam ] = op.invoke(block_begin.first, block_end.first, block_begin.second, block_end.second, result) - result;
}
else
{
- length[ iam ] = op.count(block_begin.first, block_end.first,
+ lengths[ iam ] = op.count(block_begin.first, block_end.first,
block_begin.second, block_end.second);
}
// Make sure everyone wrote their lengths.
-#pragma omp barrier
+ #pragma omp barrier
OutputIterator r = result;
@@ -430,7 +435,7 @@
{
// Do the last block.
for (int i = 0; i < num_threads; ++i)
- r += length[i];
+ r += lengths[i];
block_begin = block_begins[num_threads];
@@ -441,7 +446,7 @@
else
{
for (int i = 0; i < iam; ++i)
- r += length[ i ];
+ r += lengths[ i ];
// Reset begins for copy pass.
op.invoke(block_begin.first, block_end.first,
@@ -475,7 +480,9 @@
template<typename InputIterator, typename OutputIterator>
OutputIterator
- set_intersection(InputIterator begin1, InputIterator end1, InputIterator begin2, InputIterator end2, OutputIterator result)
+ set_intersection(InputIterator begin1, InputIterator end1,
+ InputIterator begin2, InputIterator end2,
+ OutputIterator result)
{
typedef std::iterator_traits<InputIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -496,7 +503,9 @@
template<typename InputIterator, typename OutputIterator, typename Comparator>
OutputIterator
- parallel_set_symmetric_difference(InputIterator begin1, InputIterator end1, InputIterator begin2, InputIterator end2, OutputIterator result, Comparator comp)
+ parallel_set_symmetric_difference(InputIterator begin1, InputIterator end1,
+ InputIterator begin2, InputIterator end2,
+ OutputIterator result, Comparator comp)
{
return parallel_set_operation(begin1, end1, begin2, end2, result,
symmetric_difference_func<InputIterator, OutputIterator, Comparator>(comp));
@@ -505,11 +514,3 @@
}
#endif // _GLIBCXX_SET_ALGORITHM_
-
-
-
-
-
-
-
-
Index: include/parallel/unique_copy.h
===================================================================
--- include/parallel/unique_copy.h (revision 130225)
+++ include/parallel/unique_copy.h (working copy)
@@ -62,27 +62,35 @@
typedef typename traits_type::difference_type difference_type;
difference_type size = last - first;
- int num_threads = __gnu_parallel::get_max_threads();
- difference_type counter[num_threads + 1];
if (size == 0)
return result;
// Let the first thread process two parts.
- difference_type borders[num_threads + 2];
- __gnu_parallel::equally_split(size, num_threads + 1, borders);
+ difference_type *counter;
+ difference_type *borders;
+ thread_index_t num_threads = get_max_threads();
// First part contains at least one element.
-#pragma omp parallel num_threads(num_threads)
+ #pragma omp parallel num_threads(num_threads)
{
- int iam = omp_get_thread_num();
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ borders = new difference_type[num_threads + 2];
+ equally_split(size, num_threads + 1, borders);
+ counter = new difference_type[num_threads + 1];
+ }
+ thread_index_t iam = omp_get_thread_num();
+
difference_type begin, end;
// Check for length without duplicates
// Needed for position in output
difference_type i = 0;
OutputIterator out = result;
+
if (iam == 0)
{
begin = borders[0] + 1; // == 1
@@ -170,6 +178,8 @@
for (int t = 0; t < num_threads + 1; t++)
end_output += counter[t];
+ delete[] borders;
+
return result + end_output;
}
Index: include/parallel/multiway_mergesort.h
===================================================================
--- include/parallel/multiway_mergesort.h (revision 130225)
+++ include/parallel/multiway_mergesort.h (working copy)
@@ -71,6 +71,9 @@
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
+ /** @brief Number of threads involved. */
+ thread_index_t num_threads;
+
/** @brief Input begin. */
RandomAccessIterator source;
@@ -105,62 +108,55 @@
/** @brief Pieces of data to merge @c [thread][sequence] */
std::vector<Piece<difference_type> >* pieces;
- };
- /** @brief Thread local data for PMWMS. */
- template<typename RandomAccessIterator>
- struct PMWMSSorterPU
- {
- /** @brief Total number of thread involved. */
- thread_index_t num_threads;
- /** @brief Number of owning thread. */
- thread_index_t iam;
/** @brief Stable sorting desired. */
bool stable;
- /** @brief Pointer to global data. */
- PMWMSSortingData<RandomAccessIterator>* sd;
- };
+};
/**
* @brief Select samples from a sequence.
- * @param d Pointer to thread-local data. Result will be placed in
- * @c d->ds->samples.
+ * @param sd Pointer to algorithm data. Result will be placed in
+ * @c sd->samples.
* @param num_samples Number of samples to select.
*/
template<typename RandomAccessIterator, typename _DifferenceTp>
inline void
- determine_samples(PMWMSSorterPU<RandomAccessIterator>* d,
+ determine_samples(PMWMSSortingData<RandomAccessIterator>* sd,
_DifferenceTp& num_samples)
{
typedef _DifferenceTp difference_type;
- PMWMSSortingData<RandomAccessIterator>* sd = d->sd;
+ thread_index_t iam = omp_get_thread_num();
- num_samples = Settings::sort_mwms_oversampling * d->num_threads - 1;
+ num_samples =
+ Settings::sort_mwms_oversampling * sd->num_threads - 1;
- difference_type* es = static_cast<difference_type*>(__builtin_alloca(sizeof(difference_type) * (num_samples + 2)));
+ difference_type* es = new difference_type[num_samples + 2];
- equally_split(sd->starts[d->iam + 1] - sd->starts[d->iam], num_samples + 1, es);
+ equally_split(sd->starts[iam + 1] - sd->starts[iam],
+ num_samples + 1, es);
for (difference_type i = 0; i < num_samples; i++)
- sd->samples[d->iam * num_samples + i] = sd->source[sd->starts[d->iam] + es[i + 1]];
+ sd->samples[iam * num_samples + i] =
+ sd->source[sd->starts[iam] + es[i + 1]];
+
+ delete[] es;
}
/** @brief PMWMS code executed by each thread.
- * @param d Pointer to thread-local data.
+ * @param sd Pointer to algorithm data.
* @param comp Comparator.
*/
template<typename RandomAccessIterator, typename Comparator>
inline void
- parallel_sort_mwms_pu(PMWMSSorterPU<RandomAccessIterator>* d,
+ parallel_sort_mwms_pu(PMWMSSortingData<RandomAccessIterator>* sd,
Comparator& comp)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
- PMWMSSortingData<RandomAccessIterator>* sd = d->sd;
- thread_index_t iam = d->iam;
+ thread_index_t iam = omp_get_thread_num();
// Length of this thread's chunk, before merging.
difference_type length_local = sd->starts[iam + 1] - sd->starts[iam];
@@ -174,44 +170,49 @@
typedef value_type* SortingPlacesIterator;
// Sort in temporary storage, leave space for sentinel.
- sd->sorting_places[iam] = sd->temporaries[iam] = static_cast<value_type*>(::operator new(sizeof(value_type) * (length_local + 1)));
+ sd->sorting_places[iam] = sd->temporaries[iam] =
+ static_cast<value_type*>(
+ ::operator new(sizeof(value_type) * (length_local + 1)));
// Copy there.
- std::uninitialized_copy(sd->source + sd->starts[iam], sd->source + sd->starts[iam] + length_local, sd->sorting_places[iam]);
+ std::uninitialized_copy(sd->source + sd->starts[iam],
+ sd->source + sd->starts[iam] + length_local,
+ sd->sorting_places[iam]);
#endif
// Sort locally.
- if (d->stable)
- __gnu_sequential::stable_sort(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp);
+ if (sd->stable)
+ __gnu_sequential::stable_sort(sd->sorting_places[iam],
+ sd->sorting_places[iam] + length_local,
+ comp);
else
- __gnu_sequential::sort(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp);
+ __gnu_sequential::sort(sd->sorting_places[iam],
+ sd->sorting_places[iam] + length_local,
+ comp);
-#if _GLIBCXX_ASSERTIONS
- _GLIBCXX_PARALLEL_ASSERT(is_sorted(sd->sorting_places[iam], sd->sorting_places[iam] + length_local, comp));
-#endif
-
// Invariant: locally sorted subsequence in sd->sorting_places[iam],
// sd->sorting_places[iam] + length_local.
if (Settings::sort_splitting == Settings::SAMPLING)
{
difference_type num_samples;
- determine_samples(d, num_samples);
+ determine_samples(sd, num_samples);
#pragma omp barrier
#pragma omp single
- __gnu_sequential::sort(sd->samples,
- sd->samples + (num_samples * d->num_threads),
+ __gnu_sequential::sort(sd->samples,
+ sd->samples + (num_samples * sd->num_threads),
comp);
#pragma omp barrier
- for (int s = 0; s < d->num_threads; s++)
+ for (int s = 0; s < sd->num_threads; s++)
{
// For each sequence.
if (num_samples * iam > 0)
- sd->pieces[iam][s].begin = std::lower_bound(sd->sorting_places[s],
+ sd->pieces[iam][s].begin =
+ std::lower_bound(sd->sorting_places[s],
sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s],
sd->samples[num_samples * iam],
comp)
@@ -220,43 +221,47 @@
// Absolute beginning.
sd->pieces[iam][s].begin = 0;
- if ((num_samples * (iam + 1)) < (num_samples * d->num_threads))
- sd->pieces[iam][s].end = std::lower_bound(sd->sorting_places[s],
- sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s], sd->samples[num_samples * (iam + 1)], comp)
+ if ((num_samples * (iam + 1)) < (num_samples * sd->num_threads))
+ sd->pieces[iam][s].end =
+ std::lower_bound(sd->sorting_places[s],
+ sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s],
+ sd->samples[num_samples * (iam + 1)], comp)
- sd->sorting_places[s];
else
// Absolute end.
sd->pieces[iam][s].end = sd->starts[s + 1] - sd->starts[s];
}
-
}
else if (Settings::sort_splitting == Settings::EXACT)
{
#pragma omp barrier
- std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> > seqs(d->num_threads);
- for (int s = 0; s < d->num_threads; s++)
- seqs[s] = std::make_pair(sd->sorting_places[s], sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s]);
+ std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> >
+ seqs(sd->num_threads);
+ for (int s = 0; s < sd->num_threads; s++)
+ seqs[s] = std::make_pair(sd->sorting_places[s],
+ sd->sorting_places[s] + sd->starts[s + 1] - sd->starts[s]);
- std::vector<SortingPlacesIterator> offsets(d->num_threads);
+ std::vector<SortingPlacesIterator> offsets(sd->num_threads);
- // If not last thread.
- if (iam < d->num_threads - 1)
- multiseq_partition(seqs.begin(), seqs.end(), sd->starts[iam + 1], offsets.begin(), comp);
+ // if not last thread
+ if (iam < sd->num_threads - 1)
+ multiseq_partition(seqs.begin(), seqs.end(),
+ sd->starts[iam + 1], offsets.begin(), comp);
- for (int seq = 0; seq < d->num_threads; seq++)
+ for (int seq = 0; seq < sd->num_threads; seq++)
{
- // For each sequence.
- if (iam < (d->num_threads - 1))
+ // for each sequence
+ if (iam < (sd->num_threads - 1))
sd->pieces[iam][seq].end = offsets[seq] - seqs[seq].first;
else
- // Absolute end of this sequence.
+ // very end of this sequence
sd->pieces[iam][seq].end = sd->starts[seq + 1] - sd->starts[seq];
}
#pragma omp barrier
- for (int seq = 0; seq < d->num_threads; seq++)
+ for (int seq = 0; seq < sd->num_threads; seq++)
{
// For each sequence.
if (iam > 0)
@@ -269,7 +274,7 @@
// Offset from target begin, length after merging.
difference_type offset = 0, length_am = 0;
- for (int s = 0; s < d->num_threads; s++)
+ for (int s = 0; s < sd->num_threads; s++)
{
length_am += sd->pieces[iam][s].end - sd->pieces[iam][s].begin;
offset += sd->pieces[iam][s].begin;
@@ -279,33 +284,30 @@
// Merge to temporary storage, uninitialized creation not possible
// since there is no multiway_merge calling the placement new
// instead of the assignment operator.
- sd->merging_places[iam] = sd->temporaries[iam] = static_cast<value_type*>(::operator new(sizeof(value_type) * length_am));
+ sd->merging_places[iam] = sd->temporaries[iam] =
+ static_cast<value_type*>(
+ ::operator new(sizeof(value_type) * length_am));
#else
// Merge directly to target.
sd->merging_places[iam] = sd->source + offset;
#endif
- std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> > seqs(d->num_threads);
+ std::vector<std::pair<SortingPlacesIterator, SortingPlacesIterator> >
+ seqs(sd->num_threads);
- for (int s = 0; s < d->num_threads; s++)
+ for (int s = 0; s < sd->num_threads; s++)
{
- seqs[s] = std::make_pair(sd->sorting_places[s] + sd->pieces[iam][s].begin, sd->sorting_places[s] + sd->pieces[iam][s].end);
-
-#if _GLIBCXX_ASSERTIONS
- _GLIBCXX_PARALLEL_ASSERT(is_sorted(seqs[s].first, seqs[s].second, comp));
-#endif
+ seqs[s] = std::make_pair(sd->sorting_places[s] + sd->pieces[iam][s].begin,
+ sd->sorting_places[s] + sd->pieces[iam][s].end);
}
- multiway_merge(seqs.begin(), seqs.end(), sd->merging_places[iam], comp, length_am, d->stable, false, sequential_tag());
+ multiway_merge(seqs.begin(), seqs.end(), sd->merging_places[iam], comp, length_am, sd->stable, false, sequential_tag());
-#if _GLIBCXX_ASSERTIONS
- _GLIBCXX_PARALLEL_ASSERT(is_sorted(sd->merging_places[iam], sd->merging_places[iam] + length_am, comp));
-#endif
+ #pragma omp barrier
-# pragma omp barrier
-
#if _GLIBCXX_MULTIWAY_MERGESORT_COPY_LAST
// Write back.
- std::copy(sd->merging_places[iam], sd->merging_places[iam] + length_am,
+ std::copy(sd->merging_places[iam],
+ sd->merging_places[iam] + length_am,
sd->source + offset);
#endif
@@ -322,13 +324,14 @@
*/
template<typename RandomAccessIterator, typename Comparator>
inline void
- parallel_sort_mwms(RandomAccessIterator begin, RandomAccessIterator end,
+ parallel_sort_mwms(RandomAccessIterator begin, RandomAccessIterator end,
Comparator comp,
- typename std::iterator_traits<RandomAccessIterator>::difference_type n,
- int num_threads, bool stable)
+ typename std::iterator_traits<RandomAccessIterator>::difference_type n,
+ int num_threads,
+ bool stable)
{
_GLIBCXX_CALL(n)
-
+
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
@@ -336,12 +339,21 @@
if (n <= 1)
return;
- // At least one element per thread.
+ // at least one element per thread
if (num_threads > n)
num_threads = static_cast<thread_index_t>(n);
+ // shared variables
PMWMSSortingData<RandomAccessIterator> sd;
+ difference_type* starts;
+ #pragma omp parallel num_threads(num_threads)
+ {
+ num_threads = omp_get_num_threads(); //no more threads than requested
+
+ #pragma omp single
+ {
+ sd.num_threads = num_threads;
sd.source = begin;
sd.temporaries = new value_type*[num_threads];
@@ -355,12 +367,10 @@
if (Settings::sort_splitting == Settings::SAMPLING)
{
- unsigned int sz = Settings::sort_mwms_oversampling * num_threads - 1;
- sz *= num_threads;
-
- // Equivalent to value_type[sz], without need of default construction.
- sz *= sizeof(value_type);
- sd.samples = static_cast<value_type*>(::operator new(sz));
+ unsigned int size =
+ (Settings::sort_mwms_oversampling * num_threads - 1) * num_threads;
+ sd.samples = static_cast<value_type*>(
+ ::operator new(size * sizeof(value_type)));
}
else
sd.samples = NULL;
@@ -369,28 +379,24 @@
sd.pieces = new std::vector<Piece<difference_type> >[num_threads];
for (int s = 0; s < num_threads; s++)
sd.pieces[s].resize(num_threads);
- PMWMSSorterPU<RandomAccessIterator>* pus = new PMWMSSorterPU<RandomAccessIterator>[num_threads];
- difference_type* starts = sd.starts = new difference_type[num_threads + 1];
+ starts = sd.starts = new difference_type[num_threads + 1];
+ sd.stable = stable;
difference_type chunk_length = n / num_threads;
difference_type split = n % num_threads;
- difference_type start = 0;
+ difference_type pos = 0;
for (int i = 0; i < num_threads; i++)
{
- starts[i] = start;
- start += (i < split) ? (chunk_length + 1) : chunk_length;
- pus[i].num_threads = num_threads;
- pus[i].iam = i;
- pus[i].sd = &sd;
- pus[i].stable = stable;
+ starts[i] = pos;
+ pos += (i < split) ? (chunk_length + 1) : chunk_length;
}
- starts[num_threads] = start;
+ starts[num_threads] = pos;
+ }
// Now sort in parallel.
-#pragma omp parallel num_threads(num_threads)
- parallel_sort_mwms_pu(&(pus[omp_get_thread_num()]), comp);
+ parallel_sort_mwms_pu(&sd, comp);
+ }
- // XXX sd as RAII
delete[] starts;
delete[] sd.temporaries;
delete[] sd.sorting_places;
@@ -401,10 +407,7 @@
delete[] sd.offsets;
delete[] sd.pieces;
-
- delete[] pus;
}
+} //namespace __gnu_parallel
-}
-
#endif
Index: include/parallel/search.h
===================================================================
--- include/parallel/search.h (revision 130225)
+++ include/parallel/search.h (working copy)
@@ -103,27 +103,32 @@
// Where is first occurrence of pattern? defaults to end.
difference_type result = (end1 - begin1);
+ difference_type *splitters;
// Pattern too long.
if (input_length < 0)
return end1;
- thread_index_t num_threads = std::max<difference_type>(1, std::min<difference_type>(input_length, __gnu_parallel::get_max_threads()));
-
omp_lock_t result_lock;
omp_init_lock(&result_lock);
- difference_type borders[num_threads + 1];
- __gnu_parallel::equally_split(input_length, num_threads, borders);
+ thread_index_t num_threads = std::max<difference_type>(1, std::min<difference_type>(input_length, __gnu_parallel::get_max_threads()));
difference_type advances[pattern_length];
calc_borders(begin2, pattern_length, advances);
-#pragma omp parallel num_threads(num_threads)
+ #pragma omp parallel num_threads(num_threads)
{
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ splitters = new difference_type[num_threads + 1];
+ equally_split(input_length, num_threads, splitters);
+ }
+
thread_index_t iam = omp_get_thread_num();
- difference_type start = borders[iam], stop = borders[iam + 1];
+ difference_type start = splitters[iam], stop = splitters[iam + 1];
difference_type pos_in_pattern = 0;
bool found_pattern = false;
@@ -131,7 +136,7 @@
while (start <= stop && !found_pattern)
{
// Get new value of result.
-#pragma omp flush(result)
+ #pragma omp flush(result)
// No chance for this thread to find first occurrence.
if (result < start)
break;
@@ -153,10 +158,12 @@
start += (pos_in_pattern - advances[pos_in_pattern]);
pos_in_pattern = (advances[pos_in_pattern] < 0) ? 0 : advances[pos_in_pattern];
}
- }
+ } //parallel
omp_destroy_lock(&result_lock);
+ delete[] splitters;
+
// Return iterator on found element.
return (begin1 + result);
}
Index: include/parallel/partition.h
===================================================================
--- include/parallel/partition.h (revision 130225)
+++ include/parallel/partition.h (working copy)
@@ -54,12 +54,12 @@
* @param begin Begin iterator of input sequence to split.
* @param end End iterator of input sequence to split.
* @param pred Partition predicate, possibly including some kind of pivot.
- * @param max_num_threads Maximum number of threads to use for this task.
+ * @param num_threads Maximum number of threads to use for this task.
* @return Number of elements not fulfilling the predicate. */
template<typename RandomAccessIterator, typename Predicate>
- inline typename std::iterator_traits<RandomAccessIterator>::difference_type
+ typename std::iterator_traits<RandomAccessIterator>::difference_type
parallel_partition(RandomAccessIterator begin, RandomAccessIterator end,
- Predicate pred, thread_index_t max_num_threads)
+ Predicate pred, thread_index_t num_threads)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -74,25 +74,35 @@
_GLIBCXX_VOLATILE difference_type leftover_left, leftover_right;
_GLIBCXX_VOLATILE difference_type leftnew, rightnew;
- bool* reserved_left, * reserved_right;
+ bool* reserved_left = NULL, * reserved_right = NULL;
- reserved_left = new bool[max_num_threads];
- reserved_right = new bool[max_num_threads];
-
difference_type chunk_size;
- if (Settings::partition_chunk_share > 0.0)
- chunk_size = std::max((difference_type)Settings::partition_chunk_size, (difference_type)((double)n * Settings::partition_chunk_share / (double)max_num_threads));
- else
- chunk_size = Settings::partition_chunk_size;
omp_lock_t result_lock;
omp_init_lock(&result_lock);
- // At least good for two processors.
- while (right - left + 1 >= 2 * max_num_threads * chunk_size)
+ //at least two chunks per thread
+ if(right - left + 1 >= 2 * num_threads * chunk_size)
+ #pragma omp parallel num_threads(num_threads)
{
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ reserved_left = new bool[num_threads];
+ reserved_right = new bool[num_threads];
+
+ if (Settings::partition_chunk_share > 0.0)
+ chunk_size = std::max<difference_type>(Settings::partition_chunk_size,
+ (double)n * Settings::partition_chunk_share / (double)num_threads);
+ else
+ chunk_size = Settings::partition_chunk_size;
+ }
+
+ while (right - left + 1 >= 2 * num_threads * chunk_size)
+ {
+ #pragma omp single
+ {
difference_type num_chunks = (right - left + 1) / chunk_size;
- thread_index_t num_threads = (int)std::min((difference_type)max_num_threads, num_chunks / 2);
for (int r = 0; r < num_threads; r++)
{
@@ -101,9 +111,8 @@
}
leftover_left = 0;
leftover_right = 0;
+ } //implicit barrier
-#pragma omp parallel num_threads(num_threads)
- {
// Private.
difference_type thread_left, thread_left_border, thread_right, thread_right_border;
thread_left = left + 1;
@@ -167,21 +176,21 @@
// Now swap the leftover chunks to the right places.
if (thread_left <= thread_left_border)
-#pragma omp atomic
+ #pragma omp atomic
leftover_left++;
if (thread_right >= thread_right_border)
-#pragma omp atomic
+ #pragma omp atomic
leftover_right++;
-#pragma omp barrier
+ #pragma omp barrier
-#pragma omp single
+ #pragma omp single
{
leftnew = left - leftover_left * chunk_size;
rightnew = right + leftover_right * chunk_size;
}
-#pragma omp barrier
+ #pragma omp barrier
// <=> thread_left_border + (chunk_size - 1) >= leftnew
if (thread_left <= thread_left_border
@@ -199,7 +208,7 @@
reserved_right[((thread_right_border - 1) - right) / chunk_size] = true;
}
-#pragma omp barrier
+ #pragma omp barrier
if (thread_left <= thread_left_border && thread_left_border < leftnew)
{
@@ -219,11 +228,12 @@
_GLIBCXX_PARALLEL_ASSERT(swapstart != -1);
#endif
- std::swap_ranges(begin + thread_left_border - (chunk_size - 1), begin + thread_left_border + 1, begin + swapstart);
+ std::swap_ranges(begin + thread_left_border - (chunk_size - 1),
+ begin + thread_left_border + 1,
+ begin + swapstart);
}
- if (thread_right >= thread_right_border
- && thread_right_border > rightnew)
+ if (thread_right >= thread_right_border && thread_right_border > rightnew)
{
// Find spot and swap
difference_type swapstart = -1;
@@ -241,12 +251,14 @@
_GLIBCXX_PARALLEL_ASSERT(swapstart != -1);
#endif
- std::swap_ranges(begin + thread_right_border, begin + thread_right_border + chunk_size, begin + swapstart);
+ std::swap_ranges(begin + thread_right_border,
+ begin + thread_right_border + chunk_size,
+ begin + swapstart);
}
#if _GLIBCXX_ASSERTIONS
-#pragma omp barrier
+ #pragma omp barrier
-#pragma omp single
+ #pragma omp single
{
for (int r = 0; r < leftover_left; r++)
_GLIBCXX_PARALLEL_ASSERT(reserved_left[r]);
@@ -254,14 +266,16 @@
_GLIBCXX_PARALLEL_ASSERT(reserved_right[r]);
}
-#pragma omp barrier
+ #pragma omp barrier
#endif
-#pragma omp barrier
+ #pragma omp barrier
+
left = leftnew;
right = rightnew;
}
- } // end "recursion"
+ #pragma omp flush(left, right)
+ } // end "recursion" //parallel
difference_type final_left = left, final_right = right;
Index: include/parallel/partial_sum.h
===================================================================
--- include/parallel/partial_sum.h (revision 130225)
+++ include/parallel/partial_sum.h (working copy)
@@ -58,7 +58,8 @@
* @return End iterator of output sequence. */
template<typename InputIterator, typename OutputIterator, typename BinaryOperation>
inline OutputIterator
- parallel_partial_sum_basecase(InputIterator begin, InputIterator end,
+ parallel_partial_sum_basecase(
+ InputIterator begin, InputIterator end,
OutputIterator result, BinaryOperation bin_op,
typename std::iterator_traits<InputIterator>::value_type value)
{
@@ -87,24 +88,34 @@
*/
template<typename InputIterator, typename OutputIterator, typename BinaryOperation>
OutputIterator
- parallel_partial_sum_linear(InputIterator begin, InputIterator end,
+ parallel_partial_sum_linear(
+ InputIterator begin, InputIterator end,
OutputIterator result, BinaryOperation bin_op,
- typename std::iterator_traits<InputIterator>::difference_type n, int num_threads)
+ typename std::iterator_traits<InputIterator>::difference_type n)
{
typedef std::iterator_traits<InputIterator> traits_type;
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
- if (num_threads > (n - 1))
- num_threads = static_cast<thread_index_t>(n - 1);
+ thread_index_t num_threads = std::min<difference_type>(get_max_threads(), n - 1);
+
if (num_threads < 2)
{
*result = *begin;
return parallel_partial_sum_basecase(begin + 1, end, result + 1, bin_op, *begin);
}
- difference_type* borders = static_cast<difference_type*>(__builtin_alloca(sizeof(difference_type) * (num_threads + 2)));
+ difference_type* borders;
+ value_type* sums;
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+
+ borders = new difference_type[num_threads + 2];
+
if (Settings::partial_sum_dilatation == 1.0f)
equally_split(n, num_threads + 1, borders);
else
@@ -119,43 +130,43 @@
borders[num_threads + 1] = n;
}
- value_type* sums = static_cast<value_type*>(::operator new(sizeof(value_type) * num_threads));
+ sums = static_cast<value_type*>(::operator new(sizeof(value_type) * num_threads));
OutputIterator target_end;
+ } //single
-#pragma omp parallel num_threads(num_threads)
+ int iam = omp_get_thread_num();
+ if (iam == 0)
{
- int id = omp_get_thread_num();
- if (id == 0)
- {
*result = *begin;
- parallel_partial_sum_basecase(begin + 1, begin + borders[1],
+ parallel_partial_sum_basecase(begin + 1, begin + borders[1],
result + 1, bin_op, *begin);
sums[0] = *(result + borders[1] - 1);
}
else
{
- sums[id] = std::accumulate(begin + borders[id] + 1,
- begin + borders[id + 1],
- *(begin + borders[id]),
+ sums[iam] = std::accumulate(begin + borders[iam] + 1,
+ begin + borders[iam + 1],
+ *(begin + borders[iam]),
bin_op, __gnu_parallel::sequential_tag());
}
-#pragma omp barrier
+ #pragma omp barrier
-#pragma omp single
- parallel_partial_sum_basecase(sums + 1, sums + num_threads, sums + 1,
+ #pragma omp single
+ parallel_partial_sum_basecase(sums + 1, sums + num_threads, sums + 1,
bin_op, sums[0]);
-#pragma omp barrier
+ #pragma omp barrier
// Still same team.
- parallel_partial_sum_basecase(begin + borders[id + 1],
- begin + borders[id + 2],
- result + borders[id + 1], bin_op,
- sums[id]);
- }
+ parallel_partial_sum_basecase(begin + borders[iam + 1],
+ begin + borders[iam + 2],
+ result + borders[iam + 1], bin_op,
+ sums[iam]);
+ } //parallel
- delete [] sums;
+ delete[] sums;
+ delete[] borders;
return result + n;
}
@@ -171,7 +182,7 @@
parallel_partial_sum(InputIterator begin, InputIterator end,
OutputIterator result, BinaryOperation bin_op)
{
- _GLIBCXX_CALL(begin - end);
+ _GLIBCXX_CALL(begin - end)
typedef std::iterator_traits<InputIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -179,14 +190,11 @@
difference_type n = end - begin;
- int num_threads = get_max_threads();
-
switch (Settings::partial_sum_algorithm)
{
case Settings::LINEAR:
// Need an initial offset.
- return parallel_partial_sum_linear(begin, end, result, bin_op,
- n, num_threads);
+ return parallel_partial_sum_linear(begin, end, result, bin_op, n);
default:
// Partial_sum algorithm not implemented.
_GLIBCXX_PARALLEL_ASSERT(0);
Index: include/parallel/find.h
===================================================================
--- include/parallel/find.h (revision 130225)
+++ include/parallel/find.h (working copy)
@@ -10,7 +10,7 @@
// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License
@@ -100,27 +100,30 @@
typedef typename traits_type::value_type value_type;
difference_type length = end1 - begin1;
-
difference_type result = length;
+ difference_type* borders;
- const thread_index_t num_threads = get_max_threads();
omp_lock_t result_lock;
omp_init_lock(&result_lock);
- difference_type* borders = static_cast<difference_type*>(__builtin_alloca(sizeof(difference_type) * (num_threads + 1)));
-
+ thread_index_t num_threads = get_max_threads();
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ borders = new difference_type[num_threads + 1];
equally_split(length, num_threads, borders);
+ } //single
-#pragma omp parallel shared(result) num_threads(num_threads)
- {
- int iam = omp_get_thread_num();
- difference_type pos = borders[iam], limit = borders[iam + 1];
+ thread_index_t iam = omp_get_thread_num();
+ difference_type start = borders[iam], stop = borders[iam + 1];
- RandomAccessIterator1 i1 = begin1 + pos;
- RandomAccessIterator2 i2 = begin2 + pos;
- for (; pos < limit; pos++)
+ RandomAccessIterator1 i1 = begin1 + start;
+ RandomAccessIterator2 i2 = begin2 + start;
+ for (difference_type pos = start; pos < stop; ++pos)
{
-#pragma omp flush(result)
+ #pragma omp flush(result)
// Result has been set to something lower.
if (result < pos)
break;
@@ -128,17 +131,19 @@
if (selector(i1, i2, pred))
{
omp_set_lock(&result_lock);
- if (result > pos)
+ if (pos < result)
result = pos;
omp_unset_lock(&result_lock);
break;
}
- i1++;
- i2++;
+ ++i1;
+ ++i2;
}
- }
+ } //parallel
omp_destroy_lock(&result_lock);
+ delete[] borders;
+
return std::pair<RandomAccessIterator1, RandomAccessIterator2>(begin1 + result, begin2 + result);
}
@@ -192,20 +197,23 @@
return find_seq_result;
// Index of beginning of next free block (after sequential find).
- difference_type next_block_pos = sequential_search_size;
+ difference_type next_block_start = sequential_search_size;
difference_type result = length;
- const thread_index_t num_threads = get_max_threads();
omp_lock_t result_lock;
omp_init_lock(&result_lock);
-#pragma omp parallel shared(result) num_threads(num_threads)
+ thread_index_t num_threads = get_max_threads();
+ #pragma omp parallel shared(result) num_threads(num_threads)
{
+ #pragma omp single
+ num_threads = omp_get_num_threads();
+
// Not within first k elements -> start parallel.
thread_index_t iam = omp_get_thread_num();
difference_type block_size = Settings::find_initial_block_size;
- difference_type start = fetch_and_add<difference_type>(&next_block_pos, block_size);
+ difference_type start = fetch_and_add<difference_type>(&next_block_start, block_size);
// Get new block, update pointer to next block.
difference_type stop = std::min<difference_type>(length, start + block_size);
@@ -214,7 +222,7 @@
while (start < length)
{
-#pragma omp flush(result)
+ #pragma omp flush(result)
// Get new value of result.
if (result < start)
{
@@ -231,7 +239,7 @@
result = local_result.first - begin1;
// Result cannot be in future blocks, stop algorithm.
- fetch_and_add<difference_type>(&next_block_pos, length);
+ fetch_and_add<difference_type>(&next_block_start, length);
}
omp_unset_lock(&result_lock);
}
@@ -239,10 +247,10 @@
block_size = std::min<difference_type>(block_size * Settings::find_increasing_factor, Settings::find_maximum_block_size);
// Get new block, update pointer to next block.
- start = fetch_and_add<difference_type>(&next_block_pos, block_size);
+ start = fetch_and_add<difference_type>(&next_block_start, block_size);
stop = (length < (start + block_size)) ? length : (start + block_size);
}
- }
+ } //parallel
omp_destroy_lock(&result_lock);
@@ -295,37 +303,38 @@
return find_seq_result;
difference_type result = length;
- const thread_index_t num_threads = get_max_threads();
-
omp_lock_t result_lock;
omp_init_lock(&result_lock);
// Not within first sequential_search_size elements -> start parallel.
-#pragma omp parallel shared(result) num_threads(num_threads)
+
+ thread_index_t num_threads = get_max_threads();
+ #pragma omp parallel shared(result) num_threads(num_threads)
{
+ #pragma omp single
+ num_threads = omp_get_num_threads();
+
thread_index_t iam = omp_get_thread_num();
difference_type block_size = Settings::find_initial_block_size;
- difference_type start, stop;
-
// First element of thread's current iteration.
difference_type iteration_start = sequential_search_size;
// Where to work (initialization).
- start = iteration_start + iam * block_size;
- stop = std::min<difference_type>(length, start + block_size);
+ difference_type start = iteration_start + iam * block_size;
+ difference_type stop = std::min<difference_type>(length, start + block_size);
std::pair<RandomAccessIterator1, RandomAccessIterator2> local_result;
while (start < length)
{
// Get new value of result.
-#pragma omp flush(result)
+ #pragma omp flush(result)
// No chance to find first element.
if (result < start)
break;
-
- local_result = selector.sequential_algorithm(begin1 + start, begin1 + stop, begin2 + start, pred);
+ local_result = selector.sequential_algorithm(begin1 + start, begin1 + stop,
+ begin2 + start, pred);
if (local_result.first != (begin1 + stop))
{
omp_set_lock(&result_lock);
@@ -342,7 +351,7 @@
start = iteration_start + iam * block_size;
stop = std::min<difference_type>(length, start + block_size);
}
- }
+ } //parallel
omp_destroy_lock(&result_lock);
@@ -353,4 +362,3 @@
} // end namespace
#endif
-
Index: include/parallel/omp_loop.h
===================================================================
--- include/parallel/omp_loop.h (revision 130225)
+++ include/parallel/omp_loop.h (working copy)
@@ -43,6 +43,7 @@
#include <parallel/settings.h>
#include <parallel/basic_iterator.h>
+#include <parallel/base.h>
namespace __gnu_parallel
{
@@ -63,34 +64,47 @@
* std::count_n()).
* @return User-supplied functor (that may contain a part of the result).
*/
- template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
+ template<typename RandomAccessIterator,
+ typename Op,
+ typename Fu,
+ typename Red,
+ typename Result>
Op
- for_each_template_random_access_omp_loop(RandomAccessIterator begin, RandomAccessIterator end, Op o, Fu& f, Red r, Result base, Result& output, typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
+ for_each_template_random_access_omp_loop(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op o, Fu& f, Red r, Result base, Result& output,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
- typedef typename std::iterator_traits<RandomAccessIterator>::difference_type difference_type;
+ typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
+ difference_type;
- thread_index_t num_threads = (get_max_threads() < (end - begin)) ? get_max_threads() : static_cast<thread_index_t>((end - begin));
- Result *thread_results = new Result[num_threads];
difference_type length = end - begin;
+ thread_index_t num_threads = __gnu_parallel::min<difference_type>(get_max_threads(), length);
- for (thread_index_t i = 0; i < num_threads; i++)
- {
- thread_results[i] = r(thread_results[i], f(o, begin+i));
- }
+ Result *thread_results;
-#pragma omp parallel num_threads(num_threads)
+ #pragma omp parallel num_threads(num_threads)
{
-#pragma omp for schedule(dynamic, Settings::workstealing_chunk_size)
- for (difference_type pos = 0; pos < length; pos++)
+ #pragma omp single
{
- thread_results[omp_get_thread_num()] = r(thread_results[omp_get_thread_num()], f(o, begin+pos));
+ num_threads = omp_get_num_threads();
+ thread_results = new Result[num_threads];
+
+ for (thread_index_t i = 0; i < num_threads; i++)
+ thread_results[i] = Result();
}
- }
+ thread_index_t iam = omp_get_thread_num();
+
+ #pragma omp for schedule(dynamic, Settings::workstealing_chunk_size)
+ for (difference_type pos = 0; pos < length; pos++)
+ thread_results[iam] =
+ r(thread_results[iam], f(o, begin+pos));
+ } //parallel
+
for (thread_index_t i = 0; i < num_threads; i++)
- {
output = r(output, thread_results[i]);
- }
delete [] thread_results;
@@ -100,6 +114,7 @@
return o;
}
+
} // end namespace
#endif
Index: include/parallel/multiway_merge.h
===================================================================
--- include/parallel/multiway_merge.h (revision 130225)
+++ include/parallel/multiway_merge.h (working copy)
@@ -1334,182 +1334,183 @@
template<typename RandomAccessIteratorIterator, typename RandomAccessIterator3, typename _DifferenceTp, typename Comparator>
RandomAccessIterator3
parallel_multiway_merge(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, Comparator comp, _DifferenceTp length, bool stable, bool sentinel)
- {
- _GLIBCXX_CALL(length)
+ {
+ _GLIBCXX_CALL(length)
- typedef _DifferenceTp difference_type;
- typedef typename std::iterator_traits<RandomAccessIteratorIterator>::value_type::first_type
- RandomAccessIterator1;
- typedef typename std::iterator_traits<RandomAccessIterator1>::value_type
- value_type;
+ typedef _DifferenceTp difference_type;
+ typedef typename std::iterator_traits<RandomAccessIteratorIterator>::value_type::first_type
+ RandomAccessIterator1;
+ typedef typename std::iterator_traits<RandomAccessIterator1>::value_type
+ value_type;
-#if _GLIBCXX_ASSERTIONS
- for (RandomAccessIteratorIterator rii = seqs_begin; rii != seqs_end; rii++)
- _GLIBCXX_PARALLEL_ASSERT(is_sorted((*rii).first, (*rii).second, comp));
-#endif
+ // k sequences.
+ int k = static_cast<int>(seqs_end - seqs_begin);
- // k sequences.
- int k = static_cast<int>(seqs_end - seqs_begin);
+ difference_type total_length = 0;
+ for (RandomAccessIteratorIterator raii = seqs_begin; raii != seqs_end; raii++)
+ total_length += LENGTH(*raii);
- difference_type total_length = 0;
- for (RandomAccessIteratorIterator raii = seqs_begin; raii != seqs_end; raii++)
- total_length += LENGTH(*raii);
+ _GLIBCXX_CALL(total_length)
- _GLIBCXX_CALL(total_length)
+ if (total_length == 0 || k == 0)
+ return target;
- if (total_length == 0 || k == 0)
- return target;
+ bool tight = (total_length == length);
- thread_index_t num_threads = static_cast<thread_index_t>(std::min(static_cast<difference_type>(get_max_threads()), total_length));
+ std::vector<std::pair<difference_type, difference_type> >* pieces;
- bool tight = (total_length == length);
+ thread_index_t num_threads = static_cast<thread_index_t>(std::min(static_cast<difference_type>(get_max_threads()), total_length));
- // Thread t will have to merge pieces[iam][0..k - 1]
- std::vector<std::pair<difference_type, difference_type> >* pieces = new std::vector<std::pair<difference_type, difference_type> >[num_threads];
- for (int s = 0; s < num_threads; s++)
- pieces[s].resize(k);
+ #pragma omp parallel num_threads (num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ // Thread iam will have to merge pieces[iam][0..k - 1]
+ pieces = new std::vector<std::pair<difference_type, difference_type> >[num_threads];
+ for (int s = 0; s < num_threads; s++)
+ pieces[s].resize(k);
- difference_type num_samples = Settings::merge_oversampling * num_threads;
+ difference_type num_samples = Settings::merge_oversampling * num_threads;
- if (Settings::multiway_merge_splitting == Settings::SAMPLING)
- {
- value_type* samples = static_cast<value_type*>(::operator new(sizeof(value_type) * k * num_samples));
- // Sample.
- for (int s = 0; s < k; s++)
- for (int i = 0; (difference_type)i < num_samples; i++)
- {
- difference_type sample_index = static_cast<difference_type>(LENGTH(seqs_begin[s]) * (double(i + 1) / (num_samples + 1)) * (double(length) / total_length));
- samples[s * num_samples + i] = seqs_begin[s].first[sample_index];
- }
+ if (Settings::multiway_merge_splitting == Settings::SAMPLING)
+ {
+ value_type* samples = static_cast<value_type*>(::operator new(sizeof(value_type) * k * num_samples));
+ // Sample.
+ for (int s = 0; s < k; s++)
+ for (int i = 0; (difference_type)i < num_samples; i++)
+ {
+ difference_type sample_index = static_cast<difference_type>(LENGTH(seqs_begin[s]) * (double(i + 1) / (num_samples + 1)) * (double(length) / total_length));
+ samples[s * num_samples + i] = seqs_begin[s].first[sample_index];
+ }
- if (stable)
- __gnu_sequential::stable_sort(samples, samples + (num_samples * k), comp);
- else
- __gnu_sequential::sort(samples, samples + (num_samples * k), comp);
+ if (stable)
+ __gnu_sequential::stable_sort(samples, samples + (num_samples * k), comp);
+ else
+ __gnu_sequential::sort(samples, samples + (num_samples * k), comp);
- for (int slab = 0; slab < num_threads; slab++)
- // For each slab / processor.
- for (int seq = 0; seq < k; seq++)
- {
- // For each sequence.
- if (slab > 0)
- pieces[slab][seq].first = std::upper_bound(seqs_begin[seq].first, seqs_begin[seq].second, samples[num_samples * k * slab / num_threads], comp) - seqs_begin[seq].first;
- else
- {
- // Absolute beginning.
- pieces[slab][seq].first = 0;
- }
- if ((slab + 1) < num_threads)
- pieces[slab][seq].second = std::upper_bound(seqs_begin[seq].first, seqs_begin[seq].second, samples[num_samples * k * (slab + 1) / num_threads], comp) - seqs_begin[seq].first;
- else
- pieces[slab][seq].second = LENGTH(seqs_begin[seq]); //absolute ending
- }
- delete[] samples;
- }
- else
- {
- // (Settings::multiway_merge_splitting == Settings::EXACT).
- std::vector<RandomAccessIterator1>* offsets = new std::vector<RandomAccessIterator1>[num_threads];
- std::vector<std::pair<RandomAccessIterator1, RandomAccessIterator1> > se(k);
+ for (int slab = 0; slab < num_threads; slab++)
+ // For each slab / processor.
+ for (int seq = 0; seq < k; seq++)
+ {
+ // For each sequence.
+ if (slab > 0)
+ pieces[slab][seq].first = std::upper_bound(seqs_begin[seq].first, seqs_begin[seq].second, samples[num_samples * k * slab / num_threads], comp) - seqs_begin[seq].first;
+ else
+ {
+ // Absolute beginning.
+ pieces[slab][seq].first = 0;
+ }
+ if ((slab + 1) < num_threads)
+ pieces[slab][seq].second = std::upper_bound(seqs_begin[seq].first, seqs_begin[seq].second, samples[num_samples * k * (slab + 1) / num_threads], comp) - seqs_begin[seq].first;
+ else
+ pieces[slab][seq].second = LENGTH(seqs_begin[seq]); //absolute ending
+ }
+ delete[] samples;
+ }
+ else
+ {
+ // (Settings::multiway_merge_splitting == Settings::EXACT).
+ std::vector<RandomAccessIterator1>* offsets = new std::vector<RandomAccessIterator1>[num_threads];
+ std::vector<std::pair<RandomAccessIterator1, RandomAccessIterator1> > se(k);
- copy(seqs_begin, seqs_end, se.begin());
+ copy(seqs_begin, seqs_end, se.begin());
- difference_type* borders = static_cast<difference_type*>(__builtin_alloca(sizeof(difference_type) * (num_threads + 1)));
- equally_split(length, num_threads, borders);
+ difference_type* borders = new difference_type[num_threads + 1];
+ equally_split(length, num_threads, borders);
- for (int s = 0; s < (num_threads - 1); s++)
- {
- offsets[s].resize(k);
- multiseq_partition(se.begin(), se.end(), borders[s + 1],
- offsets[s].begin(), comp);
+ for (int s = 0; s < (num_threads - 1); s++)
+ {
+ offsets[s].resize(k);
+ multiseq_partition(se.begin(), se.end(), borders[s + 1],
+ offsets[s].begin(), comp);
- // Last one also needed and available.
- if (!tight)
- {
- offsets[num_threads - 1].resize(k);
- multiseq_partition(se.begin(), se.end(),
- difference_type(length),
- offsets[num_threads - 1].begin(), comp);
- }
- }
+ // Last one also needed and available.
+ if (!tight)
+ {
+ offsets[num_threads - 1].resize(k);
+ multiseq_partition(se.begin(), se.end(),
+ difference_type(length),
+ offsets[num_threads - 1].begin(), comp);
+ }
+ }
- for (int slab = 0; slab < num_threads; slab++)
- {
- // For each slab / processor.
- for (int seq = 0; seq < k; seq++)
- {
- // For each sequence.
- if (slab == 0)
- {
- // Absolute beginning.
- pieces[slab][seq].first = 0;
- }
- else
- pieces[slab][seq].first = pieces[slab - 1][seq].second;
- if (!tight || slab < (num_threads - 1))
- pieces[slab][seq].second = offsets[slab][seq] - seqs_begin[seq].first;
- else
- {
- // slab == num_threads - 1
- pieces[slab][seq].second = LENGTH(seqs_begin[seq]);
- }
- }
- }
- delete[] offsets;
- }
+ for (int slab = 0; slab < num_threads; slab++)
+ {
+ // For each slab / processor.
+ for (int seq = 0; seq < k; seq++)
+ {
+ // For each sequence.
+ if (slab == 0)
+ {
+ // Absolute beginning.
+ pieces[slab][seq].first = 0;
+ }
+ else
+ pieces[slab][seq].first = pieces[slab - 1][seq].second;
+ if (!tight || slab < (num_threads - 1))
+ pieces[slab][seq].second = offsets[slab][seq] - seqs_begin[seq].first;
+ else
+ {
+ // slab == num_threads - 1
+ pieces[slab][seq].second = LENGTH(seqs_begin[seq]);
+ }
+ }
+ }
+ delete[] offsets;
+ }
+ } //single
-# pragma omp parallel num_threads(num_threads)
- {
- thread_index_t iam = omp_get_thread_num();
+ thread_index_t iam = omp_get_thread_num();
- difference_type target_position = 0;
+ difference_type target_position = 0;
- for (int c = 0; c < k; c++)
- target_position += pieces[iam][c].first;
+ for (int c = 0; c < k; c++)
+ target_position += pieces[iam][c].first;
- if (k > 2)
- {
- std::pair<RandomAccessIterator1, RandomAccessIterator1>* chunks = new std::pair<RandomAccessIterator1, RandomAccessIterator1>[k];
+ if (k > 2)
+ {
+ std::pair<RandomAccessIterator1, RandomAccessIterator1>* chunks = new std::pair<RandomAccessIterator1, RandomAccessIterator1>[k];
- difference_type local_length = 0;
- for (int s = 0; s < k; s++)
- {
- chunks[s] = std::make_pair(seqs_begin[s].first + pieces[iam][s].first, seqs_begin[s].first + pieces[iam][s].second);
- local_length += LENGTH(chunks[s]);
- }
+ difference_type local_length = 0;
+ for (int s = 0; s < k; s++)
+ {
+ chunks[s] = std::make_pair(seqs_begin[s].first + pieces[iam][s].first, seqs_begin[s].first + pieces[iam][s].second);
+ local_length += LENGTH(chunks[s]);
+ }
- multiway_merge(chunks, chunks + k, target + target_position, comp,
- std::min(local_length, length - target_position),
- stable, false, sequential_tag());
+ multiway_merge(chunks, chunks + k, target + target_position, comp,
+ std::min(local_length, length - target_position),
+ stable, false, sequential_tag());
- delete[] chunks;
- }
- else if (k == 2)
- {
- RandomAccessIterator1 begin0 = seqs_begin[0].first + pieces[iam][0].first, begin1 = seqs_begin[1].first + pieces[iam][1].first;
- merge_advance(begin0,
- seqs_begin[0].first + pieces[iam][0].second,
- begin1,
- seqs_begin[1].first + pieces[iam][1].second,
- target + target_position,
- (pieces[iam][0].second - pieces[iam][0].first) + (pieces[iam][1].second - pieces[iam][1].first),
- comp);
- }
- }
+ delete[] chunks;
+ }
+ else if (k == 2)
+ {
+ RandomAccessIterator1 begin0 = seqs_begin[0].first + pieces[iam][0].first, begin1 = seqs_begin[1].first + pieces[iam][1].first;
+ merge_advance(begin0,
+ seqs_begin[0].first + pieces[iam][0].second,
+ begin1,
+ seqs_begin[1].first + pieces[iam][1].second,
+ target + target_position,
+ (pieces[iam][0].second - pieces[iam][0].first) + (pieces[iam][1].second - pieces[iam][1].first),
+ comp);
+ }
+ } //parallel
-#if _GLIBCXX_ASSERTIONS
- _GLIBCXX_PARALLEL_ASSERT(is_sorted(target, target + length, comp));
-#endif
+ #if _GLIBCXX_ASSERTIONS
+ _GLIBCXX_PARALLEL_ASSERT(is_sorted(target, target + length, comp));
+ #endif
- // Update ends of sequences.
- for (int s = 0; s < k; s++)
- seqs_begin[s].first += pieces[num_threads - 1][s].second;
+ // Update ends of sequences.
+ for (int s = 0; s < k; s++)
+ seqs_begin[s].first += pieces[num_threads - 1][s].second;
- delete[] pieces;
+ delete[] pieces;
- return target + length;
- }
+ return target + length;
+ }
/**
* @brief Multi-way merging front-end.
Index: include/parallel/workstealing.h
===================================================================
--- include/parallel/workstealing.h (revision 130225)
+++ include/parallel/workstealing.h (working copy)
@@ -98,11 +98,12 @@
*/
template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
Op
- for_each_template_random_access_workstealing(RandomAccessIterator begin,
- RandomAccessIterator end,
- Op op, Fu& f, Red r,
- Result base, Result& output,
- typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
+ for_each_template_random_access_workstealing(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op op, Fu& f, Red r,
+ Result base, Result& output,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
_GLIBCXX_CALL(end - begin)
@@ -120,173 +121,177 @@
// Total number of threads currently working.
thread_index_t busy = 0;
- thread_index_t num_threads = get_max_threads();
- difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
+ Job<difference_type> *job;
+
omp_lock_t output_lock;
omp_init_lock(&output_lock);
- // No more threads than jobs, at least one thread.
- difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
- num_threads = static_cast<thread_index_t>(num_threads_max);
-
- // Create job description array.
- Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
-
// Write base value to output.
output = base;
-#pragma omp parallel shared(busy) num_threads(num_threads)
- {
- // Initialization phase.
+ // No more threads than jobs, at least one thread.
+ thread_index_t num_threads =
+ __gnu_parallel::max<thread_index_t>(1, __gnu_parallel::min<difference_type>(length, get_max_threads()));
- // Flags for every thread if it is doing productive work.
- bool iam_working = false;
+ #pragma omp parallel shared(busy) num_threads(num_threads)
+ {
- // Thread id.
- thread_index_t iam = omp_get_thread_num();
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
- // This job.
- Job<difference_type>& my_job = job[iam * stride];
+ // Create job description array.
+ job = new Job<difference_type>[num_threads * stride];
+ }
- // Random number (for work stealing).
- thread_index_t victim;
+ // Initialization phase.
- // Local value for reduction.
- Result result = Result();
+ // Flags for every thread if it is doing productive work.
+ bool iam_working = false;
- // Number of elements to steal in one attempt.
- difference_type steal;
+ // Thread id.
+ thread_index_t iam = omp_get_thread_num();
- // Every thread has its own random number generator (modulo num_threads).
- random_number rand_gen(iam, num_threads);
+ // This job.
+ Job<difference_type>& my_job = job[iam * stride];
-#pragma omp atomic
- // This thread is currently working.
- busy++;
+ // Random number (for work stealing).
+ thread_index_t victim;
- iam_working = true;
+ // Local value for reduction.
+ Result result = Result();
- // How many jobs per thread? last thread gets the rest.
- my_job.first = static_cast<difference_type>(iam * (length / num_threads));
+ // Number of elements to steal in one attempt.
+ difference_type steal;
- my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
- my_job.load = my_job.last - my_job.first + 1;
+ // Every thread has its own random number generator (modulo num_threads).
+ random_number rand_gen(iam, num_threads);
- // Init result with first value (to have a base value for reduction).
- if (my_job.first <= my_job.last)
- {
- // Cannot use volatile variable directly.
- difference_type my_first = my_job.first;
- result = f(op, begin + my_first);
- my_job.first++;
- my_job.load--;
- }
+ // This thread is currently working.
+ #pragma omp atomic
+ busy++;
- RandomAccessIterator current;
+ iam_working = true;
-#pragma omp barrier
+ // How many jobs per thread? last thread gets the rest.
+ my_job.first = static_cast<difference_type>(iam * (length / num_threads));
- // Actual work phase
- // Work on own or stolen start
- while (busy > 0)
- {
- // Work until no productive thread left.
-#pragma omp flush(busy)
+ my_job.last = (iam == (num_threads - 1)) ?
+ (length - 1) : ((iam + 1) * (length / num_threads) - 1);
+ my_job.load = my_job.last - my_job.first + 1;
- // Thread has own work to do
- while (my_job.first <= my_job.last)
- {
- // fetch-and-add call
- // Reserve current job block (size chunk_size) in my queue.
- difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
+ // Init result with first value (to have a base value for reduction).
+ if (my_job.first <= my_job.last)
+ {
+ // Cannot use volatile variable directly.
+ difference_type my_first = my_job.first;
+ result = f(op, begin + my_first);
+ my_job.first++;
+ my_job.load--;
+ }
- // Update load, to make the three values consistent,
- // first might have been changed in the meantime
- my_job.load = my_job.last - my_job.first + 1;
- for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
- {
- // Yes: process it!
- current = begin + current_job;
- current_job++;
+ RandomAccessIterator current;
- // Do actual work.
- result = r(result, f(op, current));
- }
+ #pragma omp barrier
-#pragma omp flush(busy)
+ // Actual work phase
+ // Work on own or stolen start
+ while (busy > 0)
+ {
+ // Work until no productive thread left.
+ #pragma omp flush(busy)
- }
+ // Thread has own work to do
+ while (my_job.first <= my_job.last)
+ {
+ // fetch-and-add call
+ // Reserve current job block (size chunk_size) in my queue.
+ difference_type current_job =
+ fetch_and_add<difference_type>(&(my_job.first), chunk_size);
- // After reaching this point, a thread's job list is empty.
- if (iam_working)
- {
-#pragma omp atomic
- // This thread no longer has work.
- busy--;
+ // Update load, to make the three values consistent,
+ // first might have been changed in the meantime
+ my_job.load = my_job.last - my_job.first + 1;
+ for (difference_type job_counter = 0;
+ job_counter < chunk_size && current_job <= my_job.last;
+ job_counter++)
+ {
+ // Yes: process it!
+ current = begin + current_job;
+ current_job++;
- iam_working = false;
- }
+ // Do actual work.
+ result = r(result, f(op, current));
+ }
- difference_type supposed_first, supposed_last, supposed_load;
- do
- {
- // Find random nonempty deque (not own) and do consistency check.
- yield();
-#pragma omp flush(busy)
- victim = rand_gen();
- supposed_first = job[victim * stride].first;
- supposed_last = job[victim * stride].last;
- supposed_load = job[victim * stride].load;
- }
- while (busy > 0
- && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
+ #pragma omp flush(busy)
+ }
- if (busy == 0)
- break;
+ // After reaching this point, a thread's job list is empty.
+ if (iam_working)
+ {
+ // This thread no longer has work.
+ #pragma omp atomic
+ busy--;
- if (supposed_load > 0)
- {
- // Has work and work to do.
- // Number of elements to steal (at least one).
- steal = (supposed_load < 2) ? 1 : supposed_load / 2;
+ iam_working = false;
+ }
- // Protects against stealing threads
- // omp_set_lock(&(job[victim * stride].lock));
+ difference_type supposed_first, supposed_last, supposed_load;
+ do
+ {
+ // Find random nonempty deque (not own) and do consistency check.
+ yield();
+ #pragma omp flush(busy)
+ victim = rand_gen();
+ supposed_first = job[victim * stride].first;
+ supposed_last = job[victim * stride].last;
+ supposed_load = job[victim * stride].load;
+ }
+ while (busy > 0
+ && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
- // Push victim's start forward.
- difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
- difference_type stolen_try = stolen_first + steal - difference_type(1);
+ if (busy == 0)
+ break;
- // Protects against working thread
- // omp_unset_lock(&(job[victim * stride].lock));
+ if (supposed_load > 0)
+ {
+ // Has work and work to do.
+ // Number of elements to steal (at least one).
+ steal = (supposed_load < 2) ? 1 : supposed_load / 2;
- my_job.first = stolen_first;
-
- // Avoid std::min dependencies.
- my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
+ // Protects against stealing threads
+ // omp_set_lock(&(job[victim * stride].lock));
- my_job.load = my_job.last - my_job.first + 1;
+ // Push victim's start forward.
+ difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
+ difference_type stolen_try = stolen_first + steal - difference_type(1);
- //omp_unset_lock(&(my_job.lock));
+ // Protects against working thread
+ // omp_unset_lock(&(job[victim * stride].lock));
-#pragma omp atomic
- // Has potential work again.
- busy++;
- iam_working = true;
+ my_job.first = stolen_first;
+ my_job.last = __gnu_parallel::min(stolen_try, supposed_last);
+ my_job.load = my_job.last - my_job.first + 1;
-#pragma omp flush(busy)
- }
-#pragma omp flush(busy)
- } // end while busy > 0
- // Add accumulated result to output.
- omp_set_lock(&output_lock);
- output = r(output, result);
- omp_unset_lock(&output_lock);
+ //omp_unset_lock(&(my_job.lock));
- //omp_destroy_lock(&(my_job.lock));
- }
+ // Has potential work again.
+ #pragma omp atomic
+ busy++;
+ iam_working = true;
+ #pragma omp flush(busy)
+ }
+ #pragma omp flush(busy)
+ } // end while busy > 0
+ // Add accumulated result to output.
+ omp_set_lock(&output_lock);
+ output = r(output, result);
+ omp_unset_lock(&output_lock);
+ }
+
delete[] job;
// Points to last element processed (needed as return value for
Index: include/parallel/base.h
===================================================================
--- include/parallel/base.h (revision 130225)
+++ include/parallel/base.h (working copy)
@@ -92,6 +92,20 @@
b = (int)((x >> 0 ) & lcas_t_mask);
}
+ /** @brief Equivalent to std::min. */
+ template<typename T>
+ const T& min(const T& a, const T& b)
+ {
+ return (a < b) ? a : b;
+ };
+
+ /** @brief Equivalent to std::max. */
+ template<typename T>
+ const T& max(const T& a, const T& b)
+ {
+ return (a > b) ? a : b;
+ };
+
/** @brief Constructs predicate for equality from strict weak
* ordering predicate
*/
Index: include/parallel/par_loop.h
===================================================================
--- include/parallel/par_loop.h (revision 130225)
+++ include/parallel/par_loop.h (working copy)
@@ -41,6 +41,7 @@
#include <omp.h>
#include <parallel/settings.h>
+#include <parallel/base.h>
namespace __gnu_parallel
{
@@ -65,45 +66,47 @@
*/
template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
Op
- for_each_template_random_access_ed(RandomAccessIterator begin,
- RandomAccessIterator end, Op o, Fu& f,
- Red r, Result base, Result& output,
- typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
+ for_each_template_random_access_ed(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op o, Fu& f, Red r, Result base, Result& output,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::difference_type difference_type;
const difference_type length = end - begin;
- const difference_type settings_threads = static_cast<difference_type>(get_max_threads());
- const difference_type dmin = settings_threads < length ? settings_threads : length;
- const difference_type dmax = dmin > 1 ? dmin : 1;
+ Result *thread_results;
- thread_index_t num_threads = static_cast<thread_index_t>(dmax);
+ thread_index_t num_threads = __gnu_parallel::min<difference_type>(get_max_threads(), length);
+ #pragma omp parallel num_threads(num_threads)
+ {
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ thread_results = new Result[num_threads];
+ }
- Result *thread_results = new Result[num_threads];
+ thread_index_t iam = omp_get_thread_num();
-#pragma omp parallel num_threads(num_threads)
- {
- // Neutral element.
- Result reduct = Result();
+ // Neutral element.
+ Result reduct = Result();
- thread_index_t p = num_threads;
- thread_index_t iam = omp_get_thread_num();
- difference_type start = iam * length / p;
- difference_type limit = (iam == p - 1) ? length : (iam + 1) * length / p;
+ difference_type start = equally_split_point(length, num_threads, iam),
+ stop = equally_split_point(length, num_threads, iam + 1);
- if (start < limit)
- {
- reduct = f(o, begin + start);
- start++;
- }
+ if (start < stop)
+ {
+ reduct = f(o, begin + start);
+ ++start;
+ }
- for (; start < limit; start++)
- reduct = r(reduct, f(o, begin + start));
+ for (; start < stop; ++start)
+ reduct = r(reduct, f(o, begin + start));
- thread_results[iam] = reduct;
- }
+ thread_results[iam] = reduct;
+ } //parallel
for (thread_index_t i = 0; i < num_threads; i++)
output = r(output, thread_results[i]);
Index: include/parallel/features.h
===================================================================
--- include/parallel/features.h (revision 130225)
+++ include/parallel/features.h (working copy)
@@ -66,7 +66,7 @@
* @brief Include guarded (sequences may run empty) loser tree,
* moving objects.
* @see __gnu_parallel::Settings multiway_merge_algorithm */
-#define _GLIBCXX_LOSER_TREE 0
+#define _GLIBCXX_LOSER_TREE 1
#endif
#ifndef _GLIBCXX_LOSER_TREE_EXPLICIT
Index: include/parallel/quicksort.h
===================================================================
--- include/parallel/quicksort.h (revision 130225)
+++ include/parallel/quicksort.h (working copy)
@@ -55,9 +55,9 @@
template<typename RandomAccessIterator, typename Comparator>
inline typename std::iterator_traits<RandomAccessIterator>::difference_type
parallel_sort_qs_divide(RandomAccessIterator begin, RandomAccessIterator end,
- Comparator comp,
- typename std::iterator_traits<RandomAccessIterator>::difference_type pivot_rank,
- typename std::iterator_traits<RandomAccessIterator>::difference_type num_samples, thread_index_t num_threads)
+ Comparator comp,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type pivot_rank,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type num_samples, thread_index_t num_threads)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -65,13 +65,15 @@
difference_type n = end - begin;
num_samples = std::min(num_samples, n);
- value_type* samples = static_cast<value_type*>(__builtin_alloca(sizeof(value_type) * num_samples));
+ // Allocate uninitialized, to avoid default constructor.
+ value_type* samples = static_cast<value_type*>(operator new(num_samples * sizeof(value_type)));
+
for (difference_type s = 0; s < num_samples; s++)
{
- const unsigned long long index = static_cast<unsigned long long>(s)
- * n / num_samples;
- samples[s] = begin[index];
+ const unsigned long long index = static_cast<unsigned long long>(s)
+ * n / num_samples;
+ new(samples + s) value_type(begin[index]);
}
__gnu_sequential::sort(samples, samples + num_samples, comp);
@@ -101,8 +103,8 @@
if (num_threads <= 1)
{
- __gnu_sequential::sort(begin, end, comp);
- return;
+ __gnu_sequential::sort(begin, end, comp);
+ return;
}
difference_type n = end - begin, pivot_rank;
@@ -110,14 +112,14 @@
if (n <= 1)
return;
- thread_index_t num_processors_left;
+ thread_index_t num_threads_left;
if ((num_threads % 2) == 1)
- num_processors_left = num_threads / 2 + 1;
+ num_threads_left = num_threads / 2 + 1;
else
- num_processors_left = num_threads / 2;
+ num_threads_left = num_threads / 2;
- pivot_rank = n * num_processors_left / num_threads;
+ pivot_rank = n * num_threads_left / num_threads;
difference_type split = parallel_sort_qs_divide(begin, end, comp, pivot_rank,
Settings::sort_qs_num_samples_preset, num_threads);
@@ -125,9 +127,9 @@
#pragma omp parallel sections
{
#pragma omp section
- parallel_sort_qs_conquer(begin, begin + split, comp, num_processors_left);
+ parallel_sort_qs_conquer(begin, begin + split, comp, num_threads_left);
#pragma omp section
- parallel_sort_qs_conquer(begin + split, end, comp, num_threads - num_processors_left);
+ parallel_sort_qs_conquer(begin + split, end, comp, num_threads - num_threads_left);
}
}
@@ -144,8 +146,8 @@
template<typename RandomAccessIterator, typename Comparator>
inline void
parallel_sort_qs(RandomAccessIterator begin, RandomAccessIterator end,
- Comparator comp,
- typename std::iterator_traits<RandomAccessIterator>::difference_type n, int num_threads)
+ Comparator comp,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type n, int num_threads)
{
_GLIBCXX_CALL(n)
@@ -165,12 +167,9 @@
// Hard to avoid.
omp_set_num_threads(num_threads);
- bool old_nested = (omp_get_nested() != 0);
- omp_set_nested(true);
parallel_sort_qs_conquer(begin, begin + n, comp, num_threads);
- omp_set_nested(old_nested);
}
-} //namespace __gnu_parallel
+} //namespace __gnu_parallel
#endif
Index: include/parallel/compiletime_settings.h
===================================================================
--- include/parallel/compiletime_settings.h (revision 130225)
+++ include/parallel/compiletime_settings.h (working copy)
@@ -53,24 +53,33 @@
#define _GLIBCXX_CALL(n) printf(" %s:\niam = %d, n = %ld, num_threads = %d\n", __PRETTY_FUNCTION__, omp_get_thread_num(), (n), get_max_threads());
#endif
+#ifndef _GLIBCXX_SCALE_DOWN_FPU
/** @brief Use floating-point scaling instead of modulo for mapping
* random numbers to a range. This can be faster on certain CPUs. */
#define _GLIBCXX_SCALE_DOWN_FPU 0
+#endif
+#ifndef _GLIBCXX_ASSERTIONS
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Should be switched on only locally. */
#define _GLIBCXX_ASSERTIONS 0
+#endif
+#ifndef _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_L1
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Consider the size of the L1 cache for __gnu_parallel::parallel_random_shuffle(). */
#define _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_L1 0
+#endif
+#ifndef _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB
/** @brief Switch on many _GLIBCXX_PARALLEL_ASSERTions in parallel code.
* Consider the size of the TLB for __gnu_parallel::parallel_random_shuffle(). */
#define _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB 0
+#endif
+#ifndef _GLIBCXX_MULTIWAY_MERGESORT_COPY_LAST
/** @brief First copy the data, sort it locally, and merge it back
* (0); or copy it back after everything is done (1).
*
* Recommendation: 0 */
#define _GLIBCXX_MULTIWAY_MERGESORT_COPY_LAST 0
-
+#endif
Index: include/parallel/equally_split.h
===================================================================
--- include/parallel/equally_split.h (revision 130225)
+++ include/parallel/equally_split.h (working copy)
@@ -39,30 +39,51 @@
namespace __gnu_parallel
{
- /** @brief Function to split a sequence into parts of almost equal size.
+ /** @brief Function to split a sequence into parts of almost equal size.
*
- * The resulting sequence s of length p+1 contains the splitting
+ * The resulting sequence s of length num_threads+1 contains the splitting
* positions when splitting the range [0,n) into parts of almost
* equal size (plus minus 1). The first entry is 0, the last one
* n. There may result empty parts.
* @param n Number of elements
- * @param p Number of parts
+ * @param num_threads Number of parts
* @param s Splitters
- * @returns End of splitter sequence, i. e. @c s+p+1 */
+ * @returns End of splitter sequence, i. e. @c s+num_threads+1 */
template<typename _DifferenceTp, typename OutputIterator>
OutputIterator
- equally_split(_DifferenceTp n, thread_index_t p, OutputIterator s)
+ equally_split(_DifferenceTp n, thread_index_t num_threads, OutputIterator s)
{
typedef _DifferenceTp difference_type;
- difference_type chunk_length = n / p, split = n % p, start = 0;
- for (int i = 0; i < p; i++)
+ difference_type chunk_length = n / num_threads, num_longer_chunks = n % num_threads, pos = 0;
+ for (thread_index_t i = 0; i < num_threads; ++i)
{
- *s++ = start;
- start += (difference_type(i) < split) ? (chunk_length + 1) : chunk_length;
+ *s++ = pos;
+ pos += (i < num_longer_chunks) ? (chunk_length + 1) : chunk_length;
}
*s++ = n;
return s;
}
+
+
+ /** @brief Function to split a sequence into parts of almost equal size.
+ *
+ * Returns the position at which the part of thread number
+ * thread_no begins, i. e. the splitting point between
+ * thread number thread_no-1 and thread number thread_no.
+ * @param n Number of elements
+ * @param num_threads Number of parts
+ * @returns Splitting point */
+ template<typename _DifferenceTp>
+ _DifferenceTp
+ equally_split_point(_DifferenceTp n, thread_index_t num_threads, thread_index_t thread_no)
+ {
+ typedef _DifferenceTp difference_type;
+ difference_type chunk_length = n / num_threads, num_longer_chunks = n % num_threads;
+ if(thread_no < num_longer_chunks)
+ return thread_no * (chunk_length + 1);
+ else
+ return num_longer_chunks * (chunk_length + 1) + (thread_no - num_longer_chunks) * chunk_length;
+ }
}
#endif
Index: include/parallel/omp_loop_static.h
===================================================================
--- include/parallel/omp_loop_static.h (revision 130225)
+++ include/parallel/omp_loop_static.h (working copy)
@@ -64,39 +64,47 @@
* std::count_n()).
* @return User-supplied functor (that may contain a part of the result).
*/
- template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
+ template<typename RandomAccessIterator,
+ typename Op,
+ typename Fu,
+ typename Red,
+ typename Result>
Op
- for_each_template_random_access_omp_loop_static(RandomAccessIterator begin,
- RandomAccessIterator end,
- Op o, Fu& f, Red r,
- Result base, Result& output,
- typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
+ for_each_template_random_access_omp_loop_static(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op o, Fu& f, Red r, Result base, Result& output,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
{
- typedef std::iterator_traits<RandomAccessIterator> traits_type;
- typedef typename traits_type::difference_type difference_type;
+ typedef typename std::iterator_traits<RandomAccessIterator>::difference_type
+ difference_type;
- thread_index_t num_threads = (get_max_threads() < (end - begin)) ? get_max_threads() : (end - begin);
- Result *thread_results = new Result[num_threads];
difference_type length = end - begin;
+ thread_index_t num_threads = std::min<difference_type>(get_max_threads(), length);
- for (thread_index_t i = 0; i < num_threads; i++)
+ Result *thread_results;
+
+ #pragma omp parallel num_threads(num_threads)
{
- thread_results[i] = r(thread_results[i], f(o, begin+i));
- }
+ #pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+ thread_results = new Result[num_threads];
-#pragma omp parallel num_threads(num_threads)
- {
-#pragma omp for schedule(static, Settings::workstealing_chunk_size)
- for (difference_type pos = 0; pos < length; pos++)
- {
- thread_results[omp_get_thread_num()] = r(thread_results[omp_get_thread_num()], f(o, begin+pos));
- }
- }
+ for (thread_index_t i = 0; i < num_threads; i++)
+ thread_results[i] = Result();
+ }
+ thread_index_t iam = omp_get_thread_num();
+
+ #pragma omp for schedule(static, Settings::workstealing_chunk_size)
+ for (difference_type pos = 0; pos < length; pos++)
+ thread_results[iam] =
+ r(thread_results[iam], f(o, begin+pos));
+ } //parallel
+
for (thread_index_t i = 0; i < num_threads; i++)
- {
- output = r(output, thread_results[i]);
- }
+ output = r(output, thread_results[i]);
delete [] thread_results;
@@ -106,6 +114,7 @@
return o;
}
+
} // end namespace
#endif
Index: include/parallel/random_shuffle.h
===================================================================
--- include/parallel/random_shuffle.h (revision 130225)
+++ include/parallel/random_shuffle.h (working copy)
@@ -99,9 +99,6 @@
/** @brief Number of threads participating in total. */
int num_threads;
- /** @brief Number of owning thread. */
- int iam;
-
/** @brief Begin index for bins taken care of by this thread. */
bin_index bins_begin;
@@ -135,9 +132,9 @@
typedef typename traits_type::value_type value_type;
typedef typename traits_type::difference_type difference_type;
- DRSSorterPU<RandomAccessIterator, RandomNumberGenerator>* d = &pus[omp_get_thread_num()];
+ thread_index_t iam = omp_get_thread_num();
+ DRSSorterPU<RandomAccessIterator, RandomNumberGenerator>* d = &pus[iam];
DRandomShufflingGlobalData<RandomAccessIterator>* sd = d->sd;
- thread_index_t iam = d->iam;
// Indexing: dist[bin][processor]
difference_type length = sd->starts[iam + 1] - sd->starts[iam];
@@ -258,7 +255,7 @@
*/
template<typename RandomAccessIterator, typename RandomNumberGenerator>
inline void
- parallel_random_shuffle_drs(RandomAccessIterator begin, RandomAccessIterator end, typename std::iterator_traits<RandomAccessIterator>::difference_type n, int num_threads, RandomNumberGenerator& rng)
+ parallel_random_shuffle_drs(RandomAccessIterator begin, RandomAccessIterator end, typename std::iterator_traits<RandomAccessIterator>::difference_type n, thread_index_t num_threads, RandomNumberGenerator& rng)
{
typedef std::iterator_traits<RandomAccessIterator> traits_type;
typedef typename traits_type::value_type value_type;
@@ -280,82 +277,87 @@
// No more buckets than TLB entries, power of 2
// Power of 2 and at least one element per bin, at most the TLB size.
- num_bins = std::min(n, (difference_type)num_bins_cache);
+ num_bins = std::min<difference_type>(n, num_bins_cache);
#if _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB
// 2 TLB entries needed per bin.
- num_bins = std::min((difference_type)Settings::TLB_size / 2, num_bins);
+ num_bins = std::min<difference_type>(Settings::TLB_size / 2, num_bins);
#endif
num_bins = round_up_to_pow2(num_bins);
if (num_bins < num_bins_cache)
{
#endif
- // Now try the L2 cache
- // Must fit into L2
- num_bins_cache = static_cast<bin_index>(std::max((difference_type)1, (difference_type)(n / (Settings::L2_cache_size / sizeof(value_type)))));
- num_bins_cache = round_up_to_pow2(num_bins_cache);
+ // Now try the L2 cache
+ // Must fit into L2
+ num_bins_cache = static_cast<bin_index>(std::max((difference_type)1, (difference_type)(n / (Settings::L2_cache_size / sizeof(value_type)))));
+ num_bins_cache = round_up_to_pow2(num_bins_cache);
- // No more buckets than TLB entries, power of 2.
- num_bins = static_cast<bin_index>(std::min(n, (difference_type)num_bins_cache));
- // Power of 2 and at least one element per bin, at most the TLB size.
+ // No more buckets than TLB entries, power of 2.
+ num_bins = static_cast<bin_index>(std::min(n, (difference_type)num_bins_cache));
+ // Power of 2 and at least one element per bin, at most the TLB size.
#if _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_TLB
- // 2 TLB entries needed per bin.
- num_bins = std::min((difference_type)Settings::TLB_size / 2, num_bins);
+ // 2 TLB entries needed per bin.
+ num_bins = std::min((difference_type)Settings::TLB_size / 2, num_bins);
#endif
- num_bins = round_up_to_pow2(num_bins);
+ num_bins = round_up_to_pow2(num_bins);
#if _GLIBCXX_RANDOM_SHUFFLE_CONSIDER_L1
}
#endif
- num_threads = std::min((bin_index)num_threads, (bin_index)num_bins);
+ num_threads = std::min<bin_index>(num_threads, num_bins);
if (num_threads <= 1)
return sequential_random_shuffle(begin, end, rng);
DRandomShufflingGlobalData<RandomAccessIterator> sd(begin);
+ DRSSorterPU<RandomAccessIterator, random_number >* pus;
+ difference_type* starts;
- DRSSorter