00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 #include "wvstreamsdebugger.h"
00019 #include "wvstrutils.h"
00020 #include "wvistreamlist.h"
00021 #include "wvlinkerhack.h"
00022 #include "wvmoniker.h"
00023
00024 #ifdef _WIN32
00025 #define ENOBUFS WSAENOBUFS
00026 #undef errno
00027 #define errno GetLastError()
00028 #ifdef __GNUC__
00029 #include <sys/socket.h>
00030 #endif
00031 #include "streams.h"
00032 #else
00033 #include <errno.h>
00034 #endif
00035
00036 #include <map>
00037
00038 using std::make_pair;
00039 using std::map;
00040
00041
00042
00043
// Debug tracing hook.  Change the "#if 0" below to "#if 1" to send
// printf-style trace output to stderr; in the default configuration
// TRACE(...) produces no output.
#if 0
# ifndef _MSC_VER
// Wrapped in do/while(0) so the fprintf+fflush pair acts as ONE statement;
// otherwise "if (x) TRACE(...); else ..." would not compile.
#  define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
# else
#  define TRACE printf
# endif
#else
# ifndef _MSC_VER
#  define TRACE(x, y...)
# else
#  define TRACE
# endif
#endif
00057
00058 WvStream *WvStream::globalstream = NULL;
00059
00060 UUID_MAP_BEGIN(WvStream)
00061 UUID_MAP_ENTRY(IObject)
00062 UUID_MAP_ENTRY(IWvStream)
00063 UUID_MAP_END
00064
00065
00066 static map<WSID, WvStream*> *wsid_map;
00067 static WSID next_wsid_to_try;
00068
00069
00070 WV_LINK(WvStream);
00071
00072 static IWvStream *create_null(WvStringParm, IObject *)
00073 {
00074 return new WvStream();
00075 }
00076
00077 static WvMoniker<IWvStream> reg("null", create_null);
00078
00079
00080 IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
00081 {
00082 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
00083 if (!s)
00084 {
00085 s = new WvStream();
00086 s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
00087 WVRELEASE(obj);
00088 }
00089 return s;
00090 }
00091
00092
// Returns true if 'str' begins with 'prefix', ignoring ASCII case.
// An empty prefix always matches.
//
// strncasecmp() stops at the first NUL, so a 'str' shorter than 'prefix'
// already compares unequal; no separate strlen(str) is needed (that extra
// scan made substring searches quadratic).
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}


// Case-insensitive strstr(): returns a pointer to the first occurrence of
// 'needle' inside 'haystack', or NULL if there is none.
//
// As with strstr(), an empty needle matches at the start of the haystack,
// even when the haystack itself is empty.
static const char *strstr_insensitive(const char *haystack, const char *needle)
{
    for (;; ++haystack)
    {
        if (is_prefix_insensitive(haystack, needle))
            return haystack;
        // test the terminator position too before giving up, so that an
        // empty needle matches an empty haystack
        if (*haystack == '\0')
            return NULL;
    }
}


// Returns true if 'needle' occurs anywhere in 'haystack', ignoring case.
static bool contains_insensitive(const char *haystack, const char *needle)
{
    return strstr_insensitive(haystack, needle) != NULL;
}
00116
00117
// Column layout shared by the header row and every stream row of the
// "streams" debugger command: value columns separated by one-string gaps.
static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";

