Package proton :: Module utils
[frames] | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 25  from cproton import pn_reactor_collector, pn_collector_release 
 57   
58 -class SendException(ProtonException):
59 """ 60 Exception used to indicate an exceptional state/condition on a send request 61 """
62 - def __init__(self, state):
63 self.state = state
64
65 -def _is_settled(delivery):
66 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
67
68 -class BlockingSender(BlockingLink):
69 - def __init__(self, connection, sender):
70 super(BlockingSender, self).__init__(connection, sender) 71 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 72 #this may be followed by a detach, which may contain an error condition, so wait a little... 73 self._waitForClose() 74 #...but close ourselves if peer does not 75 self.link.close() 76 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
77
78 - def send(self, msg, timeout=False, error_states=None):
79 delivery = self.link.send(msg) 80 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) 81 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 82 delivery.settle() 83 bad = error_states 84 if bad is None: 85 bad = [Delivery.REJECTED, Delivery.RELEASED] 86 if delivery.remote_state in bad: 87 raise SendException(delivery.remote_state) 88 return delivery
89
90 -class Fetcher(MessagingHandler):
91 - def __init__(self, connection, prefetch):
92 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 93 self.connection = connection 94 self.incoming = collections.deque([]) 95 self.unsettled = collections.deque([])
96
97 - def on_message(self, event):
98 self.incoming.append((event.message, event.delivery)) 99 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
100 106
107 - def on_connection_error(self, event):
108 if not self.connection.closing: 109 raise ConnectionClosed(event.connection)
110 111 @property
112 - def has_message(self):
113 return len(self.incoming)
114
115 - def pop(self):
116 message, delivery = self.incoming.popleft() 117 if not delivery.settled: 118 self.unsettled.append(delivery) 119 return message
120
121 - def settle(self, state=None):
122 delivery = self.unsettled.popleft() 123 if state: 124 delivery.update(state) 125 delivery.settle()
126
127 128 -class BlockingReceiver(BlockingLink):
129 - def __init__(self, connection, receiver, fetcher, credit=1):
130 super(BlockingReceiver, self).__init__(connection, receiver) 131 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 132 #this may be followed by a detach, which may contain an error condition, so wait a little... 133 self._waitForClose() 134 #...but close ourselves if peer does not 135 self.link.close() 136 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 137 if credit: receiver.flow(credit) 138 self.fetcher = fetcher 139 self.container = connection.container
140
141 - def __del__(self):
142 self.fetcher = None 143 # The next line causes a core dump if the Proton-C reactor finalizes 144 # first. The self.container reference prevents out of order reactor 145 # finalization. It may not be set if exception in BlockingLink.__init__ 146 if hasattr(self, "container"): 147 self.link.handler = None # implicit call to reactor
148
149 - def receive(self, timeout=False):
150 if not self.fetcher: 151 raise Exception("Can't call receive on this receiver as a handler was provided") 152 if not self.link.credit: 153 self.link.flow(1) 154 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 155 return self.fetcher.pop()
156
157 - def accept(self):
159
160 - def reject(self):
162
163 - def release(self, delivered=True):
164 if delivered: 165 self.settle(Delivery.MODIFIED) 166 else: 167 self.settle(Delivery.RELEASED)
168
169 - def settle(self, state=None):
170 if not self.fetcher: 171 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") 172 self.fetcher.settle(state)
173
174 175 -class LinkDetached(LinkException):
176 - def __init__(self, link):
177 self.link = link 178 if link.is_sender: 179 txt = "sender %s to %s closed" % (link.name, link.target.address) 180 else: 181 txt = "receiver %s from %s closed" % (link.name, link.source.address) 182 if link.remote_condition: 183 txt += " due to: %s" % link.remote_condition 184 self.condition = link.remote_condition.name 185 else: 186 txt += " by peer" 187 self.condition = None 188 super(LinkDetached, self).__init__(txt)
189
190 191 -class ConnectionClosed(ConnectionException):
192 - def __init__(self, connection):
193 self.connection = connection 194 txt = "Connection %s closed" % connection.hostname 195 if connection.remote_condition: 196 txt += " due to: %s" % connection.remote_condition 197 self.condition = connection.remote_condition.name 198 else: 199 txt += " by peer" 200 self.condition = None 201 super(ConnectionClosed, self).__init__(txt)
202
203 204 -class BlockingConnection(Handler):
205 """ 206 A synchronous style connection wrapper. 207 """
208 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
209 self.disconnected = False 210 self.timeout = timeout or 60 211 self.container = container or Container() 212 self.container.timeout = self.timeout 213 self.container.start() 214 self.url = Url(url).defaults() 215 self.conn = None 216 self.closing = False 217 failed = True 218 try: 219 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) 220 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 221 msg="Opening connection") 222 failed = False 223 finally: 224 if failed and self.conn: 225 self.close()
226
227 - def create_sender(self, address, handler=None, name=None, options=None):
228 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
229
230 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
231 prefetch = credit 232 if handler: 233 fetcher = None 234 if prefetch is None: 235 prefetch = 1 236 else: 237 fetcher = Fetcher(self, credit) 238 return BlockingReceiver( 239 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
240
241 - def close(self):
242 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 243 if self.closing: 244 return 245 self.closing = True 246 self.container.errors = [] 247 try: 248 if self.conn: 249 self.conn.close() 250 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 251 msg="Closing connection") 252 finally: 253 self.conn.free() 254 # For cleanup, reactor needs to process PN_CONNECTION_FINAL 255 # and all events with embedded contexts must be drained. 256 self.run() # will not block any more 257 self.conn = None 258 self.container.global_handler = None # break circular ref: container to cadapter.on_error 259 pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive 260 self.container = None
261
262 - def _is_closed(self):
263 return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
264
265 - def run(self):
266 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 267 while self.container.process(): pass 268 self.container.stop() 269 self.container.process()
270
271 - def wait(self, condition, timeout=False, msg=None):
272 """Call process until condition() is true""" 273 if timeout is False: 274 timeout = self.timeout 275 if timeout is None: 276 while not condition() and not self.disconnected: 277 self.container.process() 278 else: 279 container_timeout = self.container.timeout 280 self.container.timeout = timeout 281 try: 282 deadline = time.time() + timeout 283 while not condition() and not self.disconnected: 284 self.container.process() 285 if deadline < time.time(): 286 txt = "Connection %s timed out" % self.url 287 if msg: txt += ": " + msg 288 raise Timeout(txt) 289 finally: 290 self.container.timeout = container_timeout 291 if self.disconnected or self._is_closed(): 292 self.container.stop() 293 self.conn.handler = None # break cyclical reference 294 if self.disconnected and not self._is_closed(): 295 raise ConnectionException( 296 "Connection %s disconnected: %s" % (self.url, self.disconnected))
297 303
304 - def on_connection_remote_close(self, event):
305 if event.connection.state & Endpoint.LOCAL_ACTIVE: 306 event.connection.close() 307 if not self.closing: 308 raise ConnectionClosed(event.connection)
309
310 - def on_transport_tail_closed(self, event):
311 self.on_transport_closed(event)
312
313 - def on_transport_head_closed(self, event):
314 self.on_transport_closed(event)
315
316 - def on_transport_closed(self, event):
317 self.disconnected = event.transport.condition or "unknown"
318
319 -class AtomicCount(object):
320 - def __init__(self, start=0, step=1):
321 """Thread-safe atomic counter. Start at start, increment by step.""" 322 self.count, self.step = start, step 323 self.lock = threading.Lock()
324
325 - def next(self):
326 """Get the next value""" 327 self.lock.acquire() 328 self.count += self.step; 329 result = self.count 330 self.lock.release() 331 return result
332
333 -class SyncRequestResponse(IncomingMessageHandler):
334 """ 335 Implementation of the synchronous request-responce (aka RPC) pattern. 336 @ivar address: Address for all requests, may be None. 337 @ivar connection: Connection for requests and responses. 338 """ 339 340 correlation_id = AtomicCount() 341
342 - def __init__(self, connection, address=None):
343 """ 344 Send requests and receive responses. A single instance can send many requests 345 to the same or different addresses. 346 347 @param connection: A L{BlockingConnection} 348 @param address: Address for all requests. 349 If not specified, each request must have the address property set. 350 Sucessive messages may have different addresses. 351 """ 352 super(SyncRequestResponse, self).__init__() 353 self.connection = connection 354 self.address = address 355 self.sender = self.connection.create_sender(self.address) 356 # dynamic=true generates a unique address dynamically for this receiver. 357 # credit=1 because we want to receive 1 response message initially. 358 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 359 self.response = None
360
361 - def call(self, request):
362 """ 363 Send a request message, wait for and return the response message. 364 365 @param request: A L{proton.Message}. If L{self.address} is not set the 366 L{self.address} must be set and will be used. 367 """ 368 if not self.address and not request.address: 369 raise ValueError("Request message has no address: %s" % request) 370 request.reply_to = self.reply_to 371 request.correlation_id = correlation_id = self.correlation_id.next() 372 self.sender.send(request) 373 def wakeup(): 374 return self.response and (self.response.correlation_id == correlation_id)
375 self.connection.wait(wakeup, msg="Waiting for response") 376 response = self.response 377 self.response = None # Ready for next response. 378 self.receiver.flow(1) # Set up credit for the next response. 379 return response
380 381 @property
382 - def reply_to(self):
383 """Return the dynamic address of our receiver.""" 384 return self.receiver.remote_source.address
385
386 - def on_message(self, event):
387 """Called when we receive a message for our receiver.""" 388 self.response = event.message 389 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
390