00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025
00026 #include <gruel/thread.h>
00027 #include <iostream>
00028 #include <stdexcept>
00029
00030 #ifndef DO_DEBUG
00031 #define DO_DEBUG 0
00032 #endif
00033
00034 #if DO_DEBUG
00035 #define DEBUG(X) do{X} while(0);
00036 #else
00037 #define DEBUG(X) do{} while(0);
00038 #endif
00039
00040 template <class T>
00041 class circular_buffer
00042 {
00043 private:
00044
00045 T* d_buffer;
00046
00047
00048 size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00049 size_t d_n_avail_write_I, d_n_avail_read_I;
00050
00051
00052 gruel::mutex* d_internal;
00053 gruel::condition_variable* d_readBlock;
00054 gruel::condition_variable* d_writeBlock;
00055
00056
00057 bool d_doWriteBlock, d_doFullRead, d_doAbort;
00058
00059 void delete_mutex_cond () {
00060 if (d_internal) {
00061 delete d_internal;
00062 d_internal = NULL;
00063 }
00064 if (d_readBlock) {
00065 delete d_readBlock;
00066 d_readBlock = NULL;
00067 }
00068 if (d_writeBlock) {
00069 delete d_writeBlock;
00070 d_writeBlock = NULL;
00071 }
00072 };
00073
00074 public:
00075 circular_buffer (size_t bufLen_I,
00076 bool doWriteBlock = true, bool doFullRead = false) {
00077 if (bufLen_I == 0)
00078 throw std::runtime_error ("circular_buffer(): "
00079 "Number of items to buffer must be > 0.\n");
00080 d_bufLen_I = bufLen_I;
00081 d_buffer = (T*) new T[d_bufLen_I];
00082 d_doWriteBlock = doWriteBlock;
00083 d_doFullRead = doFullRead;
00084 d_internal = NULL;
00085 d_readBlock = d_writeBlock = NULL;
00086 reset ();
00087 DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_
00088 << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false")
00089 << ", doFullRead = " << (d_doFullRead ? "true" : "false")
00090 << std::endl);
00091 };
00092
00093 ~circular_buffer () {
00094 delete_mutex_cond ();
00095 delete [] d_buffer;
00096 };
00097
00098 inline size_t n_avail_write_items () {
00099 gruel::scoped_lock l (*d_internal);
00100 size_t retVal = d_n_avail_write_I;
00101 return (retVal);
00102 };
00103
00104 inline size_t n_avail_read_items () {
00105 gruel::scoped_lock l (*d_internal);
00106 size_t retVal = d_n_avail_read_I;
00107 return (retVal);
00108 };
00109
00110 inline size_t buffer_length_items () {return (d_bufLen_I);};
00111 inline bool do_write_block () {return (d_doWriteBlock);};
00112 inline bool do_full_read () {return (d_doFullRead);};
00113
00114 void reset () {
00115 d_doAbort = false;
00116 bzero (d_buffer, d_bufLen_I * sizeof (T));
00117 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00118 d_n_avail_write_I = d_bufLen_I;
00119 delete_mutex_cond ();
00120
00121
00122
00123 d_internal = new gruel::mutex ();
00124
00125
00126
00127
00128 d_readBlock = new gruel::condition_variable ();
00129 d_writeBlock = new gruel::condition_variable ();
00130 };
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153 int enqueue (T* buf, size_t bufLen_I) {
00154 DEBUG (std::cerr << "enqueue: buf = " << (void*) buf
00155 << ", bufLen = " << bufLen_I
00156 << ", #av_wr = " << d_n_avail_write_I
00157 << ", #av_rd = " << d_n_avail_read_I << std::endl);
00158 if (bufLen_I > d_bufLen_I) {
00159 std::cerr << "ERROR: cannot add buffer longer ("
00160 << bufLen_I << ") than instantiated length ("
00161 << d_bufLen_I << ")." << std::endl;
00162 throw std::runtime_error ("circular_buffer::enqueue()");
00163 }
00164
00165 if (bufLen_I == 0)
00166 return (0);
00167 if (!buf)
00168 throw std::runtime_error ("circular_buffer::enqueue(): "
00169 "input buffer is NULL.\n");
00170 gruel::scoped_lock l (*d_internal);
00171 if (d_doAbort) {
00172 return (2);
00173 }
00174
00175 int retval = 1;
00176 if (bufLen_I > d_n_avail_write_I) {
00177 if (d_doWriteBlock) {
00178 while (bufLen_I > d_n_avail_write_I) {
00179 DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl);
00180
00181
00182 d_writeBlock->wait (l);
00183
00184 if (d_doAbort) {
00185 DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl);
00186 return (2);
00187 }
00188 DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl);
00189 }
00190 } else {
00191 d_n_avail_read_I = d_bufLen_I - bufLen_I;
00192 d_n_avail_write_I = bufLen_I;
00193 DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl);
00194 retval = -1;
00195 }
00196 }
00197 size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00198 if (n_now_I > bufLen_I)
00199 n_now_I = bufLen_I;
00200 else if (n_now_I < bufLen_I)
00201 n_start_I = bufLen_I - n_now_I;
00202 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00203 if (n_start_I) {
00204 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00205 d_writeNdx_I = n_start_I;
00206 } else
00207 d_writeNdx_I += n_now_I;
00208 d_n_avail_read_I += bufLen_I;
00209 d_n_avail_write_I -= bufLen_I;
00210 d_readBlock->notify_one ();
00211 return (retval);
00212 };
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235 int dequeue (T* buf, size_t* bufLen_I) {
00236 DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf)
00237 << ", *bufLen = " << (*bufLen_I)
00238 << ", #av_wr = " << d_n_avail_write_I
00239 << ", #av_rd = " << d_n_avail_read_I << std::endl);
00240 if (!bufLen_I)
00241 throw std::runtime_error ("circular_buffer::dequeue(): "
00242 "input bufLen pointer is NULL.\n");
00243 if (!buf)
00244 throw std::runtime_error ("circular_buffer::dequeue(): "
00245 "input buffer pointer is NULL.\n");
00246 size_t l_bufLen_I = *bufLen_I;
00247 if (l_bufLen_I == 0)
00248 return (0);
00249 if (l_bufLen_I > d_bufLen_I) {
00250 std::cerr << "ERROR: cannot remove buffer longer ("
00251 << l_bufLen_I << ") than instantiated length ("
00252 << d_bufLen_I << ")." << std::endl;
00253 throw std::runtime_error ("circular_buffer::dequeue()");
00254 }
00255
00256 gruel::scoped_lock l (*d_internal);
00257 if (d_doAbort) {
00258 return (2);
00259 }
00260 if (d_doFullRead) {
00261 while (d_n_avail_read_I < l_bufLen_I) {
00262 DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl);
00263
00264
00265 d_readBlock->wait (l);
00266
00267 if (d_doAbort) {
00268 DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl);
00269 return (2);
00270 }
00271 DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl);
00272 }
00273 } else {
00274 while (d_n_avail_read_I == 0) {
00275 DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl);
00276
00277
00278 d_readBlock->wait (l);
00279
00280 if (d_doAbort) {
00281 DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl);
00282 return (2);
00283 }
00284 DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl);
00285 }
00286 }
00287 if (l_bufLen_I > d_n_avail_read_I)
00288 l_bufLen_I = d_n_avail_read_I;
00289 size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00290 if (n_now_I > l_bufLen_I)
00291 n_now_I = l_bufLen_I;
00292 else if (n_now_I < l_bufLen_I)
00293 n_start_I = l_bufLen_I - n_now_I;
00294 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00295 if (n_start_I) {
00296 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00297 d_readNdx_I = n_start_I;
00298 } else
00299 d_readNdx_I += n_now_I;
00300 *bufLen_I = l_bufLen_I;
00301 d_n_avail_read_I -= l_bufLen_I;
00302 d_n_avail_write_I += l_bufLen_I;
00303 d_writeBlock->notify_one ();
00304 return (1);
00305 };
00306
00307 void abort () {
00308 gruel::scoped_lock l (*d_internal);
00309 d_doAbort = true;
00310 d_writeBlock->notify_one ();
00311 d_readBlock->notify_one ();
00312 };
00313 };
00314
00315 #endif