pipeline.h

00001 /*
00002     Copyright 2005-2010 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028 
00029 namespace tbb {
00030 
00031 class pipeline;
00032 class filter;
00033 
00035 namespace internal {
00036 
00037 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
00038 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00039 
00040 typedef unsigned long Token;
00041 typedef long tokendiff_t;
00042 class stage_task;
00043 class input_buffer;
00044 class pipeline_root_task;
00045 class pipeline_cleaner;
00046 
00047 } // namespace internal
00048 
00049 namespace interface5 {
00050     template<typename T, typename U> class filter_t;
00051 
00052     namespace internal {
00053         class pipeline_proxy;
00054     }
00055 }
00056 
00058 
00060 
00061 class filter: internal::no_copy {
00062 private:
00064     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00065     
00067     static const unsigned char filter_is_serial = 0x1; 
00068 
00070 
00072     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00073 
00075     static const unsigned char filter_is_bound = 0x1<<5;  
00076 
00078     static const unsigned char exact_exception_propagation =
00079 #if TBB_USE_CAPTURED_EXCEPTION
00080             0x0;
00081 #else
00082             0x1<<7;
00083 #endif /* TBB_USE_CAPTURED_EXCEPTION */
00084 
00085     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00086     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00087 public:
00088     enum mode {
00090         parallel = current_version | filter_is_out_of_order, 
00092         serial_in_order = current_version | filter_is_serial,
00094         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00096         serial = serial_in_order
00097     };
00098 protected:
00099     filter( bool is_serial_ ) : 
00100         next_filter_in_pipeline(not_in_pipeline()),
00101         my_input_buffer(NULL),
00102         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00103         prev_filter_in_pipeline(not_in_pipeline()),
00104         my_pipeline(NULL),
00105         next_segment(NULL)
00106     {}
00107     
00108     filter( mode filter_mode ) :
00109         next_filter_in_pipeline(not_in_pipeline()),
00110         my_input_buffer(NULL),
00111         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00112         prev_filter_in_pipeline(not_in_pipeline()),
00113         my_pipeline(NULL),
00114         next_segment(NULL)
00115     {}
00116 
00117 public:
00119     bool is_serial() const {
00120         return bool( my_filter_mode & filter_is_serial );
00121     }  
00122     
00124     bool is_ordered() const {
00125         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00126     }
00127 
00129     bool is_bound() const {
00130         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00131     }
00132 
00134 
00135     virtual void* operator()( void* item ) = 0;
00136 
00138 
00139     virtual __TBB_EXPORTED_METHOD ~filter();
00140 
00141 #if __TBB_TASK_GROUP_CONTEXT
00143 
00145     virtual void finalize( void* /*item*/ ) {};
00146 #endif
00147 
00148 private:
00150     filter* next_filter_in_pipeline;
00151 
00153     //  (pipeline has not yet reached end_of_input or this filter has not yet
00154     //  seen the last token produced by input_filter)
00155     bool has_more_work();
00156 
00158 
00159     internal::input_buffer* my_input_buffer;
00160 
00161     friend class internal::stage_task;
00162     friend class internal::pipeline_root_task;
00163     friend class pipeline;
00164     friend class thread_bound_filter;
00165 
00167     const unsigned char my_filter_mode;
00168 
00170     filter* prev_filter_in_pipeline;
00171 
00173     pipeline* my_pipeline;
00174 
00176 
00177     filter* next_segment;
00178 };
00179 
00181 
00182 class thread_bound_filter: public filter {
00183 public:
00184     enum result_type {
00185         // item was processed
00186         success,
00187         // item is currently not available
00188         item_not_available,
00189         // there are no more items to process
00190         end_of_stream
00191     };
00192 protected:
00193     thread_bound_filter(mode filter_mode): 
00194          filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
00195     {}
00196 public:
00198 
00203     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00204 
00206 
00210     result_type __TBB_EXPORTED_METHOD process_item();
00211 
00212 private:
00214     result_type internal_process_item(bool is_blocking);
00215 };
00216 
00218 
00219 class pipeline {
00220 public:
00222     __TBB_EXPORTED_METHOD pipeline();
00223 
00226     virtual __TBB_EXPORTED_METHOD ~pipeline();
00227 
00229     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00230 
00232     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00233 
00234 #if __TBB_TASK_GROUP_CONTEXT
00236     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00237 #endif
00238 
00240     void __TBB_EXPORTED_METHOD clear();
00241 
00242 private:
00243     friend class internal::stage_task;
00244     friend class internal::pipeline_root_task;
00245     friend class filter;
00246     friend class thread_bound_filter;
00247     friend class internal::pipeline_cleaner;
00248     friend class tbb::interface5::internal::pipeline_proxy;
00249 
00251     filter* filter_list;
00252 
00254     filter* filter_end;
00255 
00257     task* end_counter;
00258 
00260     atomic<internal::Token> input_tokens;
00261 
00263     atomic<internal::Token> token_counter;
00264 
00266     bool end_of_input;
00267 
00269     bool has_thread_bound_filters;
00270 
00272     void remove_filter( filter& filter_ );
00273 
00275     void __TBB_EXPORTED_METHOD inject_token( task& self );
00276 
00277 #if __TBB_TASK_GROUP_CONTEXT
00279     void clear_filters();
00280 #endif
00281 };
00282 
00283 //------------------------------------------------------------------------
00284 // Support for lambda-friendly parallel_pipeline interface
00285 //------------------------------------------------------------------------
00286 
00287 namespace interface5 {
00288 
00289 namespace internal {
00290     template<typename T, typename U, typename Body> class concrete_filter;
00291 }
00292 
00294 class flow_control {
00295     bool is_pipeline_stopped;
00296     flow_control() { is_pipeline_stopped = false; }
00297     template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00298 public:
00299     void stop() { is_pipeline_stopped = true; }
00300 };
00301 
00303 namespace internal {
00304 
00305 template<typename T, typename U, typename Body>
00306 class concrete_filter: public tbb::filter {
00307     const Body& my_body;
00308 
00309     typedef typename tbb::tbb_allocator<U> u_allocator;
00310     typedef typename tbb::tbb_allocator<T> t_allocator;
00311 
00312     /*override*/ void* operator()(void* input) {
00313         T* temp_input = (T*)input;
00314         // Call user's operator()() here
00315         U* output_u = u_allocator().allocate(1);
00316         void* output = (void*) new (output_u) U(my_body(*temp_input)); 
00317         t_allocator().destroy(temp_input);
00318         t_allocator().deallocate(temp_input,1);
00319         return output;
00320     }
00321 
00322 public:
00323     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00324 };
00325 
00326 template<typename U, typename Body>
00327 class concrete_filter<void,U,Body>: public filter {
00328     const Body& my_body;
00329 
00330     typedef typename tbb::tbb_allocator<U> u_allocator;
00331 
00332     /*override*/void* operator()(void*) {
00333         flow_control control;
00334         U* output_u = u_allocator().allocate(1);
00335         (void) new (output_u) U(my_body(control));
00336         if(control.is_pipeline_stopped) {
00337             u_allocator().destroy(output_u);
00338             u_allocator().deallocate(output_u,1);
00339             output_u = NULL;
00340         }
00341         return (void*)output_u;
00342     }
00343 public:
00344     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00345 };
00346 
00347 template<typename T, typename Body>
00348 class concrete_filter<T,void,Body>: public filter {
00349     const Body& my_body;
00350    
00351     typedef typename tbb::tbb_allocator<T> t_allocator;
00352 
00353     /*override*/ void* operator()(void* input) {
00354         T* temp_input = (T*)input;
00355         my_body(*temp_input);
00356         t_allocator().destroy(temp_input);
00357         t_allocator().deallocate(temp_input,1);
00358         return NULL;
00359     }
00360 public:
00361     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00362 };
00363 
00364 template<typename Body>
00365 class concrete_filter<void,void,Body>: public filter {
00366     const Body& my_body;
00367     
00369     /*override*/ void* operator()(void*) {
00370         flow_control control;
00371         my_body(control);
00372         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
00373         return output;
00374     }
00375 public:
00376     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00377 };
00378 
00380 
00381 class pipeline_proxy {
00382     tbb::pipeline my_pipe;
00383 public:
00384     pipeline_proxy( const filter_t<void,void>& filter_chain );
00385     ~pipeline_proxy() {
00386         while( filter* f = my_pipe.filter_list ) 
00387             delete f; // filter destructor removes it from the pipeline
00388     }
00389     tbb::pipeline* operator->() { return &my_pipe; }
00390 };
00391 
00393 
00394 class filter_node: tbb::internal::no_copy {
00396     tbb::atomic<intptr_t> ref_count;
00397 protected:
00398     filter_node() {
00399         ref_count = 0;
00400 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00401         ++(__TBB_TEST_FILTER_NODE_COUNT);
00402 #endif
00403     }
00404 public:
00406     virtual void add_to( pipeline& ) = 0;
00408     void add_ref() {++ref_count;}
00410     void remove_ref() {
00411         __TBB_ASSERT(ref_count>0,"ref_count underflow");
00412         if( --ref_count==0 ) 
00413             delete this;
00414     }
00415     virtual ~filter_node() {
00416 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00417         --(__TBB_TEST_FILTER_NODE_COUNT);
00418 #endif
00419     }
00420 };
00421 
00423 template<typename T, typename U, typename Body>
00424 class filter_node_leaf: public filter_node  {
00425     const tbb::filter::mode mode;
00426     const Body body;
00427     /*override*/void add_to( pipeline& p ) {
00428         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00429         p.add_filter( *f );
00430     }
00431 public:
00432     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00433 };
00434 
00436 class filter_node_join: public filter_node {
00437     friend class filter_node; // to suppress GCC 3.2 warnings
00438     filter_node& left;
00439     filter_node& right;
00440     /*override*/~filter_node_join() {
00441        left.remove_ref();
00442        right.remove_ref();
00443     }
00444     /*override*/void add_to( pipeline& p ) {
00445         left.add_to(p);
00446         right.add_to(p);
00447     }
00448 public:
00449     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00450        left.add_ref();
00451        right.add_ref();
00452     }
00453 };
00454 
00455 } // namespace internal
00457 
00459 template<typename T, typename U, typename Body>
00460 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00461     return new internal::filter_node_leaf<T,U,Body>(mode, body);
00462 }
00463 
00464 template<typename T, typename V, typename U>
00465 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00466     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00467     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00468     return new internal::filter_node_join(*left.root,*right.root);
00469 }
00470 
00472 template<typename T, typename U>
00473 class filter_t {
00474     typedef internal::filter_node filter_node;
00475     filter_node* root;
00476     filter_t( filter_node* root_ ) : root(root_) {
00477         root->add_ref();
00478     }
00479     friend class internal::pipeline_proxy;
00480     template<typename T_, typename U_, typename Body>
00481     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00482     template<typename T_, typename V_, typename U_>
00483     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00484 public:
00485     filter_t() : root(NULL) {}
00486     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00487         if( root ) root->add_ref();
00488     }
00489     template<typename Body>
00490     filter_t( tbb::filter::mode mode, const Body& body ) :
00491         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00492         root->add_ref();
00493     }
00494 
00495     void operator=( const filter_t<T,U>& rhs ) {
00496         // Order of operations below carefully chosen so that reference counts remain correct
00497         // in unlikely event that remove_ref throws exception.
00498         filter_node* old = root;
00499         root = rhs.root; 
00500         if( root ) root->add_ref();
00501         if( old ) old->remove_ref();
00502     }
00503     ~filter_t() {
00504         if( root ) root->remove_ref();
00505     }
00506     void clear() {
00507         // Like operator= with filter_t() on right side.
00508         if( root ) {
00509             filter_node* old = root;
00510             root = NULL;
00511             old->remove_ref();
00512         }
00513     }
00514 };
00515 
00516 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00517     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
00518     filter_chain.root->add_to(my_pipe);
00519 }
00520 
00521 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00522 #if __TBB_TASK_GROUP_CONTEXT
00523     , tbb::task_group_context& context
00524 #endif
00525     ) {
00526     internal::pipeline_proxy pipe(filter_chain);
00527     // tbb::pipeline::run() is called via the proxy
00528     pipe->run(max_number_of_live_tokens
00529 #if __TBB_TASK_GROUP_CONTEXT
00530               , context
00531 #endif
00532     );
00533 }
00534 
00535 #if __TBB_TASK_GROUP_CONTEXT
00536 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00537     tbb::task_group_context context;
00538     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00539 }
00540 #endif // __TBB_TASK_GROUP_CONTEXT
00541 
00542 } // interface5
00543 
00544 using interface5::flow_control;
00545 using interface5::filter_t;
00546 using interface5::make_filter;
00547 using interface5::parallel_pipeline;
00548 
00549 } // tbb
00550 
00551 #endif /* __TBB_pipeline_H */

Copyright © 2005-2010 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.