00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_graph_H
00022 #define __TBB_graph_H
00023
00024 #if !TBB_PREVIEW_GRAPH
00025 #error Set TBB_PREVIEW_GRAPH to include graph.h
00026 #endif
00027
00028 #include "tbb_stddef.h"
00029 #include "atomic.h"
00030 #include "spin_mutex.h"
00031 #include "spin_rw_mutex.h"
00032 #include "null_rw_mutex.h"
00033 #include "task.h"
00034 #include "concurrent_vector.h"
00035
00036
00037 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00038 #define TBB_PREVIEW_TUPLE 1
00039 #include "compat/tuple"
00040 #else
00041 #include <tuple>
00042 #endif
00043
00044 #include<list>
00045 #include<queue>
00046
00047
00058 namespace tbb {
00059
//! Common polymorphic base of the graph node classes.
class graph_node {
public:
    //! Virtual, so that a node can be destroyed through a graph_node pointer.
    virtual ~graph_node() { }
};
00065
//! Data-less message type; it has no members.
class continue_msg { };
00068
00069 template< typename T > class sender;
00070 template< typename T > class receiver;
00071 class continue_receiver;
00072
00074 template< typename T >
00075 class sender {
00076 public:
00078 typedef T output_type;
00079
00081 typedef receiver<T> successor_type;
00082
00083 virtual ~sender() {}
00084
00086 virtual bool register_successor( successor_type &r ) = 0;
00087
00089 virtual bool remove_successor( successor_type &r ) = 0;
00090
00092 virtual bool try_get( T & ) { return false; }
00093
00095 virtual bool try_reserve( T & ) { return false; }
00096
00098 virtual bool try_release( ) { return false; }
00099
00101 virtual bool try_consume( ) { return false; }
00102
00103 };
00104
00105
00107 template< typename T >
00108 class receiver {
00109 public:
00110
00112 typedef T input_type;
00113
00115 typedef sender<T> predecessor_type;
00116
00118 virtual ~receiver() {}
00119
00121 virtual bool try_put( T t ) = 0;
00122
00124 virtual bool register_predecessor( predecessor_type & ) { return false; }
00125
00127 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00128
00129 };
00130
00132
00133 class continue_receiver : public receiver< continue_msg > {
00134 public:
00135
00137 typedef continue_msg input_type;
00138
00140 typedef sender< continue_msg > predecessor_type;
00141
00143 continue_receiver( int number_of_predecessors = 0 ) {
00144 my_predecessor_count = number_of_predecessors;
00145 my_current_count = 0;
00146 }
00147
00149 virtual ~continue_receiver() { }
00150
00152 bool register_predecessor( predecessor_type & ) {
00153 tbb::spin_mutex::scoped_lock l(my_mutex);
00154 ++my_predecessor_count;
00155 return true;
00156 }
00157
00159
00162 bool remove_predecessor( predecessor_type & ) {
00163 tbb::spin_mutex::scoped_lock l(my_mutex);
00164 --my_predecessor_count;
00165 return true;
00166 }
00167
00169
00171 bool try_put( input_type ) {
00172 {
00173 tbb::spin_mutex::scoped_lock l(my_mutex);
00174 if ( ++my_current_count < my_predecessor_count )
00175 return true;
00176 else
00177 my_current_count = 0;
00178 }
00179 execute();
00180 return true;
00181 }
00182
00183 protected:
00184
00185 tbb::spin_mutex my_mutex;
00186 int my_predecessor_count;
00187 int my_current_count;
00188
00190
00192 virtual void execute() = 0;
00193
00194 };
00195
00197 namespace internal {
00198
//! State values for a node (source_node stores one of these in my_state).
enum node_state {
    node_state_idle     = 0,
    node_state_nonidle  = 1,
    node_state_inactive = 2
};
00201
00202
00204 template< typename Output >
00205 class source_body : no_assign {
00206 public:
00207 virtual ~source_body() {}
00208 virtual bool operator()(Output &output) = 0;
00209 };
00210
00212 template< typename Output, typename Body>
00213 class source_body_leaf : public source_body<Output> {
00214 public:
00215 source_body_leaf( Body _body ) : body(_body) { }
00216 bool operator()(Output &output) { return body( output ); }
00217 private:
00218 Body body;
00219 };
00220
00222 template< typename Input, typename Output >
00223 class function_body : no_assign {
00224 public:
00225 virtual ~function_body() {}
00226 virtual Output operator()(Input input) = 0;
00227 };
00228
00230 template <typename Input, typename Output, typename B>
00231 class function_body_leaf : public function_body< Input, Output > {
00232 public:
00233 function_body_leaf( B _body ) : body(_body) { }
00234 Output operator()(Input i) { return body(i); }
00235
00236 private:
00237 B body;
00238 };
00239
00241 template <typename B>
00242 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
00243 public:
00244 function_body_leaf( B _body ) : body(_body) { }
00245 continue_msg operator()( continue_msg i ) {
00246 body(i);
00247 return i;
00248 }
00249
00250 private:
00251 B body;
00252 };
00253
00255 template <typename Input, typename B>
00256 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
00257 public:
00258 function_body_leaf( B _body ) : body(_body) { }
00259 continue_msg operator()(Input i) {
00260 body(i);
00261 return continue_msg();
00262 }
00263
00264 private:
00265 B body;
00266 };
00267
00269 template <typename Output, typename B>
00270 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
00271 public:
00272 function_body_leaf( B _body ) : body(_body) { }
00273 Output operator()(continue_msg i) {
00274 return body(i);
00275 }
00276
00277 private:
00278 B body;
00279 };
00280
00282 template< typename NodeType >
00283 class forward_task : public task {
00284
00285 NodeType &my_node;
00286
00287 public:
00288
00289 forward_task( NodeType &n ) : my_node(n) {}
00290
00291 task *execute() {
00292 my_node.forward();
00293 return NULL;
00294 }
00295 };
00296
00298 template< typename NodeType, typename Input >
00299 class apply_body_task : public task {
00300
00301 NodeType &my_node;
00302 Input my_input;
00303
00304 public:
00305
00306 apply_body_task( NodeType &n, Input i ) : my_node(n), my_input(i) {}
00307
00308 task *execute() {
00309 my_node.apply_body( my_input );
00310 return NULL;
00311 }
00312 };
00313
00315 template< typename NodeType >
00316 class source_task : public task {
00317
00318 NodeType &my_node;
00319
00320 public:
00321
00322 source_task( NodeType &n ) : my_node(n) {}
00323
00324 task *execute() {
00325 my_node.apply_body( );
00326 return NULL;
00327 }
00328 };
00329
//! Function object that ignores its argument and returns a value-initialized Output.
template< typename Input, typename Output >
struct empty_body {
    Output operator()( Input & ) const {
        return Output();
    }
};
00335
00337 template< typename T >
00338 class node_cache {
00339
00340 public:
00341
00342 typedef size_t size_type;
00343
00344 bool empty() {
00345 tbb::spin_mutex::scoped_lock lock( my_mutex );
00346 return internal_empty();
00347 }
00348
00349 void add( T &n ) {
00350 tbb::spin_mutex::scoped_lock lock( my_mutex );
00351 internal_push(n);
00352 }
00353
00354 void remove( T &n ) {
00355 tbb::spin_mutex::scoped_lock lock( my_mutex );
00356 for ( size_t i = internal_size(); i != 0; --i ) {
00357 T &s = internal_pop();
00358 if ( &s != &n ) {
00359 internal_push(s);
00360 }
00361 }
00362 }
00363
00364 protected:
00365
00366 tbb::spin_mutex my_mutex;
00367 std::queue< T * > my_q;
00368
00369
00370 inline bool internal_empty( ) {
00371 return my_q.empty();
00372 }
00373
00374
00375 inline size_type internal_size( ) {
00376 return my_q.size();
00377 }
00378
00379
00380 inline void internal_push( T &n ) {
00381 my_q.push(&n);
00382 }
00383
00384
00385 inline T &internal_pop() {
00386 T *v = my_q.front();
00387 my_q.pop();
00388 return *v;
00389 }
00390
00391 };
00392
00394 template< typename T >
00395 class predecessor_cache : public node_cache< sender<T> > {
00396 public:
00397
00398 typedef T output_type;
00399 typedef sender<output_type> predecessor_type;
00400 typedef receiver<output_type> successor_type;
00401
00402 predecessor_cache( ) : my_owner( NULL ) { }
00403
00404 void set_owner( successor_type *owner ) { my_owner = owner; }
00405
00406 bool get_item( output_type &v ) {
00407
00408 bool msg = false;
00409
00410 do {
00411 predecessor_type *src;
00412 {
00413 tbb::spin_mutex::scoped_lock lock( node_cache<predecessor_type>::my_mutex );
00414 if ( this->internal_empty() ) {
00415 break;
00416 }
00417 src = &this->internal_pop();
00418 }
00419
00420
00421 msg = src->try_get( v );
00422
00423 if (msg == false) {
00424
00425 if ( my_owner)
00426 src->register_successor( *my_owner );
00427 } else {
00428
00429 this->add(*src);
00430 }
00431 } while ( msg == false );
00432 return msg;
00433 }
00434
00435 protected:
00436
00437 successor_type *my_owner;
00438
00439 };
00440
00442 template< typename T >
00443 class reservable_predecessor_cache : public predecessor_cache< T > {
00444 public:
00445
00446 typedef T output_type;
00447 typedef sender<T> predecessor_type;
00448 typedef receiver<T> successor_type;
00449
00450 reservable_predecessor_cache( ) : reserved_src(NULL) { }
00451
00452 bool
00453 try_reserve( output_type &v ) {
00454 bool msg = false;
00455
00456 do {
00457 {
00458 tbb::spin_mutex::scoped_lock lock( node_cache<predecessor_type>::my_mutex );
00459 if ( reserved_src || this->internal_empty() )
00460 return false;
00461
00462 reserved_src = &this->internal_pop();
00463 }
00464
00465
00466 msg = reserved_src->try_reserve( v );
00467
00468 if (msg == false) {
00469 tbb::spin_mutex::scoped_lock lock( node_cache<predecessor_type>::my_mutex );
00470
00471 reserved_src->register_successor( *this->my_owner );
00472 reserved_src = NULL;
00473 } else {
00474
00475 this->add( *reserved_src );
00476 }
00477 } while ( msg == false );
00478
00479 return msg;
00480 }
00481
00482 bool
00483 try_release( ) {
00484 reserved_src->try_release( );
00485 reserved_src = NULL;
00486 return true;
00487 }
00488
00489 bool
00490 try_consume( ) {
00491 reserved_src->try_consume( );
00492 reserved_src = NULL;
00493 return true;
00494 }
00495
00496 private:
00497
00498 predecessor_type *reserved_src;
00499
00500 };
00501
00502
00504 template<typename T, typename M=spin_rw_mutex >
00505 class successor_cache : no_copy {
00506 protected:
00507
00508 typedef M my_mutex_type;
00509 my_mutex_type my_mutex;
00510
00511 typedef std::list< receiver<T> * > my_successors_type;
00512 my_successors_type my_successors;
00513
00514 sender<T> *my_owner;
00515
00516 public:
00517
00518 successor_cache( ) : my_owner(NULL) {}
00519
00520 void set_owner( sender<T> *owner ) { my_owner = owner; }
00521
00522 virtual ~successor_cache() {}
00523
00524 void register_successor( receiver<T> &r ) {
00525 typename my_mutex_type::scoped_lock l(my_mutex, true);
00526 my_successors.push_back( &r );
00527 }
00528
00529 void remove_successor( receiver<T> &r ) {
00530 typename my_mutex_type::scoped_lock l(my_mutex, true);
00531 for ( typename my_successors_type::iterator i = my_successors.begin();
00532 i != my_successors.end(); ++i ) {
00533 if ( *i == & r ) {
00534 my_successors.erase(i);
00535 break;
00536 }
00537 }
00538 }
00539
00540 bool empty() {
00541 typename my_mutex_type::scoped_lock l(my_mutex, false);
00542 return my_successors.empty();
00543 }
00544
00545 virtual bool try_put( T t ) = 0;
00546 };
00547
00549 template<>
00550 class successor_cache< continue_msg > : no_copy {
00551 protected:
00552
00553 typedef spin_rw_mutex my_mutex_type;
00554 my_mutex_type my_mutex;
00555
00556 typedef std::list< receiver<continue_msg> * > my_successors_type;
00557 my_successors_type my_successors;
00558
00559 sender<continue_msg> *my_owner;
00560
00561 public:
00562
00563 successor_cache( ) : my_owner(NULL) {}
00564
00565 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
00566
00567 virtual ~successor_cache() {}
00568
00569 void register_successor( receiver<continue_msg> &r ) {
00570 my_mutex_type::scoped_lock l(my_mutex, true);
00571 my_successors.push_back( &r );
00572 if ( my_owner )
00573 r.register_predecessor( *my_owner );
00574 }
00575
00576 void remove_successor( receiver<continue_msg> &r ) {
00577 my_mutex_type::scoped_lock l(my_mutex, true);
00578 for ( my_successors_type::iterator i = my_successors.begin();
00579 i != my_successors.end(); ++i ) {
00580 if ( *i == & r ) {
00581 if ( my_owner )
00582 r.remove_predecessor( *my_owner );
00583 my_successors.erase(i);
00584 break;
00585 }
00586 }
00587 }
00588
00589 bool empty() {
00590 my_mutex_type::scoped_lock l(my_mutex, false);
00591 return my_successors.empty();
00592 }
00593
00594 virtual bool try_put( continue_msg t ) = 0;
00595
00596 };
00597
00599 template<typename T, typename M=spin_rw_mutex>
00600 class broadcast_cache : public successor_cache<T, M> {
00601 typedef M my_mutex_type;
00602 typedef std::list< receiver<T> * > my_successors_type;
00603
00604 public:
00605
00606 broadcast_cache( ) {}
00607
00608 bool try_put( T t ) {
00609 bool msg = false;
00610 bool upgraded = false;
00611 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00612 typename my_successors_type::iterator i = this->my_successors.begin();
00613 while ( i != this->my_successors.end() ) {
00614 if ( (*i)->try_put( t ) == true ) {
00615 ++i;
00616 msg = true;
00617 } else {
00618 if ( (*i)->register_predecessor(*this->my_owner) ) {
00619 if (!upgraded) {
00620 l.upgrade_to_writer();
00621 upgraded = true;
00622 }
00623 i = this->my_successors.erase(i);
00624 }
00625 else {
00626 ++i;
00627 }
00628 }
00629 }
00630 return msg;
00631 }
00632 };
00633
00635 template<typename T, typename M=spin_rw_mutex >
00636 class round_robin_cache : public successor_cache<T, M> {
00637 typedef size_t size_type;
00638 typedef M my_mutex_type;
00639 typedef std::list< receiver<T> * > my_successors_type;
00640
00641 public:
00642
00643 round_robin_cache( ) {}
00644
00645 size_type size() {
00646 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00647 return this->my_successors.size();
00648 }
00649
00650 bool try_put( T t ) {
00651 bool upgraded = false;
00652 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00653 typename my_successors_type::iterator i = this->my_successors.begin();
00654 while ( i != this->my_successors.end() ) {
00655 if ( (*i)->try_put( t ) ) {
00656 return true;
00657 } else {
00658 if ( (*i)->register_predecessor(*this->my_owner) ) {
00659 if (!upgraded) {
00660 l.upgrade_to_writer();
00661 upgraded = true;
00662 }
00663 i = this->my_successors.erase(i);
00664 }
00665 else {
00666 ++i;
00667 }
00668 }
00669 }
00670 return false;
00671 }
00672 };
00673
00674 template<typename T>
00675 class decrementer : public continue_receiver, internal::no_copy {
00676
00677 T *my_node;
00678
00679 void execute() {
00680 my_node->decrement_counter();
00681 }
00682
00683 public:
00684
00685 typedef continue_msg input_type;
00686 typedef continue_msg output_type;
00687 decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
00688 void set_owner( T *node ) { my_node = node; }
00689 };
00690
00691 }
00693
00694
00696
00697 class graph : internal::no_copy {
00698
00699 template< typename Body >
00700 class run_task : public task {
00701 public:
00702 run_task( Body& body ) : my_body(body) {}
00703 task *execute() {
00704 my_body();
00705 return NULL;
00706 }
00707 private:
00708 Body my_body;
00709 };
00710
00711 template< typename Receiver, typename Body >
00712 class run_and_put_task : public task {
00713 public:
00714 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00715 task *execute() {
00716 my_receiver.try_put( my_body() );
00717 return NULL;
00718 }
00719 private:
00720 Receiver &my_receiver;
00721 Body my_body;
00722 };
00723
00724 public:
00725
00727 enum concurrency { unlimited = 0, serial = 1 };
00728
00730 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00731 my_root_task->set_ref_count(1);
00732 }
00733
00735
00737 ~graph() {
00738 wait_for_all();
00739 my_root_task->set_ref_count(0);
00740 task::destroy( *my_root_task );
00741 }
00742
00743
00745
00747 void increment_wait_count() {
00748 if (my_root_task)
00749 my_root_task->increment_ref_count();
00750 }
00751
00753
00755 void decrement_wait_count() {
00756 if (my_root_task)
00757 my_root_task->decrement_ref_count();
00758 }
00759
00761
00763 template< typename Receiver, typename Body >
00764 void run( Receiver &r, Body body ) {
00765 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00766 run_and_put_task< Receiver, Body >( r, body ) );
00767 }
00768
00770
00772 template< typename Body >
00773 void run( Body body ) {
00774 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00775 run_task< Body >( body ) );
00776 }
00777
00779
00780 void wait_for_all() {
00781 if (my_root_task)
00782 my_root_task->wait_for_all();
00783 my_root_task->set_ref_count(1);
00784 }
00785
00787 task * root_task() {
00788 return my_root_task;
00789 }
00790
00791 private:
00792
00793 task *my_root_task;
00794
00795 };
00796
00797
00799 namespace internal {
00800
00802 template< typename Input, typename Output >
00803 class function_input : public receiver<Input>, no_assign {
00804
00805 typedef sender<Input> predecessor_type;
00806
00807 public:
00808
00810 typedef Input input_type;
00811
00813 typedef Output output_type;
00814
00816 template< typename Body >
00817 function_input( graph &g, size_t max_concurrency, Body& body )
00818 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(internal::node_state_idle),
00819 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
00820 my_predecessors.set_owner(this);
00821 }
00822
00824 virtual ~function_input() { delete my_body; }
00825
00827 virtual bool try_put( input_type t ) {
00828 if ( my_max_concurrency == 0 ) {
00829 spawn_body_task( t );
00830 return true;
00831 } else {
00832 tbb::spin_mutex::scoped_lock lock(my_mutex);
00833 if ( my_concurrency < my_max_concurrency ) {
00834 ++my_concurrency;
00835 spawn_body_task( t );
00836 return true;
00837 } else {
00838 return false;
00839 }
00840 }
00841 }
00842
00844 bool register_predecessor( predecessor_type &src ) {
00845 tbb::spin_mutex::scoped_lock lock(my_mutex);
00846 my_predecessors.add( src );
00847 if ( my_concurrency < my_max_concurrency ) {
00848 spawn_forward_task();
00849 }
00850 return true;
00851 }
00852
00854 bool remove_predecessor( predecessor_type &src ) {
00855 tbb::spin_mutex::scoped_lock lock(my_mutex);
00856 my_predecessors.remove(src);
00857 return true;
00858 }
00859
00860 protected:
00861
00862 tbb::spin_mutex my_mutex;
00863 task *my_root_task;
00864 const size_t my_max_concurrency;
00865 size_t my_concurrency;
00866 function_body<input_type, output_type> *my_body;
00867 predecessor_cache<input_type> my_predecessors;
00868
00869 friend class apply_body_task< function_input< input_type, output_type >, input_type >;
00870
00871 virtual broadcast_cache<output_type > &successors() = 0;
00872
00874 void apply_body( input_type &i ) {
00875 successors().try_put( (*my_body)(i) );
00876 if ( my_max_concurrency != 0 ) {
00877 tbb::spin_mutex::scoped_lock lock(my_mutex);
00878 --my_concurrency;
00879 if ( !my_predecessors.empty( ) ) {
00880 spawn_forward_task();
00881 }
00882 }
00883 }
00884
00885 friend class forward_task< function_input< input_type, output_type > >;
00886
00888 void forward( ) {
00889 __TBB_ASSERT( my_max_concurrency != 0, NULL );
00890 {
00891 tbb::spin_mutex::scoped_lock lock(my_mutex);
00892 if ( my_concurrency >= my_max_concurrency ) {
00893 return;
00894 }
00895 ++my_concurrency;
00896 }
00897
00898 input_type i;
00899
00900 if ( my_predecessors.get_item( i ) ) {
00901 apply_body( i );
00902 } else {
00903 tbb::spin_mutex::scoped_lock lock(my_mutex);
00904 --my_concurrency;
00905 if ( !my_predecessors.empty( ) )
00906 spawn_forward_task();
00907 }
00908 }
00909
00911 inline void spawn_body_task( input_type &input ) {
00912 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00913 apply_body_task< function_input< input_type, output_type >, input_type >( *this, input ) );
00914 }
00915
00917 inline void spawn_forward_task( ) {
00918 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00919 forward_task< function_input< input_type, output_type > >( *this ) );
00920 }
00921
00922 };
00923
00925 template< typename Output >
00926 class continue_input : public continue_receiver {
00927 public:
00928
00930 typedef continue_msg input_type;
00931
00933 typedef Output output_type;
00934
00935 template< typename Body >
00936 continue_input( graph &g, Body& body )
00937 : my_root_task(g.root_task()),
00938 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
00939
00940 template< typename Body >
00941 continue_input( graph &g, int number_of_predecessors, Body& body )
00942 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()),
00943 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
00944
00945 protected:
00946
00947 task *my_root_task;
00948 function_body<input_type, output_type> *my_body;
00949
00950 virtual broadcast_cache<output_type > &successors() = 0;
00951
00952 friend class apply_body_task< continue_input< Output >, continue_msg >;
00953
00955 void apply_body( input_type ) {
00956 successors().try_put( (*my_body)( continue_msg() ) );
00957 }
00958
00960 void execute( ) {
00961 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00962 apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) );
00963 }
00964 };
00965
00967 template< typename Output >
00968 class function_output : public sender<Output> {
00969 public:
00970
00971 typedef Output output_type;
00972
00973 function_output() { }
00974
00976 bool register_successor( receiver<output_type> &r ) {
00977 successors().register_successor( r );
00978 return true;
00979 }
00980
00982 bool remove_successor( receiver<output_type> &r ) {
00983 successors().remove_successor( r );
00984 return true;
00985 }
00986
00987 protected:
00988
00989 virtual broadcast_cache<output_type > &successors() = 0;
00990
00991 };
00992
00993 }
00995
00997 template < typename Output >
00998 class source_node : public graph_node, public sender< Output > {
00999 public:
01000
01002 typedef Output output_type;
01003
01005 typedef receiver< Output > successor_type;
01006
01008 template< typename Body >
01009 source_node( graph &g, Body body, bool is_active = true )
01010 : my_root_task(g.root_task()), my_state( is_active ? internal::node_state_idle : internal::node_state_inactive ),
01011 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
01012 my_reserved(false), my_has_cached_item(false) {
01013 my_successors.set_owner(this);
01014 }
01015
01017 ~source_node() { delete my_body; }
01018
01020 bool register_successor( receiver<output_type> &r ) {
01021 tbb::spin_mutex::scoped_lock lock(my_mutex);
01022 my_successors.register_successor(r);
01023 if ( my_state != internal::node_state_inactive )
01024 spawn_put();
01025 return true;
01026 }
01027
01029 bool remove_successor( receiver<output_type> &r ) {
01030 tbb::spin_mutex::scoped_lock lock(my_mutex);
01031 my_successors.remove_successor(r);
01032 return true;
01033 }
01034
01036 bool try_get( output_type &v ) {
01037 tbb::spin_mutex::scoped_lock lock(my_mutex);
01038 if ( my_reserved )
01039 return false;
01040
01041 if ( my_has_cached_item ) {
01042 v = my_cached_item;
01043 my_has_cached_item = false;
01044 } else if ( (*my_body)(v) == false ) {
01045 return false;
01046 }
01047 return true;
01048 }
01049
01051 bool try_reserve( output_type &v ) {
01052 tbb::spin_mutex::scoped_lock lock(my_mutex);
01053 if ( my_reserved ) {
01054 return false;
01055 }
01056
01057 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
01058 my_has_cached_item = true;
01059
01060 if ( my_has_cached_item ) {
01061 v = my_cached_item;
01062 my_reserved = true;
01063 return true;
01064 } else {
01065 return false;
01066 }
01067 }
01068
01070
01071 bool try_release( ) {
01072 tbb::spin_mutex::scoped_lock lock(my_mutex);
01073 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
01074 my_reserved = false;
01075 spawn_put();
01076 return true;
01077 }
01078
01080 bool try_consume( ) {
01081 tbb::spin_mutex::scoped_lock lock(my_mutex);
01082 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
01083 my_reserved = false;
01084 my_has_cached_item = false;
01085 if ( !my_successors.empty() ) {
01086 spawn_put();
01087 }
01088 return true;
01089 }
01090
01092 void activate() {
01093 tbb::spin_mutex::scoped_lock lock(my_mutex);
01094 my_state = internal::node_state_idle;
01095 if ( !my_successors.empty() )
01096 spawn_put();
01097 }
01098
01099 private:
01100
01101 task *my_root_task;
01102 tbb::spin_mutex my_mutex;
01103 internal::node_state my_state;
01104 internal::source_body<output_type> *my_body;
01105 internal::broadcast_cache< output_type > my_successors;
01106 bool my_reserved;
01107 bool my_has_cached_item;
01108 output_type my_cached_item;
01109
01110 friend class internal::source_task< source_node< output_type > >;
01111
01113 void apply_body( ) {
01114 output_type v;
01115 if ( try_reserve(v) == false )
01116 return;
01117
01118 if ( my_successors.try_put( v ) )
01119 try_consume();
01120 else
01121 try_release();
01122 }
01123
01125 void spawn_put( ) {
01126 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01127 internal::source_task< source_node< output_type > >( *this ) );
01128 }
01129
01130 };
01131
01133 template <typename Input, typename Output = continue_msg >
01134 class function_node : public graph_node, public internal::function_input<Input,Output>, public internal::function_output<Output> {
01135 public:
01136
01137 typedef Input input_type;
01138 typedef Output output_type;
01139 typedef sender< input_type > predecessor_type;
01140 typedef receiver< output_type > successor_type;
01141
01143 template< typename Body >
01144 function_node( graph &g, size_t concurrency, Body body )
01145 : internal::function_input<input_type,output_type>( g, concurrency, body ) {
01146 my_successors.set_owner(this);
01147 }
01148
01149 protected:
01150
01151 internal::broadcast_cache<output_type> my_successors;
01152 internal::broadcast_cache<output_type> &successors () { return my_successors; }
01153
01154 };
01155
01157 template <typename Output>
01158 class executable_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
01159 public:
01160
01161 typedef continue_msg input_type;
01162 typedef Output output_type;
01163 typedef sender< input_type > predecessor_type;
01164 typedef receiver< output_type > successor_type;
01165
01167 template <typename Body >
01168 executable_node( graph &g, Body body )
01169 : internal::continue_input<output_type>( g, body ) {
01170 my_successors.set_owner(this);
01171 }
01172
01174 template <typename Body >
01175 executable_node( graph &g, int number_of_predecessors, Body body )
01176 : internal::continue_input<output_type>( g, number_of_predecessors, body ) {
01177 my_successors.set_owner(this);
01178 }
01179
01180 protected:
01181
01182 internal::broadcast_cache<output_type> my_successors;
01183 internal::broadcast_cache<output_type> &successors () { return my_successors; }
01184
01185 };
01186
01187
01188
01189 template< typename T >
01190 class overwrite_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01191 public:
01192
01193 typedef T input_type;
01194 typedef T output_type;
01195 typedef sender< input_type > predecessor_type;
01196 typedef receiver< output_type > successor_type;
01197
01198 overwrite_node() : my_buffer_is_valid(false) {
01199 my_successors.set_owner( this );
01200 }
01201
01202 ~overwrite_node() {}
01203
01204 bool register_successor( successor_type &s ) {
01205 tbb::spin_mutex::scoped_lock l( my_mutex );
01206 my_successors.register_successor( s );
01207 return true;
01208 }
01209 bool remove_successor( successor_type &s ) {
01210 tbb::spin_mutex::scoped_lock l( my_mutex );
01211 my_successors.remove_successor(s);
01212 return true;
01213 }
01214
01215 bool try_put( T v ) {
01216 tbb::spin_mutex::scoped_lock l( my_mutex );
01217 my_buffer = v;
01218 my_buffer_is_valid = true;
01219 my_successors.try_put(v);
01220 return true;
01221 }
01222
01223 bool try_get( T &v ) {
01224 tbb::spin_mutex::scoped_lock l( my_mutex );
01225 if ( my_buffer_is_valid ) {
01226 v = my_buffer;
01227 return true;
01228 } else {
01229 return false;
01230 }
01231 }
01232
01233 bool is_valid() {
01234 tbb::spin_mutex::scoped_lock l( my_mutex );
01235 return my_buffer_is_valid;
01236 }
01237
01238 void clear() {
01239 tbb::spin_mutex::scoped_lock l( my_mutex );
01240 my_buffer_is_valid = false;
01241 }
01242
01243 protected:
01244
01245 tbb::spin_mutex my_mutex;
01246 internal::broadcast_cache< T, null_rw_mutex > my_successors;
01247 T my_buffer;
01248 bool my_buffer_is_valid;
01249
01250 };
01251
01252 template< typename T >
01253 class write_once_node : public overwrite_node<T> {
01254 public:
01255
01256 typedef T input_type;
01257 typedef T output_type;
01258 typedef sender< input_type > predecessor_type;
01259 typedef receiver< output_type > successor_type;
01260
01261 bool try_put( T v ) {
01262 tbb::spin_mutex::scoped_lock l( this->my_mutex );
01263 if ( this->my_buffer_is_valid ) {
01264 return false;
01265 } else {
01266 this->my_buffer = v;
01267 this->my_buffer_is_valid = true;
01268 this->my_successors.try_put(v);
01269 return true;
01270 }
01271 }
01272 };
01273
01275
01276 class continue_node : public executable_node< continue_msg > {
01277 public:
01278
01279 typedef continue_msg input_type;
01280 typedef continue_msg output_type;
01281 typedef sender< input_type > predecessor_type;
01282 typedef receiver< output_type > successor_type;
01283
01284 continue_node( graph &g ) : executable_node<continue_msg>( g, internal::empty_body< continue_msg, continue_msg>() ) {}
01285 };
01286
01288 template <typename T>
01289 class broadcast_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01290
01291 internal::broadcast_cache<T> my_successors;
01292
01293 public:
01294
01295 typedef T input_type;
01296 typedef T output_type;
01297 typedef sender< input_type > predecessor_type;
01298 typedef receiver< output_type > successor_type;
01299
01300 broadcast_node( ) {
01301 my_successors.set_owner( this );
01302 }
01303
01305 virtual bool register_successor( receiver<T> &r ) {
01306 my_successors.register_successor( r );
01307 return true;
01308 }
01309
01311 virtual bool remove_successor( receiver<T> &r ) {
01312 my_successors.remove_successor( r );
01313 return true;
01314 }
01315
01316 bool try_put( T t ) {
01317 my_successors.try_put(t);
01318 return true;
01319 }
01320
01321 };
01322
01323
01324
01325
//! Status values of an aggregated operation; WAITING is zero.
enum aggregator_operation_status {
    WAITING = 0,
    SUCCESS,
    FAILED
};
01327
01329
01332 template <typename AggregatorOperation>
01333 class aggregator {
01334 public:
01335 aggregator() : handler_busy(false) { pending_operations = NULL; }
01336 virtual ~aggregator() {}
01337
01339 void insert_operation(AggregatorOperation *op) {
01340 AggregatorOperation *tmp = pending_operations, *res;
01341
01342 op->next = tmp;
01343 while ((res = pending_operations.compare_and_swap(op, tmp)) != tmp)
01344 op->next = tmp = res;
01345 if (!tmp) {
01346 start_handle_operations();
01347 __TBB_ASSERT(op->status, NULL);
01348 }
01349 else {
01350 tbb::internal::spin_wait_while_eq(op->status, (uintptr_t)WAITING);
01351 __TBB_load_with_acquire(op->status);
01352 }
01353 }
01354
01356
01357 virtual void handle_operations(AggregatorOperation *op_list) = 0;
01358
01359 private:
01361 tbb::atomic<AggregatorOperation *> pending_operations;
01363 bool handler_busy;
01364
01366 void start_handle_operations() {
01367 AggregatorOperation *op_list;
01368
01369
01370 tbb::internal::spin_wait_until_eq(handler_busy, false);
01371
01372 __TBB_store_with_release(handler_busy, true);
01373
01374 op_list = pending_operations.fetch_and_store(NULL);
01375
01376 handle_operations(op_list);
01377
01378 __TBB_store_with_release(handler_busy, false);
01379 }
01380 };
01381
01383 template <typename T>
01384 class buffer_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01385 public:
01386
01387 typedef T input_type;
01388 typedef T output_type;
01389 typedef sender< input_type > predecessor_type;
01390 typedef receiver< output_type > successor_type;
01391
01392 protected:
01393 typedef size_t size_type;
01394 typedef std::pair< T, bool > item_type;
01395
01396 internal::round_robin_cache< T, null_rw_mutex > my_successors;
01397
01398 task *my_parent;
01399 item_type *my_array;
01400 size_type my_array_size;
01401 static const size_type initial_buffer_size = 4;
01402 size_type my_head;
01403 size_type my_tail;
01404 tbb::spin_mutex my_mutex;
01405 bool my_reserved;
01406 size_type my_reserved_id;
01407
01408 enum operation_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
01409
01410
01411 class buffer_operation {
01412 public:
01413 char type;
01414 T *elem;
01415 uintptr_t status;
01416 successor_type *r;
01417 buffer_operation *next;
01418 buffer_operation(const T& e, operation_type t) :
01419 type(char(t)), elem(const_cast<T*>(&e)), status(WAITING),
01420 r(NULL), next(NULL) {}
01421 buffer_operation(operation_type t) :
01422 type(char(t)), status(WAITING), r(NULL), next(NULL) {}
01423 };
01424
01425 class my_agg_t : public aggregator<buffer_operation> {
01426 public:
01427 bool forwarder_busy;
01428 buffer_node<T> *parent;
01429 my_agg_t(buffer_node<T> *_p) : aggregator<buffer_operation>(),
01430 forwarder_busy(false), parent(_p) {}
01431 virtual void handle_operations(buffer_operation *op_list) {
01432 buffer_operation *tmp;
01433 bool try_forwarding=false;
01434 while (op_list) {
01435 tmp = op_list;
01436 op_list = op_list->next;
01437 switch (tmp->type) {
01438 case reg_succ: parent->my_successors.register_successor(*(tmp->r));
01439 __TBB_store_with_release(tmp->status, SUCCESS);
01440 try_forwarding = true; break;
01441 case rem_succ: parent->my_successors.remove_successor(*(tmp->r));
01442 __TBB_store_with_release(tmp->status, SUCCESS); break;
01443 case req_item: parent->internal_pop(tmp); break;
01444 case res_item: parent->internal_reserve(tmp); break;
01445 case rel_res: parent->my_array[parent->my_head&(parent->my_array_size-1)].second = true;
01446 parent->my_reserved = false;
01447 __TBB_store_with_release(tmp->status, SUCCESS);
01448 try_forwarding = true; break;
01449 case con_res: parent->internal_consume(tmp); try_forwarding = true; break;
01450 case put_item: parent->internal_push(tmp); try_forwarding = true; break;
01451 case try_fwd: parent->internal_forward(tmp); break;
01452 }
01453 }
01454 if (try_forwarding && !forwarder_busy) {
01455 forwarder_busy = true;
01456 task::enqueue( * new ( task::allocate_additional_child_of( *(parent->my_parent) ) )
01457 internal::forward_task< buffer_node<input_type> >( *parent ) );
01458 }
01459 }
01460 };
01461
01462 my_agg_t *my_aggregator;
01463
01464 friend class internal::forward_task< buffer_node< T > >;
01465
01467 virtual void forward() {
01468 buffer_operation op_data(try_fwd);
01469 do {
01470 op_data.status = WAITING;
01471 my_aggregator->insert_operation(&op_data);
01472 } while (op_data.status == SUCCESS);
01473 }
01474
01476 virtual void internal_forward(buffer_operation *op) {
01477 T i_copy;
01478 bool success = false;
01479 size_type counter = my_successors.size();
01480
01481 while (counter>0 && my_tail>my_head && my_array[ (my_tail-1) & (my_array_size-1)].second == true ) {
01482 i_copy = my_array[ (my_tail-1) & (my_array_size-1)].first;
01483 bool msg = my_successors.try_put(i_copy);
01484 if ( msg == true ) {
01485 my_array[ (my_tail-1) & (my_array_size-1)].second = false;
01486 --my_tail;
01487 success = true;
01488 }
01489 --counter;
01490 }
01491 if (success && !counter)
01492 __TBB_store_with_release(op->status, SUCCESS);
01493 else {
01494 __TBB_store_with_release(op->status, FAILED);
01495 my_aggregator->forwarder_busy = false;
01496 }
01497
01498 }
01499
01500 virtual void internal_push(buffer_operation *op) {
01501 while( my_tail-my_head >= my_array_size ) {
01502 grow_my_array( my_tail - my_head + 1 );
01503 }
01504 my_array[my_tail&(my_array_size-1)] = std::make_pair( *(op->elem), true );
01505 ++my_tail;
01506 __TBB_store_with_release(op->status, SUCCESS);
01507 }
01508 virtual void internal_pop(buffer_operation *op) {
01509 if ( my_array[(my_tail-1) & (my_array_size-1)].second == false ) {
01510 __TBB_store_with_release(op->status, FAILED);
01511 }
01512 else {
01513 *(op->elem) = my_array[(my_tail-1) & (my_array_size-1)].first;
01514 my_array[(my_tail-1) & (my_array_size-1)].second = false;
01515 --my_tail;
01516 __TBB_store_with_release(op->status, SUCCESS);
01517 }
01518 }
01519 virtual void internal_reserve(buffer_operation *op) {
01520 if (my_reserved == true || my_array[ my_head & (my_array_size-1)].second == false ) {
01521 __TBB_store_with_release(op->status, FAILED);
01522 }
01523 else {
01524 my_reserved = true;
01525 *(op->elem) = my_array[ my_head & (my_array_size-1)].first;
01526 my_array[ my_head & (my_array_size-1)].second = false;
01527 __TBB_store_with_release(op->status, SUCCESS);
01528 }
01529 }
01530 virtual void internal_consume(buffer_operation *op) {
01531 my_reserved = false;
01532 ++my_head;
01533 __TBB_store_with_release(op->status, SUCCESS);
01534 }
01535
01537 void grow_my_array( size_t minimum_size ) {
01538 size_type old_size = my_array_size;
01539 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
01540 while( new_size<minimum_size )
01541 new_size*=2;
01542
01543 item_type* new_array = cache_aligned_allocator<item_type>().allocate(new_size);
01544 item_type* old_array = my_array;
01545
01546 for( size_type i=0; i<new_size; ++i )
01547 new_array[i].second = false;
01548
01549 size_t t=my_head;
01550 for( size_type i=0; i<old_size; ++i, ++t )
01551 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
01552 my_array = new_array;
01553 my_array_size = new_size;
01554 if( old_array )
01555 cache_aligned_allocator<item_type>().deallocate(old_array,old_size);
01556 }
01557
01558 public:
01560 buffer_node( graph &g ) : my_parent( g.root_task() ), my_array(NULL), my_array_size(0), my_head(0), my_tail(0), my_reserved(false) {
01561 my_successors.set_owner(this);
01562 grow_my_array(initial_buffer_size);
01563 my_aggregator = new my_agg_t(this);
01564 }
01565
01566
01567
01568
01569
01571
01572 bool register_successor( receiver<output_type> &r ) {
01573 buffer_operation op_data(reg_succ);
01574 op_data.r = &r;
01575 my_aggregator->insert_operation(&op_data);
01576 return true;
01577 }
01578
01580
01582 bool remove_successor( receiver<output_type> &r ) {
01583 r.remove_predecessor(*this);
01584 buffer_operation op_data(rem_succ);
01585 op_data.r = &r;
01586 my_aggregator->insert_operation(&op_data);
01587 return true;
01588 }
01589
01591
01593 bool try_get( T &v ) {
01594 buffer_operation op_data(req_item);
01595 op_data.elem = &v;
01596 my_aggregator->insert_operation(&op_data);
01597 if (op_data.status==SUCCESS) return true;
01598 return false;
01599 }
01600
01602
01604 bool try_reserve( T &v ) {
01605 buffer_operation op_data(res_item);
01606 op_data.elem = &v;
01607 my_aggregator->insert_operation(&op_data);
01608 if (op_data.status==SUCCESS) return true;
01609 return false;
01610 }
01611
01613
01614 bool try_release() {
01615 buffer_operation op_data(rel_res);
01616 my_aggregator->insert_operation(&op_data);
01617 return true;
01618 }
01619
01621
01622 bool try_consume() {
01623 buffer_operation op_data(con_res);
01624 my_aggregator->insert_operation(&op_data);
01625 return true;
01626 }
01627
01629
01630 bool try_put(T t) {
01631 buffer_operation op_data(t, put_item);
01632 my_aggregator->insert_operation(&op_data);
01633 return true;
01634 }
01635 };
01636
01637
01639 template <typename T>
01640 class queue_node : public buffer_node<T> {
01641 protected:
01643 void internal_forward(typename buffer_node<T>::buffer_operation *op) {
01644 T i_copy;
01645 bool success = false;
01646 typename buffer_node<T>::size_type counter = this->my_successors.size();
01647 if (this->my_reserved || this->my_array[ this->my_head & (this->my_array_size-1)].second == false) {
01648 __TBB_store_with_release(op->status, FAILED);
01649 buffer_node<T>::my_aggregator->forwarder_busy = false;
01650 return;
01651 }
01652
01653 while (counter>0 && this->my_array[ this->my_head & (this->my_array_size-1)].second == true ) {
01654 i_copy = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01655 bool msg = this->my_successors.try_put(i_copy);
01656 if ( msg == true ) {
01657 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01658 ++(this->my_head);
01659 success = true;
01660 }
01661 --counter;
01662 }
01663 if (success && !counter)
01664 __TBB_store_with_release(op->status, SUCCESS);
01665 else {
01666 __TBB_store_with_release(op->status, FAILED);
01667 buffer_node<T>::my_aggregator->forwarder_busy = false;
01668 }
01669 }
01670
01671 void internal_pop(typename buffer_node<T>::buffer_operation *op) {
01672 if ( this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01673 __TBB_store_with_release(op->status, FAILED);
01674 }
01675 else {
01676 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01677 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01678 ++(this->my_head);
01679 __TBB_store_with_release(op->status, SUCCESS);
01680 }
01681 }
01682 void internal_reserve(typename buffer_node<T>::buffer_operation *op) {
01683 if (this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01684 __TBB_store_with_release(op->status, FAILED);
01685 }
01686 else {
01687 this->my_reserved = true;
01688 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01689 __TBB_store_with_release(op->status, SUCCESS);
01690 }
01691 }
01692 void internal_consume(typename buffer_node<T>::buffer_operation *op) {
01693 this->my_reserved = false;
01694 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01695 ++(this->my_head);
01696 __TBB_store_with_release(op->status, SUCCESS);
01697 }
01698
01699 public:
01700
01701 typedef T input_type;
01702 typedef T output_type;
01703 typedef sender< input_type > predecessor_type;
01704 typedef receiver< output_type > successor_type;
01705
01707 queue_node( graph &g ) : buffer_node<T>(g) {}
01708 };
01709
01711 template< typename T >
01712 class sequencer_node : public queue_node<T> {
01713 internal::function_body< T, size_t > *my_sequencer;
01714 public:
01715
01716 typedef T input_type;
01717 typedef T output_type;
01718 typedef sender< input_type > predecessor_type;
01719 typedef receiver< output_type > successor_type;
01720
01722 template< typename Sequencer >
01723 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g), my_sequencer( new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01724
01726 ~sequencer_node() { delete my_sequencer; }
01727
01728 private:
01729 void internal_push(typename buffer_node<T>::buffer_operation *op) {
01730 typename buffer_node<T>::size_type tag = (*my_sequencer)(*(op->elem));
01731
01732 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01733 while ( this->my_tail - this->my_head >= this->my_array_size ) {
01734 this->grow_my_array( this->my_tail - this->my_head + 1);
01735 }
01736 this->my_array[tag&(this->my_array_size-1)] = std::make_pair( *(op->elem), true );
01737 __TBB_store_with_release(op->status, SUCCESS);
01738 }
01739 };
01740
01742 template< typename T, typename Compare = std::less<T> >
01743 class priority_queue_node : public queue_node<T> {
01744 public:
01745
01746 typedef T input_type;
01747 typedef T output_type;
01748 typedef sender< input_type > predecessor_type;
01749 typedef receiver< output_type > successor_type;
01750
01751 typedef size_t size_type;
01753 priority_queue_node( graph &g ) : queue_node<T>(g), mark(0) {
01754 delete buffer_node<T>::my_aggregator;
01755 buffer_node<T>::my_aggregator = new my_prio_agg_t(this);
01756 }
01758 ~priority_queue_node() {}
01759
01760 private:
01761 typedef typename buffer_node<T>::buffer_operation prio_operation;
01762 typedef typename buffer_node<T>::my_agg_t buffer_aggregator;
01763 typedef typename buffer_node<T>::item_type item_type;
01764 Compare compare;
01765 size_type mark;
01766 input_type reserved_item;
01767 class my_prio_agg_t : public buffer_aggregator {
01768 public:
01769 my_prio_agg_t(priority_queue_node<T, Compare> *_p) :
01770 buffer_aggregator(_p) {}
01771 virtual void handle_operations(prio_operation *op_list) {
01772 prio_operation *tmp ;
01773 bool try_forwarding=false;
01774 priority_queue_node<T, Compare> *pa = dynamic_cast<priority_queue_node<T, Compare> *>(buffer_aggregator::parent);
01775 while (op_list) {
01776 tmp = op_list;
01777 op_list = op_list->next;
01778 switch (tmp->type) {
01779 case buffer_node<T>::reg_succ:
01780 pa->my_successors.register_successor(*(tmp->r));
01781 __TBB_store_with_release(tmp->status, SUCCESS);
01782 try_forwarding = true; break;
01783 case buffer_node<T>::rem_succ:
01784 pa->my_successors.remove_successor(*(tmp->r));
01785 __TBB_store_with_release(tmp->status, SUCCESS); break;
01786 case buffer_node<T>::put_item:
01787 pa->internal_push(tmp);
01788 try_forwarding = true; break;
01789 case buffer_node<T>::try_fwd:
01790 pa->internal_forward(tmp);
01791 break;
01792 case buffer_node<T>::rel_res:
01793 pa->internal_release(tmp);
01794 try_forwarding = true; break;
01795 case buffer_node<T>::con_res:
01796 pa->internal_consume(tmp);
01797 try_forwarding = true; break;
01798 case buffer_node<T>::req_item:
01799 pa->internal_pop(tmp);
01800 break;
01801 case buffer_node<T>::res_item:
01802 pa->internal_reserve(tmp);
01803 break;
01804 }
01805 }
01806
01807 if (pa->mark<pa->my_tail) pa->heapify();
01808 if (try_forwarding && !this->forwarder_busy) {
01809 this->forwarder_busy = true;
01810 task::enqueue( * new ( task::allocate_additional_child_of( *(pa->my_parent) ) )
01811 internal::forward_task< buffer_node<input_type> >(*pa) );
01812 }
01813 }
01814 };
01815
01816
01818 void internal_forward(prio_operation *op) {
01819 T i_copy;
01820 bool success = false;
01821 size_type counter = this->my_successors.size();
01822 if (this->my_reserved || this->my_tail == 0) {
01823 __TBB_store_with_release(op->status, FAILED);
01824 this->my_aggregator->forwarder_busy = false;
01825 return;
01826 }
01827
01828 while (counter>0 && this->my_tail > 0) {
01829 i_copy = this->my_array[0].first;
01830 bool msg = this->my_successors.try_put(i_copy);
01831 if ( msg == true ) {
01832 if (mark == this->my_tail) --mark;
01833 --(this->my_tail);
01834 this->my_array[0].first=this->my_array[this->my_tail].first;
01835 if (this->my_tail > 1)
01836 reheap();
01837 success = true;
01838 }
01839 --counter;
01840 }
01841 if (success && !counter)
01842 __TBB_store_with_release(op->status, SUCCESS);
01843 else {
01844 __TBB_store_with_release(op->status, FAILED);
01845 this->my_aggregator->forwarder_busy = false;
01846 }
01847 }
01848
01849 void internal_push(prio_operation *op) {
01850 if ( this->my_tail >= this->my_array_size )
01851 this->grow_my_array( this->my_tail + 1 );
01852 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01853 ++(this->my_tail);
01854 __TBB_store_with_release(op->status, SUCCESS);
01855 }
01856 void internal_pop(prio_operation *op) {
01857 if ( this->my_reserved == true || this->my_tail == 0 ) {
01858 __TBB_store_with_release(op->status, FAILED);
01859 }
01860 else {
01861 if (mark<this->my_tail &&
01862 compare(this->my_array[0].first,
01863 this->my_array[this->my_tail-1].first)) {
01864
01865
01866 *(op->elem) = this->my_array[this->my_tail-1].first;
01867 --(this->my_tail);
01868 __TBB_store_with_release(op->status, SUCCESS);
01869 }
01870 else {
01871 *(op->elem) = this->my_array[0].first;
01872 if (mark == this->my_tail) --mark;
01873 --(this->my_tail);
01874 __TBB_store_with_release(op->status, SUCCESS);
01875 this->my_array[0].first=this->my_array[this->my_tail].first;
01876 if (this->my_tail > 1)
01877 reheap();
01878 }
01879 }
01880 }
01881 void internal_reserve(prio_operation *op) {
01882 if (this->my_reserved == true || this->my_tail == 0) {
01883 __TBB_store_with_release(op->status, FAILED);
01884 }
01885 else {
01886 this->my_reserved = true;
01887 *(op->elem) = reserved_item = this->my_array[0].first;
01888 if (mark == this->my_tail) --mark;
01889 --(this->my_tail);
01890 __TBB_store_with_release(op->status, SUCCESS);
01891 this->my_array[0].first = this->my_array[this->my_tail].first;
01892 if (this->my_tail > 1)
01893 reheap();
01894 }
01895 }
01896 void internal_consume(prio_operation *op) {
01897 this->my_reserved = false;
01898 __TBB_store_with_release(op->status, SUCCESS);
01899 }
01900 void internal_release(prio_operation *op) {
01901 if (this->my_tail >= this->my_array_size)
01902 this->grow_my_array( this->my_tail + 1 );
01903 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01904 ++(this->my_tail);
01905 this->my_reserved = false;
01906 __TBB_store_with_release(op->status, SUCCESS);
01907 heapify();
01908 }
01909
01910 void heapify() {
01911 if (!mark) mark = 1;
01912 for (; mark<this->my_tail; ++mark) {
01913 size_type cur_pos = mark;
01914 input_type to_place = this->my_array[mark].first;
01915 do {
01916 size_type parent = (cur_pos-1)>>1;
01917 if (!compare(this->my_array[parent].first, to_place)) break;
01918 this->my_array[cur_pos].first = this->my_array[parent].first;
01919 cur_pos = parent;
01920 } while( cur_pos );
01921 this->my_array[cur_pos].first = to_place;
01922 }
01923 }
01924
01925 void reheap() {
01926 size_type cur_pos=0, child=1;
01927 while (child < mark) {
01928 size_type target = child;
01929 if (child+1<mark &&
01930 compare(this->my_array[child].first,
01931 this->my_array[child+1].first))
01932 ++target;
01933
01934 if (compare(this->my_array[target].first,
01935 this->my_array[this->my_tail].first))
01936 break;
01937 this->my_array[cur_pos].first = this->my_array[target].first;
01938 cur_pos = target;
01939 child = (cur_pos<<1)+1;
01940 }
01941 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01942 }
01943 };
01944
01946
01949 template< typename T >
01950 class limiter_node : public graph_node, public receiver< T >, public sender< T >, internal::no_copy {
01951 public:
01952
01953 typedef T input_type;
01954 typedef T output_type;
01955 typedef sender< input_type > predecessor_type;
01956 typedef receiver< output_type > successor_type;
01957
01958 private:
01959
01960 tbb::task *my_root_task;
01961 size_t my_threshold;
01962 size_t my_count;
01963 internal::predecessor_cache< T > my_predecessors;
01964 tbb::spin_mutex my_mutex;
01965 internal::broadcast_cache< T > my_successors;
01966
01967 friend class internal::forward_task< limiter_node<T> >;
01968
01969
01970 friend class internal::decrementer< limiter_node<T> >;
01971
01972 void decrement_counter() {
01973 input_type v;
01974
01975
01976 if ( my_predecessors.get_item( v ) == false
01977 || my_successors.try_put(v) == false ) {
01978 tbb::spin_mutex::scoped_lock lock(my_mutex);
01979 --my_count;
01980 if ( !my_predecessors.empty() )
01981 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01982 internal::forward_task< limiter_node<T> >( *this ) );
01983 }
01984 }
01985
01986 void forward() {
01987 {
01988 tbb::spin_mutex::scoped_lock lock(my_mutex);
01989 if ( my_count < my_threshold )
01990 ++my_count;
01991 else
01992 return;
01993 }
01994 decrement_counter();
01995 }
01996
01997 public:
01998
02000 internal::decrementer< limiter_node<T> > decrement;
02001
02003 limiter_node( graph &g, size_t threshold, int number_of_decrement_predecessors = 0 ) :
02004 my_root_task(g.root_task()), my_threshold(threshold), my_count(0), decrement(number_of_decrement_predecessors) {
02005 my_predecessors.set_owner(this);
02006 my_successors.set_owner(this);
02007 decrement.set_owner(this);
02008 }
02009
02011 bool register_successor( receiver<output_type> &r ) {
02012 my_successors.register_successor(r);
02013 return true;
02014 }
02015
02017
02018 bool remove_successor( receiver<output_type> &r ) {
02019 r.remove_predecessor(*this);
02020 my_successors.remove_successor(r);
02021 return true;
02022 }
02023
02025 bool try_put( T t ) {
02026 {
02027 tbb::spin_mutex::scoped_lock lock(my_mutex);
02028 if ( my_count >= my_threshold )
02029 return false;
02030 else
02031 ++my_count;
02032 }
02033
02034 bool msg = my_successors.try_put(t);
02035
02036 if ( msg != true ) {
02037 tbb::spin_mutex::scoped_lock lock(my_mutex);
02038 --my_count;
02039 if ( !my_predecessors.empty() )
02040 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02041 internal::forward_task< limiter_node<T> >( *this ) );
02042 }
02043
02044 return msg;
02045 }
02046
02048 bool register_predecessor( predecessor_type &src ) {
02049 tbb::spin_mutex::scoped_lock lock(my_mutex);
02050 my_predecessors.add( src );
02051 if ( my_count < my_threshold && !my_successors.empty() )
02052 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02053 internal::forward_task< limiter_node<T> >( *this ) );
02054 return true;
02055 }
02056
02058 bool remove_predecessor( predecessor_type &src ) {
02059 my_predecessors.remove( src );
02060 return true;
02061 }
02062
02063 };
02064
02065 namespace internal {
02066
//! Callback interface through which a join port reports changes to its owner's port counter.
struct forwarding_base {
    virtual ~forwarding_base() {}

    //! Called by a port to lower the owner's port count.
    virtual void decrement_port_count() = 0;

    //! Called by a port to raise the owner's port count.
    virtual void increment_port_count() = 0;
};
02072
//! Compile-time recursion over the first N elements of a tuple of ports.
/** Each function handles element N-1 and delegates the remaining elements to
    join_helper<N-1>; join_helper<1> ends the recursion. */
template< int N >
struct join_helper {

    //! Calls set_port_counter(port) on elements N-1 down to 0.
    template< typename TupleType, typename PortType >
    static inline void set_port_counter( TupleType &my_input, PortType *port ) {
        std::get<N-1>( my_input ).set_port_counter( port );
        join_helper<N-1>::set_port_counter( my_input, port );
    }

    //! Calls consume() on elements N-1 down to 0.
    template< typename TupleType >
    static inline void consume_reservations( TupleType &my_input ) {
        std::get<N-1>( my_input ).consume();
        join_helper<N-1>::consume_reservations( my_input );
    }

    //! Calls release() on element N-1 only.
    template< typename TupleType >
    static inline void release_my_reservation( TupleType &my_input ) {
        std::get<N-1>( my_input ).release();
    }

    //! Calls release() on elements 0 up to N-1, lowest index first.
    template< typename TupleType >
    static inline void release_reservations( TupleType &my_input ) {
        join_helper<N-1>::release_reservations( my_input );
        release_my_reservation( my_input );
    }

    //! Reserves elements N-1 down to 0, writing each reserved value into the matching slot of out.
    /** Returns true only if every reserve() succeeded. When a lower element
        fails, the reservation taken on element N-1 is released again. */
    template< typename InputTuple, typename OutputTuple >
    static inline bool reserve( InputTuple &my_input, OutputTuple &out ) {
        if ( std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) {
            if ( join_helper<N-1>::reserve( my_input, out ) )
                return true;
            release_my_reservation( my_input );
        }
        return false;
    }
};

//! Recursion terminator: operates on element 0 only.
template< >
struct join_helper<1> {

    template< typename TupleType, typename PortType >
    static inline void set_port_counter( TupleType &my_input, PortType *port ) {
        std::get<0>( my_input ).set_port_counter( port );
    }

    template< typename TupleType >
    static inline void consume_reservations( TupleType &my_input ) {
        std::get<0>( my_input ).consume();
    }

    template< typename TupleType >
    static inline void release_my_reservation( TupleType &my_input ) {
        std::get<0>( my_input ).release();
    }

    template< typename TupleType >
    static inline void release_reservations( TupleType &my_input ) {
        release_my_reservation( my_input );
    }

    template< typename InputTuple, typename OutputTuple >
    static inline bool reserve( InputTuple &my_input, OutputTuple &out ) {
        return std::get<0>( my_input ).reserve( std::get<0>( out ) );
    }
};
02138
02140 template< typename T, typename JoinNodeType >
02141 class join_port : public receiver<T> {
02142 public:
02143
02145 typedef T input_type;
02146
02148 typedef sender<T> predecessor_type;
02149
02151 join_port() : my_join(NULL), reserved(false) {
02152 my_predecessors.set_owner( this );
02153 }
02154
02155
02156 join_port(const join_port& ) : receiver<T>() {
02157 my_join = NULL;
02158 reserved = false;
02159 my_predecessors.set_owner( this );
02160 }
02161
02162 void set_port_counter( forwarding_base *join) {
02163 my_join = join;
02164 }
02165
02167
02169 bool try_put( T ) {
02170 return false;
02171 }
02172
02174 bool register_predecessor( sender<T> &src ) {
02175 tbb::spin_mutex::scoped_lock l(my_mutex);
02176 bool no_predecessors = my_predecessors.empty();
02177 my_predecessors.add(src);
02178 if ( no_predecessors ) {
02179 my_join->decrement_port_count( );
02180 }
02181 return true;
02182 }
02183
02185 bool remove_predecessor( sender<T> &src ) {
02186 tbb::spin_mutex::scoped_lock l(my_mutex);
02187 my_predecessors.remove( src );
02188 return true;
02189 }
02190
02192 bool reserve( T &v ) {
02193 tbb::spin_mutex::scoped_lock l(my_mutex);
02194 if ( reserved ) {
02195 return false;
02196 }
02197 if ( my_predecessors.try_reserve( v ) ) {
02198 reserved = true;
02199 return true;
02200 } else if ( my_predecessors.empty() ) {
02201
02202 my_join->increment_port_count();
02203 }
02204 return false;
02205 }
02206
02208 void release( ) {
02209 tbb::spin_mutex::scoped_lock l(my_mutex);
02210 reserved = false;
02211 my_predecessors.try_release( );
02212 }
02213
02215 void consume( ) {
02216 tbb::spin_mutex::scoped_lock l(my_mutex);
02217 reserved = false;
02218 my_predecessors.try_consume( );
02219 }
02220
02221
02222 private:
02223
02224 tbb::spin_mutex my_mutex;
02225 forwarding_base *my_join;
02226 reservable_predecessor_cache< T > my_predecessors;
02227 bool reserved;
02228
02229 };
02230
02231 template< int N, typename InputTuple, typename OutputTuple >
02232 class join_node_base : public graph_node, public sender< OutputTuple >, public forwarding_base, no_copy {
02233 public:
02234
02235 typedef InputTuple input_tuple_type;
02236 typedef OutputTuple output_type;
02237
02238
02240 typedef receiver< output_type > successor_type;
02241
02243 join_node_base( graph &g ) : my_root_task(g.root_task()) {
02244 ports_with_no_inputs = N;
02245 join_helper<N>::set_port_counter(my_input, this);
02246 my_successors.set_owner(this);
02247 }
02248
02250 bool register_successor( successor_type &r ) {
02251 tbb::spin_mutex::scoped_lock lock(my_mutex);
02252 my_successors.register_successor(r);
02253 if ( ports_with_no_inputs == 0 )
02254 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02255 forward_task< join_node_base< N, InputTuple, OutputTuple > >( *this ) );
02256 return true;
02257 }
02258
02260 bool remove_successor( successor_type &r ) {
02261 tbb::spin_mutex::scoped_lock lock(my_mutex);
02262 my_successors.remove_successor(r);
02263 return true;
02264 }
02265
02267
02268 bool try_get( output_type &v ) {
02269 tbb::spin_mutex::scoped_lock l(my_mutex);
02270 if ( try_to_make_tuple( v ) ) {
02271 join_helper<N>::consume_reservations(my_input);
02272 return true;
02273 } else {
02274 return false;
02275 }
02276 }
02277
02278 protected:
02279 input_tuple_type my_input;
02280 private:
02281
02282 task *my_root_task;
02283 tbb::spin_mutex my_mutex;
02284 atomic<size_t> ports_with_no_inputs;
02285 broadcast_cache< output_type, null_rw_mutex > my_successors;
02286
02287 friend class forward_task< join_node_base< N, InputTuple, OutputTuple > >;
02288
02289 template< typename A, typename B > friend class join_port;
02290
02292 void forward() {
02293 tbb::spin_mutex::scoped_lock l(my_mutex);
02294 output_type out;
02295 bool msg = false;
02296 size_t pi = ports_with_no_inputs;
02297 if ( pi != 0 ) {
02298 return;
02299 }
02300
02301 while ( try_to_make_tuple( out ) ) {
02302 msg = my_successors.try_put( out );
02303 if ( msg == false ) {
02304 join_helper<N>::release_reservations(my_input);
02305 return;
02306 } else {
02307 join_helper<N>::consume_reservations(my_input);
02308 }
02309 }
02310 return;
02311 }
02312
02314 void increment_port_count() {
02315 ++ports_with_no_inputs;
02316 }
02317
02319 void decrement_port_count() {
02320 if ( (ports_with_no_inputs.fetch_and_decrement() - 1) == 0 ) {
02321 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02322 forward_task< join_node_base< N, InputTuple, OutputTuple > >( *this ) );
02323 }
02324 }
02325
02326 bool try_to_make_tuple( output_type &out ) {
02327 size_t pi = ports_with_no_inputs;
02328 if ( pi != 0 ) {
02329 return false;
02330 }
02331 return join_helper<N>::reserve(my_input, out);
02332 }
02333
02334 };
02335
//! Empty placeholder type used as the default for unused join_node template parameters.
class null_element {};
02337
02338 }
02339
02340 template< typename T0, typename T1, typename T2 = internal::null_element, typename T3 = internal::null_element >
02341 class join_node
02342 : public internal::join_node_base< 4,
02343 std::tuple< internal::join_port< T0, join_node< T0, T1, T2, T3 > >,
02344 internal::join_port< T1, join_node< T0, T1, T2, T3 > >,
02345 internal::join_port< T2, join_node< T0, T1, T2, T3 > >,
02346 internal::join_port< T3, join_node< T0, T1, T2, T3 > > >,
02347 std::tuple< T0, T1, T2, T3 > >
02348 {
02349 public:
02350
02351 typedef std::tuple< internal::join_port< T0, join_node< T0, T1, T2, T3 > >,
02352 internal::join_port< T1, join_node< T0, T1, T2, T3 > >,
02353 internal::join_port< T2, join_node< T0, T1, T2, T3 > >,
02354 internal::join_port< T3, join_node< T0, T1, T2, T3 > > > port_tuple_type;
02355 typedef std::tuple< T0, T1, T2, T3 > output_type;
02356 typedef typename internal::join_node_base<4, port_tuple_type, output_type> base_type;
02357 typedef receiver< output_type > successor_type;
02358
02359 join_node( graph &g ) : internal::join_node_base< 4, port_tuple_type, output_type >( g ) {}
02360
02361 port_tuple_type& inputs() { return this->my_input; }
02362
02363 };
02364
02365 template< typename T0, typename T1, typename T2 >
02366 class join_node<T0,T1,T2,internal::null_element>
02367 : public internal::join_node_base< 3,
02368 std::tuple< internal::join_port< T0, join_node< T0, T1, T2 > >,
02369 internal::join_port< T1, join_node< T0, T1, T2 > >,
02370 internal::join_port< T2, join_node< T0, T1, T2 > > >,
02371 std::tuple< T0, T1, T2 > > {
02372 public:
02373
02374 typedef std::tuple< internal::join_port< T0, join_node< T0, T1, T2 > >,
02375 internal::join_port< T1, join_node< T0, T1, T2 > >,
02376 internal::join_port< T2, join_node< T0, T1, T2 > > > port_tuple_type;
02377 typedef std::tuple< T0, T1, T2 > output_type;
02378 typedef receiver< output_type > successor_type;
02379
02380 join_node( graph &g ) : internal::join_node_base< 3, port_tuple_type, output_type >( g ) {}
02381 port_tuple_type& inputs() { return this->my_input; }
02382
02383 };
02384
02385 template< typename T0, typename T1 >
02386 class join_node<T0,T1,internal::null_element,internal::null_element>
02387 : public internal::join_node_base< 2,
02388 std::tuple< internal::join_port< T0, join_node< T0, T1 > >,
02389 internal::join_port< T1, join_node< T0, T1 > > >,
02390 std::tuple< T0, T1 > > {
02391
02392 public:
02393
02394
02395 typedef std::tuple< internal::join_port< T0, join_node< T0, T1 > >,
02396 internal::join_port< T1, join_node< T0, T1 > > > port_tuple_type;
02397 typedef std::tuple< T0, T1 > output_type;
02398 typedef receiver< output_type > successor_type;
02399
02400 join_node( graph &g ) : internal::join_node_base< 2, port_tuple_type, output_type >( g ) {}
02401 port_tuple_type& inputs() { return this->my_input; }
02402
02403 };
02404
02405
02406
02407
02408
02410 template< typename T >
02411 inline void make_edge( sender<T> &p, receiver<T> &s ) {
02412 p.register_successor( s );
02413 }
02414
02416 template< typename T, typename SIterator >
02417 inline void make_edges( sender<T> &p, SIterator s_begin, SIterator s_end ) {
02418 for ( SIterator i = s_begin; i != s_end; ++i ) {
02419 make_edge( p, **i );
02420 }
02421 }
02422
02424 template< typename T, typename PIterator >
02425 inline void make_edges( PIterator p_begin, PIterator p_end, receiver<T> &s ) {
02426 for ( PIterator i = p_begin; i != p_end; ++i ) {
02427 make_edge( **i, s );
02428 }
02429 }
02430
02431 }
02432
02433 #endif
02434