Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, socket, time, threading 20 21 from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 22 from proton import ProtonException, Timeout, Url 23 from proton.reactor import Container 24 from proton.handlers import MessagingHandler, IncomingMessageHandler 25 from cproton import pn_reactor_collector, pn_collector_release57 64 6730 self.connection = connection 31 self.link = link 32 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 33 msg="Opening link %s" % link.name) 34 self._checkClosed()3537 try: 38 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 39 timeout=timeout, 40 msg="Opening link %s" % self.link.name) 41 except Timeout as e: pass 42 self._checkClosed()4345 if self.link.state & Endpoint.REMOTE_CLOSED: 46 self.link.close() 47 if not self.connection.closing: 48 raise LinkDetached(self.link)4951 self.link.close() 52 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 53 msg="Closing link %s" % self.link.name)54 55 # Access to other link attributes.8970 super(BlockingSender, self).__init__(connection, sender) 71 if 
self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 72 #this may be followed by a detach, which may contain an error condition, so wait a little... 73 self._waitForClose() 74 #...but close ourselves if peer does not 75 self.link.close() 76 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7779 delivery = self.link.send(msg) 80 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) 81 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 82 delivery.settle() 83 bad = error_states 84 if bad is None: 85 bad = [Delivery.REJECTED, Delivery.RELEASED] 86 if delivery.remote_state in bad: 87 raise SendException(delivery.remote_state) 88 return delivery12692 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 93 self.connection = connection 94 self.incoming = collections.deque([]) 95 self.unsettled = collections.deque([])9698 self.incoming.append((event.message, event.delivery)) 99 self.connection.container.yield_() # Wake up the wait() loop to handle the message.100102 if event.link.state & Endpoint.LOCAL_ACTIVE: 103 event.link.close() 104 if not self.connection.closing: 105 raise LinkDetached(event.link)106 110 111 @property113 return len(self.incoming)114116 message, delivery = self.incoming.popleft() 117 if not delivery.settled: 118 self.unsettled.append(delivery) 119 return message120173130 super(BlockingReceiver, self).__init__(connection, receiver) 131 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 132 #this may be followed by a detach, which may contain an error condition, so wait a little... 
133 self._waitForClose() 134 #...but close ourselves if peer does not 135 self.link.close() 136 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 137 if credit: receiver.flow(credit) 138 self.fetcher = fetcher 139 self.container = connection.container140142 self.fetcher = None 143 # The next line causes a core dump if the Proton-C reactor finalizes 144 # first. The self.container reference prevents out of order reactor 145 # finalization. It may not be set if exception in BlockingLink.__init__ 146 if hasattr(self, "container"): 147 self.link.handler = None # implicit call to reactor148150 if not self.fetcher: 151 raise Exception("Can't call receive on this receiver as a handler was provided") 152 if not self.link.credit: 153 self.link.flow(1) 154 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 155 return self.fetcher.pop()156 159 162 168189177 self.link = link 178 if link.is_sender: 179 txt = "sender %s to %s closed" % (link.name, link.target.address) 180 else: 181 txt = "receiver %s from %s closed" % (link.name, link.source.address) 182 if link.remote_condition: 183 txt += " due to: %s" % link.remote_condition 184 self.condition = link.remote_condition.name 185 else: 186 txt += " by peer" 187 self.condition = None 188 super(LinkDetached, self).__init__(txt)202193 self.connection = connection 194 txt = "Connection %s closed" % connection.hostname 195 if connection.remote_condition: 196 txt += " due to: %s" % connection.remote_condition 197 self.condition = connection.remote_condition.name 198 else: 199 txt += " by peer" 200 self.condition = None 201 super(ConnectionClosed, self).__init__(txt)205 """ 206 A synchronous style connection wrapper. 
207 """318208 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):209 self.disconnected = False 210 self.timeout = timeout or 60 211 self.container = container or Container() 212 self.container.timeout = self.timeout 213 self.container.start() 214 self.url = Url(url).defaults() 215 self.conn = None 216 self.closing = False 217 failed = True 218 try: 219 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) 220 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 221 msg="Opening connection") 222 failed = False 223 finally: 224 if failed and self.conn: 225 self.close()226228 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))229230 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):231 prefetch = credit 232 if handler: 233 fetcher = None 234 if prefetch is None: 235 prefetch = 1 236 else: 237 fetcher = Fetcher(self, credit) 238 return BlockingReceiver( 239 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)240242 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 243 if self.closing: 244 return 245 self.closing = True 246 self.container.errors = [] 247 try: 248 if self.conn: 249 self.conn.close() 250 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 251 msg="Closing connection") 252 finally: 253 self.conn.free() 254 # For cleanup, reactor needs to process PN_CONNECTION_FINAL 255 # and all events with embedded contexts must be drained. 
256 self.run() # will not block any more 257 self.conn = None 258 self.container.global_handler = None # break circular ref: container to cadapter.on_error 259 pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive 260 self.container = None261 264266 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 267 while self.container.process(): pass 268 self.container.stop() 269 self.container.process()270272 """Call process until condition() is true""" 273 if timeout is False: 274 timeout = self.timeout 275 if timeout is None: 276 while not condition() and not self.disconnected: 277 self.container.process() 278 else: 279 container_timeout = self.container.timeout 280 self.container.timeout = timeout 281 try: 282 deadline = time.time() + timeout 283 while not condition() and not self.disconnected: 284 self.container.process() 285 if deadline < time.time(): 286 txt = "Connection %s timed out" % self.url 287 if msg: txt += ": " + msg 288 raise Timeout(txt) 289 finally: 290 self.container.timeout = container_timeout 291 if self.disconnected or self._is_closed(): 292 self.container.stop() 293 self.conn.handler = None # break cyclical reference 294 if self.disconnected and not self._is_closed(): 295 raise ConnectionException( 296 "Connection %s disconnected: %s" % (self.url, self.disconnected))297299 if event.link.state & Endpoint.LOCAL_ACTIVE: 300 event.link.close() 301 if not self.closing: 302 raise LinkDetached(event.link)303305 if event.connection.state & Endpoint.LOCAL_ACTIVE: 306 event.connection.close() 307 if not self.closing: 308 raise ConnectionClosed(event.connection)309311 self.on_transport_closed(event)312314 self.on_transport_closed(event)315332321 """Thread-safe atomic counter. 
Start at start, increment by step.""" 322 self.count, self.step = start, step 323 self.lock = threading.Lock()324326 """Get the next value""" 327 self.lock.acquire() 328 self.count += self.step; 329 result = self.count 330 self.lock.release() 331 return result334 """ 335 Implementation of the synchronous request-response (aka RPC) pattern. 336 @ivar address: Address for all requests, may be None. 337 @ivar connection: Connection for requests and responses. 338 """ 339 340 correlation_id = AtomicCount() 341380 381 @property343 """ 344 Send requests and receive responses. A single instance can send many requests 345 to the same or different addresses. 346 347 @param connection: A L{BlockingConnection} 348 @param address: Address for all requests. 349 If not specified, each request must have the address property set. 350 Successive messages may have different addresses. 351 """ 352 super(SyncRequestResponse, self).__init__() 353 self.connection = connection 354 self.address = address 355 self.sender = self.connection.create_sender(self.address) 356 # dynamic=true generates a unique address dynamically for this receiver. 357 # credit=1 because we want to receive 1 response message initially. 358 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 359 self.response = None360362 """ 363 Send a request message, wait for and return the response message. 364 365 @param request: A L{proton.Message}. If L{self.address} is not set the 366 request's address property must be set and will be used. 
367 """ 368 if not self.address and not request.address: 369 raise ValueError("Request message has no address: %s" % request) 370 request.reply_to = self.reply_to 371 request.correlation_id = correlation_id = self.correlation_id.next() 372 self.sender.send(request) 373 def wakeup(): 374 return self.response and (self.response.correlation_id == correlation_id)375 self.connection.wait(wakeup, msg="Waiting for response") 376 response = self.response 377 self.response = None # Ready for next response. 378 self.receiver.flow(1) # Set up credit for the next response. 379 return response383 """Return the dynamic address of our receiver.""" 384 return self.receiver.remote_source.address385387 """Called when we receive a message for our receiver.""" 388 self.response = event.message 389 self.connection.container.yield_() # Wake up the wait() loop to handle the message.390
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Mon Sep 6 09:01:32 2021 | http://epydoc.sourceforge.net |