// Maps a boolean onto the text shown in the listing.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
00123
00124
00125 void WvStream::debugger_streams_display_header(WvStringParm cmd,
00126 WvStreamsDebugger::ResultCallback result_cb)
00127 {
00128 WvStringList result;
00129 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
00130 "----------------Type", "-", "Name--------------------");
00131 result_cb(cmd, result);
00132 }
00133
00134
00135
00136 static WvString friendly_ms(time_t ms)
00137 {
00138 if (ms <= 0)
00139 return WvString("(%s)", ms);
00140 else if (ms < 1000)
00141 return WvString("%sms", ms);
00142 else if (ms < 60*1000)
00143 return WvString("%ss", ms/1000);
00144 else if (ms < 60*60*1000)
00145 return WvString("%sm", ms/(60*1000));
00146 else if (ms <= 24*60*60*1000)
00147 return WvString("%sh", ms/(60*60*1000));
00148 else
00149 return WvString("%sd", ms/(24*60*60*1000));
00150 }
00151
00152 void WvStream::debugger_streams_display_one_stream(WvStream *s,
00153 WvStringParm cmd,
00154 WvStreamsDebugger::ResultCallback result_cb)
00155 {
00156 WvStringList result;
00157 s->addRef();
00158 unsigned refcount = s->release();
00159 result.append(list_format,
00160 s->wsid(), " ",
00161 refcount, " ",
00162 Yes_No(s->isok()), " ",
00163 Yes_No(s->uses_continue_select), " ",
00164 friendly_ms(s->alarm_remaining()), " ",
00165 s->wstype(), " ",
00166 s->wsname());
00167 result_cb(cmd, result);
00168 }
00169
00170
00171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
00172 WvStringParm cmd,
00173 const WvStringList &args,
00174 WvStreamsDebugger::ResultCallback result_cb)
00175 {
00176 bool show = args.isempty();
00177 WvStringList::Iter arg(args);
00178 for (arg.rewind(); arg.next(); )
00179 {
00180 WSID wsid;
00181 bool is_num = wvstring_to_num(*arg, wsid);
00182
00183 if (is_num)
00184 {
00185 if (s->wsid() == wsid)
00186 {
00187 show = true;
00188 break;
00189 }
00190 }
00191 else
00192 {
00193 if (s->wsname() && contains_insensitive(s->wsname(), *arg)
00194 || s->wstype() && contains_insensitive(s->wstype(), *arg))
00195 {
00196 show = true;
00197 break;
00198 }
00199 }
00200 }
00201 if (show)
00202 debugger_streams_display_one_stream(s, cmd, result_cb);
00203 }
00204
00205
00206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
00207 WvStringList &args,
00208 WvStreamsDebugger::ResultCallback result_cb, void *)
00209 {
00210 debugger_streams_display_header(cmd, result_cb);
00211 if (wsid_map)
00212 {
00213 map<WSID, WvStream*>::iterator it;
00214
00215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
00216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
00217 result_cb);
00218 }
00219
00220 return WvString::null;
00221 }
00222
00223
00224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
00225 WvStringList &args,
00226 WvStreamsDebugger::ResultCallback result_cb, void *)
00227 {
00228 if (args.isempty())
00229 return WvString("Usage: %s <WSID>", cmd);
00230 WSID wsid;
00231 WvString wsid_str = args.popstr();
00232 if (!wvstring_to_num(wsid_str, wsid))
00233 return WvString("Invalid WSID '%s'", wsid_str);
00234 IWvStream *s = WvStream::find_by_wsid(wsid);
00235 if (!s)
00236 return WvString("No such stream");
00237 s->close();
00238 return WvString::null;
00239 }
00240
00241
00242 void WvStream::add_debugger_commands()
00243 {
00244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
00245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
00246 }
00247
00248
00249 WvStream::WvStream():
00250 read_requires_writable(NULL),
00251 write_requires_readable(NULL),
00252 uses_continue_select(false),
00253 personal_stack_size(131072),
00254 alarm_was_ticking(false),
00255 stop_read(false),
00256 stop_write(false),
00257 closed(false),
00258 readcb(wv::bind(&WvStream::legacy_callback, this)),
00259 max_outbuf_size(0),
00260 outbuf_delayed_flush(false),
00261 is_auto_flush(true),
00262 want_to_flush(true),
00263 is_flushing(false),
00264 queue_min(0),
00265 autoclose_time(0),
00266 alarm_time(wvtime_zero),
00267 last_alarm_check(wvtime_zero)
00268 {
00269 TRACE("Creating wvstream %p\n", this);
00270
00271 static bool first = true;
00272 if (first)
00273 {
00274 first = false;
00275 WvStream::add_debugger_commands();
00276 }
00277
00278
00279 if (!wsid_map)
00280 wsid_map = new map<WSID, WvStream*>;
00281 WSID first_wsid_tried = next_wsid_to_try;
00282 do
00283 {
00284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
00285 break;
00286 ++next_wsid_to_try;
00287 } while (next_wsid_to_try != first_wsid_tried);
00288 my_wsid = next_wsid_to_try++;
00289 bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
00290 assert(inserted);
00291
00292 #ifdef _WIN32
00293 WSAData wsaData;
00294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00295 assert(result == 0);
00296 #endif
00297 }
00298
00299
00300
00301 IWvStream::IWvStream()
00302 {
00303 }
00304
00305
00306 IWvStream::~IWvStream()
00307 {
00308 }
00309
00310
00311 WvStream::~WvStream()
00312 {
00313 TRACE("destroying %p\n", this);
00314 close();
00315
00316
00317
00318
00319 assert(!uses_continue_select || !call_ctx);
00320
00321 call_ctx = 0;
00322
00323 assert(wsid_map);
00324 wsid_map->erase(my_wsid);
00325 if (wsid_map->empty())
00326 {
00327 delete wsid_map;
00328 wsid_map = NULL;
00329 }
00330
00331
00332
00333
00334
00335 WvIStreamList::globallist.unlink(this);
00336
00337 TRACE("done destroying %p\n", this);
00338 }
00339
00340
00341 void WvStream::close()
00342 {
00343 TRACE("flushing in wvstream...\n");
00344 flush(2000);
00345 TRACE("(flushed)\n");
00346
00347 closed = true;
00348
00349 if (!!closecb)
00350 {
00351 IWvStreamCallback cb = closecb;
00352 closecb = 0;
00353 cb();
00354 }
00355
00356
00357
00358
00359 }
00360
00361
00362 void WvStream::autoforward(WvStream &s)
00363 {
00364 setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
00365 read_requires_writable = &s;
00366 }
00367
00368
00369 void WvStream::noautoforward()
00370 {
00371 setcallback(0);
00372 read_requires_writable = NULL;
00373 }
00374
00375
00376 void WvStream::autoforward_callback(WvStream &input, WvStream &output)
00377 {
00378 char buf[1024];
00379 size_t len;
00380
00381 len = input.read(buf, sizeof(buf));
00382 output.write(buf, len);
00383 }
00384
00385
00386 void WvStream::_callback()
00387 {
00388 execute();
00389 if (!!callfunc)
00390 callfunc();
00391 }
00392
00393
00394 void *WvStream::_callwrap(void *)
00395 {
00396 _callback();
00397 return NULL;
00398 }
00399
00400
00401 void WvStream::callback()
00402 {
00403 TRACE("(?)");
00404
00405
00406 if (alarm_remaining() == 0)
00407 {
00408 alarm_time = wvtime_zero;
00409 alarm_was_ticking = true;
00410 }
00411 else
00412 alarm_was_ticking = false;
00413
00414 assert(!uses_continue_select || personal_stack_size >= 1024);
00415
00416 #define TEST_CONTINUES_HARSHLY 0
00417 #if TEST_CONTINUES_HARSHLY
00418 #ifndef _WIN32
00419 # warning "Using WvCont for *all* streams for testing!"
00420 #endif
00421 if (1)
00422 #else
00423 if (uses_continue_select && personal_stack_size >= 1024)
00424 #endif
00425 {
00426 if (!call_ctx)
00427 {
00428 call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
00429 personal_stack_size);
00430 }
00431
00432 call_ctx(NULL);
00433 }
00434 else
00435 _callback();
00436
00437
00438
00439
00440
00441
00442 }
00443
00444
00445 bool WvStream::isok() const
00446 {
00447 return !closed && WvErrorBase::isok();
00448 }
00449
00450
00451 void WvStream::seterr(int _errnum)
00452 {
00453 if (!geterr())
00454 {
00455 WvErrorBase::seterr(_errnum);
00456 close();
00457 }
00458 }
00459
00460
00461 size_t WvStream::read(WvBuf &outbuf, size_t count)
00462 {
00463
00464 size_t free = outbuf.free();
00465 if (count > free)
00466 count = free;
00467
00468 WvDynBuf tmp;
00469 unsigned char *buf = tmp.alloc(count);
00470 size_t len = read(buf, count);
00471 tmp.unalloc(count - len);
00472 outbuf.merge(tmp);
00473 return len;
00474 }
00475
00476
00477 size_t WvStream::write(WvBuf &inbuf, size_t count)
00478 {
00479
00480 size_t avail = inbuf.used();
00481 if (count > avail)
00482 count = avail;
00483 const unsigned char *buf = inbuf.get(count);
00484 size_t len = write(buf, count);
00485 inbuf.unget(count - len);
00486 return len;
00487 }
00488
00489
00490 size_t WvStream::read(void *buf, size_t count)
00491 {
00492 assert(!count || buf);
00493
00494 size_t bufu, i;
00495 unsigned char *newbuf;
00496
00497 bufu = inbuf.used();
00498 if (bufu < queue_min)
00499 {
00500 newbuf = inbuf.alloc(queue_min - bufu);
00501 assert(newbuf);
00502 i = uread(newbuf, queue_min - bufu);
00503 inbuf.unalloc(queue_min - bufu - i);
00504
00505 bufu = inbuf.used();
00506 }
00507
00508 if (bufu < queue_min)
00509 {
00510 maybe_autoclose();
00511 return 0;
00512 }
00513
00514
00515 if (!bufu)
00516 bufu = uread(buf, count);
00517 else
00518 {
00519
00520 if (bufu > count)
00521 bufu = count;
00522
00523 memcpy(buf, inbuf.get(bufu), bufu);
00524 }
00525
00526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00527 maybe_autoclose();
00528 return bufu;
00529 }
00530
00531
00532 size_t WvStream::write(const void *buf, size_t count)
00533 {
00534 assert(!count || buf);
00535 if (!isok() || !buf || !count || stop_write) return 0;
00536
00537 size_t wrote = 0;
00538 if (!outbuf_delayed_flush && !outbuf.used())
00539 {
00540 wrote = uwrite(buf, count);
00541 count -= wrote;
00542 buf = (const unsigned char *)buf + wrote;
00543
00544 }
00545 if (max_outbuf_size != 0)
00546 {
00547 size_t canbuffer = max_outbuf_size - outbuf.used();
00548 if (count > canbuffer)
00549 count = canbuffer;
00550 }
00551 if (count != 0)
00552 {
00553 outbuf.put(buf, count);
00554 wrote += count;
00555 }
00556
00557 if (should_flush())
00558 {
00559 if (is_auto_flush)
00560 flush(0);
00561 else
00562 flush_outbuf(0);
00563 }
00564
00565 return wrote;
00566 }
00567
00568
00569 void WvStream::noread()
00570 {
00571 stop_read = true;
00572 maybe_autoclose();
00573 }
00574
00575
00576 void WvStream::nowrite()
00577 {
00578 stop_write = true;
00579 maybe_autoclose();
00580 }
00581
00582
00583 void WvStream::maybe_autoclose()
00584 {
00585 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00586 close();
00587 }
00588
00589
00590 bool WvStream::isreadable()
00591 {
00592 return isok() && select(0, true, false, false);
00593 }
00594
00595
00596 bool WvStream::iswritable()
00597 {
00598 return !stop_write && isok() && select(0, false, true, false);
00599 }
00600
00601
00602 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00603 int readahead)
00604 {
00605 assert(separator >= 0);
00606 assert(separator <= 255);
00607
00608
00609
00610 WvTime timeout_time(0);
00611 if (wait_msec > 0)
00612 timeout_time = msecadd(wvtime(), wait_msec);
00613
00614 maybe_autoclose();
00615
00616
00617
00618 while (isok())
00619 {
00620
00621 queuemin(0);
00622
00623
00624 if (inbuf.strchr(separator) > 0)
00625 break;
00626 else if (!isok() || stop_read)
00627 break;
00628
00629
00630 queuemin(inbuf.used() + 1);
00631
00632
00633 if (wait_msec > 0)
00634 {
00635 wait_msec = msecdiff(timeout_time, wvtime());
00636 if (wait_msec < 0)
00637 wait_msec = 0;
00638 }
00639
00640
00641
00642 bool hasdata;
00643 if (wait_msec != 0 && uses_continue_select)
00644 hasdata = continue_select(wait_msec);
00645 else
00646 hasdata = select(wait_msec, true, false);
00647
00648 if (!isok())
00649 break;
00650
00651 if (hasdata)
00652 {
00653
00654 WvDynBuf tmp;
00655 unsigned char *buf = tmp.alloc(readahead);
00656 assert(buf);
00657 size_t len = uread(buf, readahead);
00658 tmp.unalloc(readahead - len);
00659 inbuf.put(tmp.get(len), len);
00660 hasdata = len > 0;
00661 }
00662
00663 if (!isok())
00664 break;
00665
00666 if (!hasdata && wait_msec == 0)
00667 return NULL;
00668 }
00669 if (!inbuf.used())
00670 return NULL;
00671
00672
00673 size_t i = 0;
00674 i = inbuf.strchr(separator);
00675 if (i > 0) {
00676 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00677 assert(eol && *eol == separator);
00678 *eol = 0;
00679 return const_cast<char*>((const char *)inbuf.get(i));
00680 } else {
00681
00682
00683
00684 inbuf.alloc(1)[0] = 0;
00685 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00686 }
00687 }
00688
00689
00690 char *WvStream::continue_getline(time_t wait_msec, int separator,
00691 int readahead)
00692 {
00693 assert(false && "not implemented, come back later!");
00694 assert(uses_continue_select);
00695 return NULL;
00696 }
00697
00698
00699 void WvStream::drain()
00700 {
00701 char buf[1024];
00702 while (isreadable())
00703 read(buf, sizeof(buf));
00704 }
00705
00706
00707 bool WvStream::flush(time_t msec_timeout)
00708 {
00709 if (is_flushing) return false;
00710
00711 TRACE("%p flush starts\n", this);
00712
00713 is_flushing = true;
00714 want_to_flush = true;
00715 bool done = flush_internal(msec_timeout)
00716 && flush_outbuf(msec_timeout);
00717 is_flushing = false;
00718
00719 TRACE("flush stops (%d)\n", done);
00720 return done;
00721 }
00722
00723
00724 bool WvStream::should_flush()
00725 {
00726 return want_to_flush;
00727 }
00728
00729
00730 bool WvStream::flush_outbuf(time_t msec_timeout)
00731 {
00732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00733 bool outbuf_was_used = outbuf.used();
00734
00735
00736
00737
00738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00739 {
00740 maybe_autoclose();
00741 return true;
00742 }
00743
00744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
00745
00746
00747 while (outbuf_was_used && isok())
00748 {
00749
00750
00751
00752 size_t attempt = outbuf.optgettable();
00753 size_t real = uwrite(outbuf.get(attempt), attempt);
00754
00755
00756
00757
00758 if (isok() && real < attempt)
00759 {
00760 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00761 assert(outbuf.ungettable() >= attempt - real);
00762 outbuf.unget(attempt - real);
00763 }
00764
00765
00766
00767
00768
00769 if (!msec_timeout)
00770 break;
00771 if (msec_timeout >= 0
00772 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00773 break;
00774
00775 outbuf_was_used = outbuf.used();
00776 }
00777
00778
00779 if (autoclose_time && isok())
00780 {
00781 time_t now = time(NULL);
00782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00783 this, now - autoclose_time, outbuf.used());
00784 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00785 {
00786 autoclose_time = 0;
00787 close();
00788 }
00789 }
00790
00791 TRACE("flush_outbuf: after autoclose chunk\n");
00792 if (outbuf_delayed_flush && !outbuf_was_used)
00793 want_to_flush = false;
00794
00795 TRACE("flush_outbuf: now isok=%d\n", isok());
00796
00797
00798 if (outbuf_was_used && !isok())
00799 outbuf.zap();
00800
00801 maybe_autoclose();
00802 TRACE("flush_outbuf stops\n");
00803
00804 return !outbuf_was_used;
00805 }
00806
00807
00808 bool WvStream::flush_internal(time_t msec_timeout)
00809 {
00810
00811 return true;
00812 }
00813
00814
00815 int WvStream::getrfd() const
00816 {
00817 return -1;
00818 }
00819
00820
00821 int WvStream::getwfd() const
00822 {
00823 return -1;
00824 }
00825
00826
00827 void WvStream::flush_then_close(int msec_timeout)
00828 {
00829 time_t now = time(NULL);
00830 autoclose_time = now + (msec_timeout + 999) / 1000;
00831
00832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00833 this, outbuf.used(), autoclose_time - now);
00834
00835
00836
00837
00838
00839
00840 flush(0);
00841 }
00842
00843
00844 void WvStream::pre_select(SelectInfo &si)
00845 {
00846 maybe_autoclose();
00847
00848 time_t alarmleft = alarm_remaining();
00849
00850 if (!isok() || (!si.inherit_request && alarmleft == 0))
00851 {
00852 si.msec_timeout = 0;
00853 return;
00854 }
00855
00856 if (!si.inherit_request)
00857 {
00858 si.wants.readable |= static_cast<bool>(readcb);
00859 si.wants.writable |= static_cast<bool>(writecb);
00860 si.wants.isexception |= static_cast<bool>(exceptcb);
00861 }
00862
00863
00864 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00865 {
00866 si.msec_timeout = 0;
00867 return;
00868 }
00869 if (alarmleft >= 0
00870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00871 si.msec_timeout = alarmleft + 10;
00872 }
00873
00874
00875 bool WvStream::post_select(SelectInfo &si)
00876 {
00877 if (!si.inherit_request)
00878 {
00879 si.wants.readable |= static_cast<bool>(readcb);
00880 si.wants.writable |= static_cast<bool>(writecb);
00881 si.wants.isexception |= static_cast<bool>(exceptcb);
00882 }
00883
00884
00885
00886
00887
00888
00889
00890 if (should_flush())
00891 flush(0);
00892 if (!si.inherit_request && alarm_remaining() == 0)
00893 return true;
00894 if ((si.wants.readable || (!si.inherit_request && readcb))
00895 && inbuf.used() && inbuf.used() >= queue_min)
00896 return true;
00897 return false;
00898 }
00899
00900
00901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00902 bool readable, bool writable, bool isexcept, bool forceable)
00903 {
00904 FD_ZERO(&si.read);
00905 FD_ZERO(&si.write);
00906 FD_ZERO(&si.except);
00907
00908 if (forceable)
00909 {
00910 si.wants.readable = readcb;
00911 si.wants.writable = writecb;
00912 si.wants.isexception = exceptcb;
00913 }
00914 else
00915 {
00916 si.wants.readable = readable;
00917 si.wants.writable = writable;
00918 si.wants.isexception = isexcept;
00919 }
00920
00921 si.max_fd = -1;
00922 si.msec_timeout = msec_timeout;
00923 si.inherit_request = ! forceable;
00924 si.global_sure = false;
00925
00926 wvstime_sync();
00927
00928 pre_select(si);
00929 if (globalstream && forceable && (globalstream != this))
00930 {
00931 WvStream *s = globalstream;
00932 globalstream = NULL;
00933 s->xpre_select(si, SelectRequest(false, false, false));
00934 globalstream = s;
00935 }
00936 }
00937
00938
00939 int WvStream::_do_select(SelectInfo &si)
00940 {
00941
00942 timeval tv;
00943 tv.tv_sec = si.msec_timeout / 1000;
00944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00945
00946 #ifdef _WIN32
00947
00948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00949 FD_SET(fakefd, &si.except);
00950 #endif
00951
00952
00953 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00955
00956
00957
00958
00959
00960
00961 if (sel < 0
00962 && errno != EAGAIN && errno != EINTR
00963 && errno != EBADF
00964 && errno != ENOBUFS
00965 )
00966 {
00967 seterr(errno);
00968 }
00969 #ifdef _WIN32
00970 ::close(fakefd);
00971 #endif
00972 TRACE("select() returned %d\n", sel);
00973 return sel;
00974 }
00975
00976
00977 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00978 {
00979
00980
00981
00982
00983
00984 wvstime_sync_forward();
00985
00986 bool sure = post_select(si);
00987 if (globalstream && forceable && (globalstream != this))
00988 {
00989 WvStream *s = globalstream;
00990 globalstream = NULL;
00991 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00992 || si.global_sure;
00993 globalstream = s;
00994 }
00995 return sure;
00996 }
00997
00998
00999 bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
01000 bool isexcept, bool forceable)
01001 {
01002
01003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
01004
01005 SelectInfo si;
01006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
01007 forceable);
01008
01009 bool sure = false;
01010 int sel = _do_select(si);
01011 if (sel >= 0)
01012 sure = _process_selectinfo(si, forceable);
01013 if (si.global_sure && globalstream && forceable && (globalstream != this))
01014 globalstream->callback();
01015
01016 return sure;
01017 }
01018
01019
01020 IWvStream::SelectRequest WvStream::get_select_request()
01021 {
01022 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
01023 }
01024
01025
01026 void WvStream::force_select(bool readable, bool writable, bool isexception)
01027 {
01028 if (readable)
01029 readcb = wv::bind(&WvStream::legacy_callback, this);
01030 if (writable)
01031 writecb = wv::bind(&WvStream::legacy_callback, this);
01032 if (isexception)
01033 exceptcb = wv::bind(&WvStream::legacy_callback, this);
01034 }
01035
01036
01037 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
01038 {
01039 if (readable)
01040 readcb = 0;
01041 if (writable)
01042 writecb = 0;
01043 if (isexception)
01044 exceptcb = 0;
01045 }
01046
01047
01048 void WvStream::alarm(time_t msec_timeout)
01049 {
01050 if (msec_timeout >= 0)
01051 alarm_time = msecadd(wvstime(), msec_timeout);
01052 else
01053 alarm_time = wvtime_zero;
01054 }
01055
01056
01057 time_t WvStream::alarm_remaining()
01058 {
01059 if (alarm_time.tv_sec)
01060 {
01061 WvTime now = wvstime();
01062
01063
01064 if (now < last_alarm_check)
01065 {
01066 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
01067
01068 if (msecdiff(last_alarm_check, now) > 200)
01069 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
01070 "(%ld:%ld %ld:%ld)\n",
01071 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
01072 now.tv_sec, now.tv_usec);
01073 #endif
01074 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
01075 }
01076
01077 last_alarm_check = now;
01078
01079 time_t remaining = msecdiff(alarm_time, now);
01080 if (remaining < 0)
01081 remaining = 0;
01082 return remaining;
01083 }
01084 return -1;
01085 }
01086
01087
01088 bool WvStream::continue_select(time_t msec_timeout)
01089 {
01090 assert(uses_continue_select);
01091
01092
01093
01094 assert(call_ctx);
01095
01096 if (msec_timeout >= 0)
01097 alarm(msec_timeout);
01098
01099 alarm(msec_timeout);
01100 WvCont::yield();
01101 alarm(-1);
01102
01103
01104
01105
01106
01107
01108
01109 TRACE("hello-%p\n", this);
01110 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
01111 }
01112
01113
01114 void WvStream::terminate_continue_select()
01115 {
01116 close();
01117 call_ctx = 0;
01118 }
01119
01120
01121 const WvAddr *WvStream::src() const
01122 {
01123 return NULL;
01124 }
01125
01126
01127 void WvStream::setcallback(IWvStreamCallback _callfunc)
01128 {
01129 callfunc = _callfunc;
01130 call_ctx = 0;
01131 }
01132
01133
01134 void WvStream::legacy_callback()
01135 {
01136 execute();
01137 if (!!callfunc)
01138 callfunc();
01139 }
01140
01141
01142 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
01143 {
01144 IWvStreamCallback tmp = readcb;
01145
01146 readcb = _callback;
01147
01148 return tmp;
01149 }
01150
01151
01152 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
01153 {
01154 IWvStreamCallback tmp = writecb;
01155
01156 writecb = _callback;
01157
01158 return tmp;
01159 }
01160
01161
01162 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
01163 {
01164 IWvStreamCallback tmp = exceptcb;
01165
01166 exceptcb = _callback;
01167
01168 return tmp;
01169 }
01170
01171
01172 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
01173 {
01174 IWvStreamCallback tmp = closecb;
01175 if (isok())
01176 closecb = _callback;
01177 else
01178 {
01179
01180 closecb = 0;
01181 if (!!_callback)
01182 _callback();
01183 }
01184 return tmp;
01185 }
01186
01187
01188 void WvStream::unread(WvBuf &unreadbuf, size_t count)
01189 {
01190 WvDynBuf tmp;
01191 tmp.merge(unreadbuf, count);
01192 tmp.merge(inbuf);
01193 inbuf.zap();
01194 inbuf.merge(tmp);
01195 }
01196
01197
01198 IWvStream *WvStream::find_by_wsid(WSID wsid)
01199 {
01200 IWvStream *retval = NULL;
01201
01202 if (wsid_map)
01203 {
01204 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
01205
01206 if (it != wsid_map->end())
01207 retval = it->second;
01208 }
01209
01210 return retval;
01211 }