00001
00002
00003
00004
00005
00006
00007
00008
00009 #include <ctype.h>
00010 #include <time.h>
00011 #include "wvhttppool.h"
00012 #include "wvbufstream.h"
00013 #include "wvtcp.h"
00014 #include "strutils.h"
00015
// Definition of the WvHttpStream static flag; it starts out enabled (true).
// NOTE(review): presumably consulted by WvHttpStream to decide whether to
// pipeline requests -- confirm in the WvHttpStream implementation.
00016 bool WvHttpStream::global_enable_pipelining = true;
// Definition of the WvUrlStream static request limit, initialised to 100.
// NOTE(review): looks like a cap on requests per connection -- confirm
// against the code that reads max_requests.
00017 int WvUrlStream::max_requests = 100;
00018
00019 unsigned WvHash(const WvUrlStream::Target &n)
00020 {
00021 WvString key("%s%s", n.remaddr, n.username);
00022 return (WvHash(key));
00023 }
00024
00025
00026 WvUrlRequest::WvUrlRequest(WvStringParm _url, WvStringParm _method,
00027 WvStringParm _headers, WvStream *content_source,
00028 bool _create_dirs, bool _pipeline_test)
00029 : url(_url), headers(_headers)
00030 {
00031 instream = NULL;
00032 create_dirs = _create_dirs;
00033 pipeline_test = _pipeline_test;
00034 method = _method;
00035 is_dir = false;
00036
00037 if (pipeline_test)
00038 {
00039 outstream = NULL;
00040 putstream = NULL;
00041 }
00042 else
00043 {
00044 WvBufUrlStream *x = new WvBufUrlStream;
00045 outstream = x;
00046 x->url = url;
00047
00048 putstream = content_source;
00049 }
00050 inuse = false;
00051 }
00052
00053
00054 WvUrlRequest::~WvUrlRequest()
00055 {
00056 done();
00057 }
00058
00059
00060 void WvUrlRequest::done()
00061 {
00062 if (outstream)
00063 {
00064 outstream->seteof();
00065 outstream = NULL;
00066 }
00067 if (putstream)
00068 putstream = NULL;
00069 inuse = false;
00070 }
00071
00072
00073 void WvUrlStream::addurl(WvUrlRequest *url)
00074 {
00075 log(WvLog::Debug4, "Adding a new url: '%s'\n", url->url);
00076
00077 assert(url->outstream);
00078
00079 if (!url->url.isok())
00080 return;
00081
00082 waiting_urls.append(url, false, "waiting_url");
00083 request_next();
00084 }
00085
00086
00087 void WvUrlStream::delurl(WvUrlRequest *url)
00088 {
00089 log(WvLog::Debug4, "Removing an url: '%s'\n", url->url);
00090
00091 if (url == curl)
00092 doneurl();
00093 waiting_urls.unlink(url);
00094 urls.unlink(url);
00095 }
00096
00097
00098 WvHttpPool::WvHttpPool()
00099 : log("HTTP Pool", WvLog::Debug), conns(10),
00100 pipeline_incompatible(50)
00101 {
00102 log("Pool initializing.\n");
00103 num_streams_created = 0;
00104 }
00105
00106
00107 WvHttpPool::~WvHttpPool()
00108 {
00109 log("Created %s individual session%s during this run.\n",
00110 num_streams_created, num_streams_created == 1 ? "" : "s");
00111 if (geterr())
00112 log("Error was: %s\n", errstr());
00113
00114
00115
00116 zap();
00117 conns.zap();
00118 }
00119
00120
00121 void WvHttpPool::pre_select(SelectInfo &si)
00122 {
00123
00124
00125
00126 WvIStreamList::pre_select(si);
00127
00128 WvUrlStreamDict::Iter ci(conns);
00129 for (ci.rewind(); ci.next(); )
00130 {
00131 if (!ci->isok())
00132 si.msec_timeout = 0;
00133 }
00134
00135 WvUrlRequestList::Iter i(urls);
00136 for (i.rewind(); i.next(); )
00137 {
00138 if (!i->instream)
00139 {
00140 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
00141 if (i->url.resolve())
00142 si.msec_timeout = 0;
00143 else
00144 dns.pre_select(i->url.gethost(), si);
00145 }
00146 }
00147 }
00148
00149
00150 bool WvHttpPool::post_select(SelectInfo &si)
00151 {
00152 bool sure = false;
00153
00154 WvUrlStreamDict::Iter ci(conns);
00155 for (ci.rewind(); ci.next(); )
00156 {
00157 if (!ci->isok())
00158 {
00159 log(WvLog::Debug4, "Selecting true because of a dead stream.\n");
00160 unconnect(ci.ptr());
00161 ci.rewind();
00162 sure = true;
00163 }
00164 }
00165
00166 WvUrlRequestList::Iter i(urls);
00167 for (i.rewind(); i.next(); )
00168 {
00169 if ((!i->outstream && !i->inuse) || !i->url.isok())
00170 {
00171
00172
00173 if (!i->url.isok())
00174 {
00175 log("URL not okay: '%s'\n", i->url);
00176 i->done();
00177 }
00178
00179 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
00180 WvUrlStream *s = conns[target];
00181 if (s)
00182 s->delurl(i.ptr());
00183 i.xunlink();
00184 continue;
00185 }
00186
00187 if (!i->instream)
00188 {
00189 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
00190 if (i->url.resolve() || dns.post_select(i->url.gethost(), si))
00191 {
00192 log(WvLog::Debug4, "Selecting true because of '%s'\n", i->url);
00193 sure = true;
00194 }
00195 }
00196 }
00197
00198 return WvIStreamList::post_select(si) || sure;
00199 }
00200
00201
00202 void WvHttpPool::execute()
00203 {
00204 WvIStreamList::execute();
00205
00206 WvUrlRequestList::Iter i(urls);
00207 for (i.rewind(); i.next(); )
00208 {
00209 WvUrlStream *s;
00210
00211 if (!i->outstream || !i->url.isok() || !i->url.resolve())
00212 continue;
00213
00214 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
00215
00216
00217
00218 s = conns[target];
00219
00220
00221 if (s && !s->isok())
00222 {
00223 unconnect(s);
00224 s = NULL;
00225 }
00226
00227 if (!i->outstream)
00228 continue;
00229
00230 if (!s)
00231 {
00232 num_streams_created++;
00233 if (!strncasecmp(i->url.getproto(), "http", 4))
00234 s = new WvHttpStream(target.remaddr, target.username,
00235 i->url.getproto() == "https",
00236 pipeline_incompatible);
00237 else if (!strcasecmp(i->url.getproto(), "ftp"))
00238 s = new WvFtpStream(target.remaddr, target.username,
00239 i->url.getpassword());
00240 conns.add(s, true);
00241
00242
00243 append(s, false, "http/ftp stream");
00244 }
00245
00246 if (!i->instream)
00247 {
00248 s->addurl(i.ptr());
00249 i->instream = s;
00250 }
00251 }
00252 }
00253
00254
00255 WvBufUrlStream *WvHttpPool::addurl(WvStringParm _url, WvStringParm _method,
00256 WvStringParm _headers, WvStream *content_source, bool create_dirs)
00257 {
00258 log(WvLog::Debug4, "Adding a new url to pool: '%s'\n", _url);
00259 WvUrlRequest *url = new WvUrlRequest(_url, _method, _headers, content_source,
00260 create_dirs, false);
00261 urls.append(url, true, "addurl");
00262
00263 return url->outstream;
00264 }
00265
00266
00267 void WvHttpPool::unconnect(WvUrlStream *s)
00268 {
00269 if (!s->target.username)
00270 log("Unconnecting stream to %s.\n", s->target.remaddr);
00271 else
00272 log("Unconnecting stream to %s@%s.\n", s->target.username,
00273 s->target.remaddr);
00274
00275 WvUrlRequestList::Iter i(urls);
00276 for (i.rewind(); i.next(); )
00277 {
00278 if (i->instream == s)
00279 i->instream = NULL;
00280 }
00281
00282 unlink(s);
00283 conns.remove(s);
00284 }