concurrent_priority_queue.h

00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023 
00024 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
00025 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
00026 #endif
00027 
00028 #include "atomic.h"
00029 #include "cache_aligned_allocator.h"
00030 #include "tbb_exception.h"
00031 #include "tbb_stddef.h"
00032 #include "tbb_profiling.h"
00033 #include <iterator>
00034 #include <functional>
00035 
00036 namespace tbb {
00037 namespace interface5 {
00038 
// Concurrent priority queue (preview feature).
//
// Design (visible below in insert_handle_wait/handle_operations): callers do not
// lock the heap directly. Each operation is described by a cpq_operation record
// that is pushed onto a lock-free LIFO list (operation_list) with a CAS loop.
// The thread whose push finds the list empty becomes the "handler" and applies
// the whole accumulated batch to the array-based heap; all other threads
// spin-wait until the handler publishes a result into their record.
//
// Heap layout invariant: data[0..mark) is a valid binary heap;
// data[mark..my_size) holds pushed-but-not-yet-heapified elements that the
// handler folds in lazily via heapify().
00040 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00041 class concurrent_priority_queue {
00042  public:
// Element type stored in the queue.
00044     typedef T value_type;
00045 
// Reference to an element.
00047     typedef T& reference;
00048 
// Const reference to an element.
00050     typedef const T& const_reference;
00051 
// Unsigned size/capacity type.
00053     typedef size_t size_type;
00054 
// Signed distance type.
00056     typedef ptrdiff_t difference_type;
00057 
// Allocator used for the element array.
00059     typedef A allocator_type;
00060 
// Constructs an empty queue with zero capacity.
00062     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : my_helper(a) { 
00063         internal_construct(0); 
00064     }
00065     
// Constructs an empty queue with at least init_capacity reserved slots.
00067     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : my_helper(a) { 
00068         internal_construct(init_capacity); 
00069     }
00070     
// Constructs a queue from [begin, end); dispatches on iterator category so
// forward iterators get a single exact-size allocation.
00072     template<typename InputIterator>
00073     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : my_helper(a)
00074     {
00075         internal_iterator_construct(begin, end, typename std::iterator_traits<InputIterator>::iterator_category());
00076     }
00077     
00079 
// Copy constructor. Not safe to run concurrently with operations on src
// (reads src's fields directly, bypassing the operation list).
00080     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a = allocator_type()) : my_helper(a)
00081     {
00082         internal_construct(src.my_size);
00083         my_size = src.my_size;
00084         mark = src.mark;
00085         __TBB_TRY {
00086             internal_copy(src.data, data, my_size);
00087         } __TBB_CATCH(...) {
// Copy failed part-way: internal_copy already destroyed the partial copies,
// so only the raw array needs releasing before rethrowing.
00088             my_helper.deallocate(data, my_capacity);
00089             __TBB_RETHROW();
00090         }
00091         heapify();
00092     }
00093 
00095 
// Copy assignment via the copy-and-swap idiom (strong exception guarantee).
00096     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00097         if (this !=&src) {
00098             concurrent_priority_queue copy(src);
00099             copy.swap(*this);
00100         }
00101         return *this;
00102     }
00103 
// Destructor; asserts (in debug builds) that no operations are pending.
00105     ~concurrent_priority_queue() { internal_destroy(); }
00106 
00108 
// True if the queue holds no elements. Snapshot only: the answer may be
// stale by the time the caller uses it.
00109     bool empty() const { return my_size==0; }
00110 
00112 
// Number of elements currently stored (snapshot, see empty()).
00113     size_type size() const { return my_size; } 
00114 
00116 
// Current capacity of the underlying array (snapshot).
00117     size_type capacity() const { 
00118         return my_capacity; 
00119     }
00120 
// Pushes a copy of elem. Any exception raised while the handler applies the
// push (array growth or T's copy constructor) is reported as FAILED and
// surfaces here as bad_alloc.
00122     void push(const_reference elem) {
00123         cpq_operation op_data(elem, PUSH_OP);
00124         insert_handle_wait(&op_data);
00125         if (op_data.result == FAILED) { // Copy constructor with elem threw exception
00126             tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00127         }
00128     }
00129     
00131 
// Attempts to pop the highest-priority element into elem.
// Returns false if the queue was observed empty by the handler.
00133     bool try_pop(reference elem) {
00134         cpq_operation op_data(POP_OP);
00135         op_data.elem = &elem;
00136         insert_handle_wait(&op_data);
00137         return op_data.result==SUCCESS;
00138     }
00139 
// Grows (or shrinks toward size) the array to hold at least new_cap
// elements; routed through the handler like any other operation.
// Throws bad_alloc if the resize failed.
00141     void reserve(size_type new_cap) {
00142         cpq_operation op_data(RESERVE_OP);
00143         op_data.sz = new_cap;
00144         insert_handle_wait(&op_data);
00145         if (op_data.result == FAILED) { // Copy constructors threw exception during array resize
00146             tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00147         }
00148     }
00149 
00151 
// Destroys all elements. NOT safe to call concurrently with other
// operations: it mutates the queue directly, bypassing the operation list.
00153     void clear() {
00154         for (size_type i=my_size; i>0; --i) {
00155             data[i-1].~value_type();
00156         }
00157         my_size = 0;
00158         mark = 0; 
00159     }
00160 
// Shrinks capacity to the current size. Same concurrency caveat as clear().
00162     void shrink_to_fit() {
00163         internal_reserve(my_size);
00164     }
00165 
// Swaps contents with q. Same concurrency caveat as clear().
00167     void swap(concurrent_priority_queue& q) {
00168         std::swap(data, q.data);
00169         std::swap(my_size, q.my_size);        
00170         std::swap(my_capacity, q.my_capacity);
00171         std::swap(mark, q.mark);
00172         std::swap(my_helper, q.my_helper);
00173     }
00174 
// Returns a copy of the allocator (my_helper derives from allocator_type).
00176     allocator_type get_allocator() const { return my_helper; }
00177 
00178 private:
00179     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
// WAITING must be 0: waiters spin on result != WAITING (see insert_handle_wait).
00180     enum operation_status { WAITING = 0, SUCCESS, FAILED };
// Record describing one pending operation; lives on the requesting thread's
// stack and is linked into operation_list until the handler serves it.
00181     class cpq_operation {
00182      public:
00183         operation_type type;
// Written once by the handler with release semantics; spun on by the waiter.
00184         uintptr_t result;
00185         union {
00186             value_type *elem;
00187             size_type sz;
00188         };
00189         cpq_operation *next;
00190         cpq_operation(const_reference e, operation_type t) : 
00191             type(t), result(WAITING), elem(const_cast<value_type*>(&e)), next(NULL) {}
00192         cpq_operation(operation_type t) : type(t), result(WAITING), next(NULL) {}
00193     };
00194 
// Head of the lock-free LIFO list of pending operations.
00196     tbb::atomic<cpq_operation *> operation_list;
// Pads operation_list out to its own cache line (avoids false sharing with
// the handler-owned fields below).
00198     char padding1[ tbb::internal::NFS_MaxLineSize - sizeof(tbb::atomic<cpq_operation *>)];
00199 
// Nonzero while some thread is inside handle_operations().
00201     uintptr_t handler_busy;
// Number of constructed elements in data.
00203     size_type my_size;
// data[0..mark) is a valid heap; [mark..my_size) is the unheapified tail.
00205     size_type mark;
// Keeps the handler-owned trio on a cache line separate from my_capacity/data.
00207     char padding2[ tbb::internal::NFS_MaxLineSize - sizeof(uintptr_t) - 2*sizeof(size_type)];
00208 
// Allocated length of data.
00210     size_type my_capacity;
00212 
// The element array backing the heap.
00230     value_type *data;
// Bundles the allocator (as a base class, so a stateless allocator adds no
// size) together with the comparator instance.
00232     struct helper_type : public allocator_type {
00234         helper_type(allocator_type const& a) : allocator_type(a) {}
00236         Compare compare;
00237     };
00238     helper_type my_helper;
00239 
// Common constructor body: zeroes all state, then reserves init_sz slots.
00241     void internal_construct(size_type init_sz) {
00242         my_size = my_capacity = mark = 0;
00243         data = NULL;
00244         operation_list = NULL;
00245         handler_busy = 0;
00246         internal_reserve(init_sz);
00247     }
00248 
// Range construction for forward iterators: one exact-size allocation,
// copy-construct each element, then heapify. On exception, destroys the
// i elements built so far and releases the array before rethrowing.
00250     template <typename ForwardIterator>
00251     void internal_iterator_construct(ForwardIterator begin, ForwardIterator end, std::forward_iterator_tag) {
00252         internal_construct(std::distance(begin, end));
00253         my_size = my_capacity;
00254         size_type i=0;
00255         __TBB_TRY {
00256             for(; begin != end; ++begin, ++i)
00257                 new (&data[i]) value_type(*begin);
00258         } __TBB_CATCH(...) {
00259             my_size = i;
00260             clear();
00261             my_helper.deallocate(data, my_capacity);
00262             __TBB_RETHROW();
00263         }
00264         heapify();
00265     }
00266 
// Range construction for single-pass input iterators: start at capacity 32,
// double on demand, then heapify and trim the excess capacity.
00268     template <typename InputIterator>
00269     void internal_iterator_construct(InputIterator begin, InputIterator end, std::input_iterator_tag) {
00270         internal_construct(32); 
00271         size_type i=0;
00272         __TBB_TRY {
00273             for(; begin != end; ++begin, ++i) {
00274                 if (i>=my_capacity)
00275                     internal_reserve(my_capacity<<1);
00276                 new (&data[i]) value_type(*begin);
00277                 ++my_size;
00278             }
00279         } __TBB_CATCH(...) {
00280             clear();
00281             my_helper.deallocate(data, my_capacity);
00282             __TBB_RETHROW();
00283         }
00284         heapify();
00285         shrink_to_fit();
00286     }
00287 
// Destructor body. In debug builds, poisons operation_list so late callers
// trip the assertion in insert_handle_wait rather than corrupt freed memory.
00289     void internal_destroy() { 
00290 #if TBB_USE_ASSERT        
00291         cpq_operation *op_list = operation_list.fetch_and_store((cpq_operation *)(internal::poisoned_ptr));
00292         __TBB_ASSERT(op_list==NULL,"concurrent_priority_queue destroyed with pending operations.\n");
00293         __TBB_ASSERT(!handler_busy,"concurrent_priority_queue destroyed with pending operations.\n");
00294 #endif
00295         clear();
00296         if (data) my_helper.deallocate(data, my_capacity);
00297     }
00298 
00300 
// Grows/shrinks the array to desired_capacity (never below my_size);
// defined out of line below the class.
00301     void internal_reserve(size_type desired_capacity);
00302 
00304 
// Copy-constructs sz elements from src into raw storage dst.
// On exception, destroys everything built so far, then rethrows
// (dst is left as raw storage for the caller to release).
00305     void internal_copy(const value_type *src, value_type *dst, size_type sz) {
00306         size_type i=0;
00307         __TBB_TRY {
00308             for (; i<sz; ++i) new (&dst[i]) value_type(src[i]);
00309         } __TBB_CATCH(...) {
00310             // clean up dst
00311             for (; i>0; --i) {
00312                 dst[i-1].~value_type();
00313             }
00314             __TBB_RETHROW();
00315         }
00316     }
00317 
// Folds the unheapified tail [mark..my_size) into the heap (sift-up each).
00319     void heapify();
00320 
00322 
// Sifts data[my_size] down from the root through the heapified prefix
// (used after the root has been popped).
00323     void reheap();
00324 
// Drains operation_list and applies every pending operation to the heap.
00326     void handle_operations();
00327 
// Core aggregation protocol: atomically prepends op to operation_list.
// If op was the first entry (list was empty), this thread becomes the
// handler and processes the whole batch; otherwise it spins until the
// handler stores a non-WAITING result into op->result.
00329     void insert_handle_wait(cpq_operation *op) {
00330         cpq_operation *res = operation_list, *tmp;
00331 
00332         using namespace tbb::internal;
00333 
00334         __TBB_ASSERT(operation_list!=(cpq_operation *)poisoned_ptr, "Attempt to use destroyed concurrent_priority_queue.\n");
00335         // insert the operation in the queue
00336         do {
// Standard CAS-retry push: link to the current head, then try to swing the
// head to op; on failure res holds the fresh head and we retry.
00337             op->next = tmp = res;
00338             // ITT note: &operation_list+1 tag is used to cover accesses to all ops in operation_list.
00339             // This thread has created the operation, and now releases it so that the handler thread 
00340             // may handle the operations w/o triggering a race condition; thus this tag will be acquired
00341             // just before the operations are handled in handle_operations.
00342             call_itt_notify(releasing, &operation_list+1);
00343         } while ((res = operation_list.compare_and_swap(op, tmp)) != tmp);
00344         if (!tmp) { // first in the list; handle the operations
00345             // ITT note: &operation_list tag covers access to the handler_busy flag, which this
00346             // waiting handler thread will try to set upon entering handle_operations.
00347             call_itt_notify(acquired, &operation_list);
00348             handle_operations();
00349             __TBB_ASSERT(op->result, NULL);
00350         }
00351         else { // not first; wait for op to be ready
00352             call_itt_notify(prepare, &(op->result));
00353             spin_wait_while_eq(op->result, uintptr_t(WAITING));
// Acquire-load pairs with the handler's release-store of the result, making
// the handler's heap updates visible to this thread.
00354             itt_load_word_with_acquire(op->result);
00355         }
00356     }
00357 };
00358     
00360 template <typename T, typename Compare, typename A>
00361 void concurrent_priority_queue<T, Compare, A>::internal_reserve(size_type desired_capacity) {
00362     value_type *tmp_data = NULL;
00363     
00364     // don't reduce queue capacity below content size
00365     if (desired_capacity<my_size) desired_capacity = my_size;
00366     // handle special case of complete queue removal
00367     if (desired_capacity==0) {
00368         // assert that data was properly cleared before this
00369         __TBB_ASSERT(my_size==0, NULL);
00370         __TBB_ASSERT(mark==0, NULL);
00371         if (data) my_helper.deallocate(data, my_capacity);
00372         data = NULL;
00373         my_capacity = 0;
00374         return;
00375     }
00376     // allocate the new array
00377     tmp_data = static_cast<value_type*>(my_helper.allocate(desired_capacity));
00378     if( !tmp_data )
00379         tbb::internal::throw_exception(tbb::internal::eid_bad_alloc);
00380     if (data) {
00381         // fill new array with old contents, if any
00382         __TBB_TRY {
00383             internal_copy(data, tmp_data, my_size);
00384         } __TBB_CATCH(...) {
00385             my_helper.deallocate(tmp_data, desired_capacity);
00386             __TBB_RETHROW();
00387         }
00388         // clear and delete old array
00389         for (size_type i=my_size; i>0; --i) {
00390             data[i-1].~value_type();
00391         }
00392         my_helper.deallocate(data, my_capacity);
00393     }
00394     // else simply put the new array in data
00395     // update data and my_capacity
00396     data = tmp_data;
00397     my_capacity = desired_capacity;
00398 }
00399 
00401 template <typename T, typename Compare, typename A>
00402 void concurrent_priority_queue<T, Compare, A>::heapify() {
00403     value_type *loc = data;
00404 
00405     if (!mark) mark = 1;
00406     for (; mark<my_size; ++mark) { // for each unheapified element under my_size
00407         size_type cur_pos = mark; 
00408         value_type to_place = loc[mark];
00409         do { // push to_place up the heap
00410             size_type parent = (cur_pos-1)>>1;
00411             if (!my_helper.compare(loc[parent], to_place)) break;
00412             loc[cur_pos] = loc[parent];
00413             cur_pos = parent;
00414         } while( cur_pos );
00415         loc[cur_pos] = to_place;
00416     }
00417 }
00418     
00420 
00422 template <typename T, typename Compare, typename A>
00423 void concurrent_priority_queue<T, Compare, A>::reheap() {
00424     size_type cur_pos=0, child=1;
00425     value_type *loc = data;
00426     
00427     while (child < mark) {
00428         size_type target = child;
00429         if (child+1 < mark && my_helper.compare(loc[child], loc[child+1])) ++target;
00430         // target now has the higher priority child
00431         if (my_helper.compare(loc[target], loc[my_size])) break;
00432         loc[cur_pos] = loc[target];
00433         cur_pos = target;
00434         child = (cur_pos<<1)+1;
00435     }
00436     loc[cur_pos] = loc[my_size];
00437 }
00438 
// Applies every operation accumulated on operation_list to the heap.
// Exactly one thread runs this at a time (guarded by handler_busy).
// Pass 1 serves pushes, reserves, and the pops that can be satisfied from
// the unheapified tail; the remaining pops are deferred to pass 2, which
// pops from the heap root and restores the heap with reheap().
// Each op's result is published with release semantics; the requesting
// thread's acquire-load in insert_handle_wait pairs with it.
00439 template <typename T, typename Compare, typename A>
00440 void concurrent_priority_queue<T, Compare, A>::handle_operations() {
00441     cpq_operation *op_list, *tmp, *pop_list=NULL;
00442 
00443     using namespace tbb::internal;
00444 
00445     // get the handler_busy: only one thread can possibly spin here at a time
00446     // ITT note: &handler_busy tag covers access to the actual queue as it is passed between active 
00447     // and waiting handlers.  Below, the waiting handler waits until the active handler releases, 
00448     // and the waiting handler acquires &handler_busy as it becomes the active_handler. The 
00449     // release point is at the end of this function, when all operations in the list have been applied
00450     // to the queue.
00451     call_itt_notify(prepare, &handler_busy);
00452     spin_wait_until_eq(handler_busy, uintptr_t(0));
00453     call_itt_notify(acquired, &handler_busy);
00454     // acquire not necessary here due to causality rule and surrounding atomics
00455     __TBB_store_with_release(handler_busy, 1);
00456 
00457     // grab the operation list
00458     // ITT note: &operation_list tag covers access to the handler_busy flag itself.
00459     // Capturing the state of the operation_list signifies that handler_busy has been set and a new
00460     // active handler will now process that list's operations.
00461     call_itt_notify(releasing, &operation_list);
// Atomically detach the whole pending batch; new arrivals start a fresh list.
00462     op_list = operation_list.fetch_and_store(NULL);
00463     // ITT note: &operation_list+1 tag is used to cover accesses to all ops in operation_list.
00464     // The threads that created each operation released this tag so that this handler thread 
00465     // could handle the operations w/o triggering a race condition; thus this handler thread
00466     // now acquires this tag just before handling the operations.
00467     call_itt_notify(acquired, &operation_list+1);
00468 
00469     // first pass processes all constant time operations: pushes, tops, some pops. Also reserve.
00470     while (op_list) {
00471         __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
00472         tmp = op_list;
00473         op_list = op_list->next;
00474         if (tmp->type == PUSH_OP) {
00475             __TBB_TRY {
// Grow geometrically on demand; the new element lands in the unheapified
// tail and is folded in by heapify() at the end of the batch.
00476                 if (my_size >= my_capacity) internal_reserve(my_capacity?my_capacity<<1:1);
00477                 new (&data[my_size]) value_type(*(tmp->elem)); // copy the data
00478                 ++my_size;
00479                 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00480             } __TBB_CATCH(...) {
// Exception swallowed here; push() turns FAILED into a bad_alloc throw.
00481                 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00482             }
00483         }
00484         else if (tmp->type == POP_OP) {
00485             if (!my_size) {
00486                 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00487             }
00488             else {
00489                 if (mark < my_size && my_helper.compare(data[0], data[my_size-1])) {
00490                     // there are newly pushed elems and the last one is higher than top
// Fast path: hand out the tail element directly; the heap is untouched.
00491                     *(tmp->elem) = data[my_size-1]; // copy the data
00492                     data[my_size-1].~value_type();
00493                     --my_size;
00494                     itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00495                     __TBB_ASSERT(mark<=my_size, NULL);
00496                 }
00497                 else {
// Must pop from the heap root; defer to the second pass.
00498                     tmp->next = pop_list;
00499                     pop_list = tmp;
00500                 }
00501             }
00502         }
00503         else {
00504             __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
00505             __TBB_TRY {
00506                 internal_reserve(tmp->sz);
00507                 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00508             } __TBB_CATCH(...) {
// reserve() turns FAILED into a bad_alloc throw on the requesting thread.
00509                 itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00510             };
00511         }
00512     }
00513 
00514     // second pass processes pop operations
00515     while (pop_list) {
00516         tmp = pop_list;
00517         pop_list = pop_list->next;
00518         __TBB_ASSERT(tmp->type == POP_OP, NULL);
00519         if (!my_size) {
00520             itt_store_word_with_release(tmp->result, uintptr_t(FAILED));
00521         }
00522         else {
00523             __TBB_ASSERT(mark<=my_size, NULL);
// Re-check the tail fast path: an earlier pop in this pass may have
// changed which branch applies.
00524             if (mark < my_size && my_helper.compare(data[0], data[my_size-1])) {
00525                 // there are newly pushed elems and the last one is higher than top
00526                 *(tmp->elem) = data[my_size-1]; // copy the data
00527                 --my_size;
00528                 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
00529                 data[my_size].~value_type();
00530             }
00531             else { // extract and push the last element down heap
00532                 *(tmp->elem) = data[0]; // copy the data
// The removed root was part of the heapified prefix; shrink mark with
// my_size so the mark<=my_size invariant is preserved.
00533                 if (mark == my_size) --mark;
00534                 --my_size;
00535                 itt_store_word_with_release(tmp->result, uintptr_t(SUCCESS));
// Move the last element to the root, then restore heap order.
00536                 data[0] = data[my_size];
00537                 if (my_size > 1) // don't reheap for heap of size 1
00538                     reheap();
00539                 data[my_size].~value_type();
00540             }
00541             __TBB_ASSERT(mark<=my_size, NULL);
00542         }
00543     }
00544 
00545     // heapify any leftover pushed elements before doing the next batch of operations
00546     if (mark<my_size) heapify();
00547     __TBB_ASSERT(mark<=my_size, NULL);
00548     
00549     // release the handler_busy
00550     itt_store_word_with_release(handler_busy, uintptr_t(0));
00551 }
00552 
00553 } // namespace interface5
00554 
00555 using interface5::concurrent_priority_queue;
00556 
00557 } // namespace tbb
00558 
00559 
00560 #endif /* __TBB_concurrent_priority_queue_H */

Copyright © 2005-2010 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.