Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
40 41 -class Task(Wrapper):
42 43 @staticmethod
44 - def wrap(impl):
45 if impl is None: 46 return None 47 else: 48 return Task(impl)
49
50 - def __init__(self, impl):
51 Wrapper.__init__(self, impl, pn_task_attachments)
52
53 - def _init(self):
54 pass
55
56 - def cancel(self):
57 pn_task_cancel(self._impl)
58
59 -class Acceptor(Wrapper):
60
61 - def __init__(self, impl):
62 Wrapper.__init__(self, impl)
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
67 - def close(self):
68 pn_acceptor_close(self._impl)
69
70 -class Reactor(Wrapper):
71 72 @staticmethod
73 - def wrap(impl):
74 if impl is None: 75 return None 76 else: 77 record = pn_reactor_attachments(impl) 78 attrs = pn_void2py(pn_record_get(record, PYCTX)) 79 if attrs and 'subclass' in attrs: 80 return attrs['subclass'](impl=impl) 81 else: 82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
85 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 86 for h in handlers: 87 self.handler.add(h, on_error=self.on_error_delegate())
88
89 - def _init(self):
90 self.errors = []
91 92 # on_error relay handler tied to underlying C reactor. Use when the 93 # error will always be generated from a callback from this reactor. 94 # Needed to prevent reference cycles and be compatible with wrappers.
95 - class ErrorDelegate(object):
96 - def __init__(self, reactor):
97 self.reactor_impl = reactor._impl
98 - def on_error(self, info):
99 ractor = Reactor.wrap(self.reactor_impl) 100 ractor.on_error(info)
101
102 - def on_error_delegate(self):
103 return Reactor.ErrorDelegate(self).on_error
104
105 - def on_error(self, info):
106 self.errors.append(info) 107 self.yield_()
108
109 - def _get_global(self):
110 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
111
112 - def _set_global(self, handler):
113 impl = _chandler(handler, self.on_error_delegate()) 114 pn_reactor_set_global_handler(self._impl, impl) 115 pn_decref(impl)
116 117 global_handler = property(_get_global, _set_global) 118
119 - def _get_timeout(self):
120 return millis2timeout(pn_reactor_get_timeout(self._impl))
121
122 - def _set_timeout(self, secs):
123 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
124 125 timeout = property(_get_timeout, _set_timeout) 126
127 - def yield_(self):
128 pn_reactor_yield(self._impl)
129
130 - def mark(self):
131 return pn_reactor_mark(self._impl)
132
133 - def _get_handler(self):
134 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
135
136 - def _set_handler(self, handler):
137 impl = _chandler(handler, self.on_error_delegate()) 138 pn_reactor_set_handler(self._impl, impl) 139 pn_decref(impl)
140 141 handler = property(_get_handler, _set_handler) 142
143 - def run(self):
144 self.timeout = 3.14159265359 145 self.start() 146 while self.process(): pass 147 self.stop() 148 self.process() 149 self.global_handler = None 150 self.handler = None
151
152 - def wakeup(self):
153 n = pn_reactor_wakeup(self._impl) 154 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
155
156 - def start(self):
157 pn_reactor_start(self._impl)
158 159 @property
160 - def quiesced(self):
161 return pn_reactor_quiesced(self._impl)
162
163 - def _check_errors(self):
164 if self.errors: 165 for exc, value, tb in self.errors[:-1]: 166 traceback.print_exception(exc, value, tb) 167 exc, value, tb = self.errors[-1] 168 _compat.raise_(exc, value, tb)
169
170 - def process(self):
171 result = pn_reactor_process(self._impl) 172 self._check_errors() 173 return result
174
175 - def stop(self):
176 pn_reactor_stop(self._impl) 177 self._check_errors()
178
179 - def schedule(self, delay, task):
180 impl = _chandler(task, self.on_error_delegate()) 181 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 182 pn_decref(impl) 183 return task
184
185 - def acceptor(self, host, port, handler=None):
186 impl = _chandler(handler, self.on_error_delegate()) 187 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 188 pn_decref(impl) 189 if aimpl: 190 return Acceptor(aimpl) 191 else: 192 raise IOError("%s (%s:%s)" % pn_error_text(pn_reactor_error(self._impl)), host, port)
193
194 - def connection(self, handler=None):
195 """Deprecated: use connection_to_host() instead 196 """ 197 impl = _chandler(handler, self.on_error_delegate()) 198 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 199 if impl: pn_decref(impl) 200 return result
201
202 - def connection_to_host(self, host, port, handler=None):
203 """Create an outgoing Connection that will be managed by the reactor. 204 The reator's pn_iohandler will create a socket connection to the host 205 once the connection is opened. 206 """ 207 conn = self.connection(handler) 208 self.set_connection_host(conn, host, port) 209 return conn
210
211 - def set_connection_host(self, connection, host, port):
212 """Change the address used by the connection. The address is 213 used by the reactor's iohandler to create an outgoing socket 214 connection. This must be set prior to opening the connection. 215 """ 216 pn_reactor_set_connection_host(self._impl, 217 connection._impl, 218 unicode2utf8(str(host)), 219 unicode2utf8(str(port)))
220
221 - def get_connection_address(self, connection):
222 """This may be used to retrieve the remote peer address. 223 @return: string containing the address in URL format or None if no 224 address is available. Use the proton.Url class to create a Url object 225 from the returned value. 226 """ 227 _url = pn_reactor_get_connection_address(self._impl, connection._impl) 228 return utf82unicode(_url)
229
230 - def selectable(self, handler=None):
231 impl = _chandler(handler, self.on_error_delegate()) 232 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 233 if impl: 234 record = pn_selectable_attachments(result._impl) 235 pn_record_set_handler(record, impl) 236 pn_decref(impl) 237 return result
238
239 - def update(self, sel):
240 pn_reactor_update(self._impl, sel._impl)
241
242 - def push_event(self, obj, etype):
243 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
244 245 from proton import wrappers as _wrappers 246 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 247 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
248 249 250 -class EventInjector(object):
251 """ 252 Can be added to a reactor to allow events to be triggered by an 253 external thread but handled on the event thread associated with 254 the reactor. An instance of this class can be passed to the 255 Reactor.selectable() method of the reactor in order to activate 256 it. The close() method should be called when it is no longer 257 needed, to allow the event loop to end if needed. 258 """
259 - def __init__(self):
260 self.queue = Queue.Queue() 261 self.pipe = os.pipe() 262 self._closed = False
263
264 - def trigger(self, event):
265 """ 266 Request that the given event be dispatched on the event thread 267 of the reactor to which this EventInjector was added. 268 """ 269 self.queue.put(event) 270 os.write(self.pipe[1], _compat.str2bin("!"))
271
272 - def close(self):
273 """ 274 Request that this EventInjector be closed. Existing events 275 will be dispctahed on the reactors event dispactch thread, 276 then this will be removed from the set of interest. 277 """ 278 self._closed = True 279 os.write(self.pipe[1], _compat.str2bin("!"))
280
281 - def fileno(self):
282 return self.pipe[0]
283
284 - def on_selectable_init(self, event):
285 sel = event.context 286 sel.fileno(self.fileno()) 287 sel.reading = True 288 event.reactor.update(sel)
289
290 - def on_selectable_readable(self, event):
291 os.read(self.pipe[0], 512) 292 while not self.queue.empty(): 293 requested = self.queue.get() 294 event.reactor.push_event(requested.context, requested.type) 295 if self._closed: 296 s = event.context 297 s.terminate() 298 event.reactor.update(s)
299
300 301 -class ApplicationEvent(EventBase):
302 """ 303 Application defined event, which can optionally be associated with 304 an engine object and or an arbitrary subject 305 """
306 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
307 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 308 self.connection = connection 309 self.session = session 310 self.link = link 311 self.delivery = delivery 312 if self.delivery: 313 self.link = self.delivery.link 314 if self.link: 315 self.session = self.link.session 316 if self.session: 317 self.connection = self.session.connection 318 self.subject = subject
319
320 - def __repr__(self):
321 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 322 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
323
324 -class Transaction(object):
325 """ 326 Class to track state of an AMQP 1.0 transaction. 327 """
328 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
329 self.txn_ctrl = txn_ctrl 330 self.handler = handler 331 self.id = None 332 self._declare = None 333 self._discharge = None 334 self.failed = False 335 self._pending = [] 336 self.settle_before_discharge = settle_before_discharge 337 self.declare()
338
339 - def commit(self):
340 self.discharge(False)
341
342 - def abort(self):
343 self.discharge(True)
344
345 - def declare(self):
346 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
347
348 - def discharge(self, failed):
349 self.failed = failed 350 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
351
352 - def _send_ctrl(self, descriptor, value):
353 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 354 delivery.transaction = self 355 return delivery
356
357 - def send(self, sender, msg, tag=None):
358 dlv = sender.send(msg, tag=tag) 359 dlv.local.data = [self.id] 360 dlv.update(0x34) 361 return dlv
362
363 - def accept(self, delivery):
364 self.update(delivery, PN_ACCEPTED) 365 if self.settle_before_discharge: 366 delivery.settle() 367 else: 368 self._pending.append(delivery)
369
370 - def update(self, delivery, state=None):
371 if state: 372 delivery.local.data = [self.id, Described(ulong(state), [])] 373 delivery.update(0x34)
374
375 - def _release_pending(self):
376 for d in self._pending: 377 d.update(Delivery.RELEASED) 378 d.settle() 379 self._clear_pending()
380
381 - def _clear_pending(self):
382 self._pending = []
383
384 - def handle_outcome(self, event):
385 if event.delivery == self._declare: 386 if event.delivery.remote.data: 387 self.id = event.delivery.remote.data[0] 388 self.handler.on_transaction_declared(event) 389 elif event.delivery.remote_state == Delivery.REJECTED: 390 self.handler.on_transaction_declare_failed(event) 391 else: 392 logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 393 self.handler.on_transaction_declare_failed(event) 394 elif event.delivery == self._discharge: 395 if event.delivery.remote_state == Delivery.REJECTED: 396 if not self.failed: 397 self.handler.on_transaction_commit_failed(event) 398 self._release_pending() # make this optional? 399 else: 400 if self.failed: 401 self.handler.on_transaction_aborted(event) 402 self._release_pending() 403 else: 404 self.handler.on_transaction_committed(event) 405 self._clear_pending()
406
407 -class LinkOption(object):
408 """ 409 Abstract interface for link configuration options 410 """
411 - def apply(self, link):
412 """ 413 Subclasses will implement any configuration logic in this 414 method 415 """ 416 pass
417 - def test(self, link):
418 """ 419 Subclasses can override this to selectively apply an option 420 e.g. based on some link criteria 421 """ 422 return True
423
424 -class AtMostOnce(LinkOption):
425 - def apply(self, link):
427
428 -class AtLeastOnce(LinkOption):
429 - def apply(self, link):
432
433 -class SenderOption(LinkOption):
434 - def apply(self, sender): pass
435 - def test(self, link): return link.is_sender
436
437 -class ReceiverOption(LinkOption):
438 - def apply(self, receiver): pass
439 - def test(self, link): return link.is_receiver
440
441 -class DynamicNodeProperties(LinkOption):
442 - def __init__(self, props={}):
443 self.properties = {} 444 for k in props: 445 if isinstance(k, symbol): 446 self.properties[k] = props[k] 447 else: 448 self.properties[symbol(k)] = props[k]
449
450 - def apply(self, link):
455
456 -class Filter(ReceiverOption):
457 - def __init__(self, filter_set={}):
458 self.filter_set = filter_set
459
460 - def apply(self, receiver):
461 receiver.source.filter.put_dict(self.filter_set)
462
463 -class Selector(Filter):
464 """ 465 Configures a link with a message selector filter 466 """
467 - def __init__(self, value, name='selector'):
468 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
469
470 -class DurableSubscription(ReceiverOption):
471 - def apply(self, receiver):
474
475 -class Move(ReceiverOption):
476 - def apply(self, receiver):
478
479 -class Copy(ReceiverOption):
480 - def apply(self, receiver):
482 490
491 -def _create_session(connection, handler=None):
492 session = connection.session() 493 session.open() 494 return session
495
496 497 -def _get_attr(target, name):
498 if hasattr(target, name): 499 return getattr(target, name) 500 else: 501 return None
502
503 -class SessionPerConnection(object):
504 - def __init__(self):
505 self._default_session = None
506
507 - def session(self, connection):
508 if not self._default_session: 509 self._default_session = _create_session(connection) 510 return self._default_session
511
512 -class GlobalOverrides(object):
513 """ 514 Internal handler that triggers the necessary socket connect for an 515 opened connection. 516 """
517 - def __init__(self, base):
518 self.base = base
519
520 - def on_unhandled(self, name, event):
521 if not self._override(event): 522 event.dispatch(self.base)
523
524 - def _override(self, event):
525 conn = event.connection 526 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
527
528 -class Connector(Handler):
529 """ 530 Internal handler that triggers the necessary socket connect for an 531 opened connection. 532 """
533 - def __init__(self, connection):
534 self.connection = connection 535 self.address = None 536 self.heartbeat = None 537 self.reconnect = None 538 self.ssl_domain = None 539 self.allow_insecure_mechs = True 540 self.allowed_mechs = None 541 self.sasl_enabled = True 542 self.user = None 543 self.password = None 544 self.virtual_host = None 545 self.ssl_sni = None
546
547 - def _connect(self, connection, reactor):
548 assert(reactor is not None) 549 url = self.address.next() 550 reactor.set_connection_host(connection, url.host, str(url.port)) 551 # if virtual-host not set, use host from address as default 552 if self.virtual_host is None: 553 connection.hostname = url.host 554 logging.debug("connecting to %s..." % url) 555 556 transport = Transport() 557 if self.sasl_enabled: 558 sasl = transport.sasl() 559 sasl.allow_insecure_mechs = self.allow_insecure_mechs 560 if url.username: 561 connection.user = url.username 562 elif self.user: 563 connection.user = self.user 564 if url.password: 565 connection.password = url.password 566 elif self.password: 567 connection.password = self.password 568 if self.allowed_mechs: 569 sasl.allowed_mechs(self.allowed_mechs) 570 transport.bind(connection) 571 if self.heartbeat: 572 transport.idle_timeout = self.heartbeat 573 if url.scheme == 'amqps': 574 if not self.ssl_domain: 575 raise SSLUnavailable("amqps: SSL libraries not found") 576 self.ssl = SSL(transport, self.ssl_domain) 577 self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
578
579 - def on_connection_local_open(self, event):
580 self._connect(event.connection, event.reactor)
581
582 - def on_connection_remote_open(self, event):
583 logging.debug("connected to %s" % event.connection.hostname) 584 if self.reconnect: 585 self.reconnect.reset() 586 self.transport = None
587
588 - def on_transport_tail_closed(self, event):
589 self.on_transport_closed(event)
590
591 - def on_transport_closed(self, event):
592 if self.connection is None: return 593 if self.connection.state & Endpoint.LOCAL_ACTIVE: 594 if self.reconnect: 595 event.transport.unbind() 596 delay = self.reconnect.next() 597 if delay == 0: 598 logging.info("Disconnected, reconnecting...") 599 self._connect(self.connection, event.reactor) 600 return 601 else: 602 logging.info("Disconnected will try to reconnect after %s seconds" % delay) 603 event.reactor.schedule(delay, self) 604 return 605 else: 606 logging.debug("Disconnected") 607 # See connector.cpp: conn.free()/pn_connection_release() here? 608 self.connection = None
609
610 - def on_timer_task(self, event):
611 self._connect(self.connection, event.reactor)
612
613 -class Backoff(object):
614 """ 615 A reconnect strategy involving an increasing delay between 616 retries, up to a maximum or 10 seconds. 617 """
618 - def __init__(self):
619 self.delay = 0
620
621 - def reset(self):
622 self.delay = 0
623
624 - def next(self):
625 current = self.delay 626 if current == 0: 627 self.delay = 0.1 628 else: 629 self.delay = min(10, 2*current) 630 return current
631
632 -class Urls(object):
633 - def __init__(self, values):
634 self.values = [Url(v) for v in values] 635 self.i = iter(self.values)
636
637 - def __iter__(self):
638 return self
639
640 - def next(self):
641 try: 642 return next(self.i) 643 except StopIteration: 644 self.i = iter(self.values) 645 return next(self.i)
646
647 -class SSLConfig(object):
648 - def __init__(self):
649 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 650 self.server = SSLDomain(SSLDomain.MODE_SERVER)
651
652 - def set_credentials(self, cert_file, key_file, password):
653 self.client.set_credentials(cert_file, key_file, password) 654 self.server.set_credentials(cert_file, key_file, password)
655
656 - def set_trusted_ca_db(self, certificate_db):
657 self.client.set_trusted_ca_db(certificate_db) 658 self.server.set_trusted_ca_db(certificate_db)
659
660 661 -class Container(Reactor):
662 """A representation of the AMQP concept of a 'container', which 663 lossely speaking is something that establishes links to or from 664 another container, over which messages are transfered. This is 665 an extension to the Reactor class that adds convenience methods 666 for creating connections and sender- or receiver- links. 667 """
668 - def __init__(self, *handlers, **kwargs):
669 super(Container, self).__init__(*handlers, **kwargs) 670 if "impl" not in kwargs: 671 try: 672 self.ssl = SSLConfig() 673 except SSLUnavailable: 674 self.ssl = None 675 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 676 self.trigger = None 677 self.container_id = str(generate_uuid()) 678 self.allow_insecure_mechs = True 679 self.allowed_mechs = None 680 self.sasl_enabled = True 681 self.user = None 682 self.password = None 683 Wrapper.__setattr__(self, 'subclass', self.__class__)
684
685 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
686 """ 687 Initiates the establishment of an AMQP connection. Returns an 688 instance of proton.Connection. 689 690 @param url: URL string of process to connect to 691 692 @param urls: list of URL strings of process to try to connect to 693 694 Only one of url or urls should be specified. 695 696 @param reconnect: A value of False will prevent the library 697 form automatically trying to reconnect if the underlying 698 socket is disconnected before the connection has been closed. 699 700 @param heartbeat: A value in milliseconds indicating the 701 desired frequency of heartbeats used to test the underlying 702 socket is alive. 703 704 @param ssl_domain: SSL configuration in the form of an 705 instance of proton.SSLdomain. 706 707 @param handler: a connection scoped handler that will be 708 called to process any events in the scope of this connection 709 or its child links 710 711 @param kwargs: sasl_enabled, which determines whether a sasl layer is 712 used for the connection; allowed_mechs an optional list of SASL 713 mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag 714 indicating whether insecure mechanisms, such as PLAIN over a 715 non-encrypted socket, are allowed; 'virtual_host' the hostname to set 716 in the Open performative used by peer to determine the correct 717 back-end service for the client. If 'virtual_host' is not supplied the 718 host field from the URL is used instead." 719 720 """ 721 conn = self.connection(handler) 722 conn.container = self.container_id or str(generate_uuid()) 723 conn.offered_capabilities = kwargs.get('offered_capabilities') 724 conn.desired_capabilities = kwargs.get('desired_capabilities') 725 conn.properties = kwargs.get('properties') 726 727 connector = Connector(conn) 728 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) 729 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) 730 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) 731 connector.user = kwargs.get('user', self.user) 732 connector.password = kwargs.get('password', self.password) 733 connector.virtual_host = kwargs.get('virtual_host') 734 if connector.virtual_host: 735 # only set hostname if virtual-host is a non-empty string 736 conn.hostname = connector.virtual_host 737 connector.ssl_sni = kwargs.get('sni') 738 739 conn._overrides = connector 740 if url: connector.address = Urls([url]) 741 elif urls: connector.address = Urls(urls) 742 elif address: connector.address = address 743 else: raise ValueError("One of url, urls or address required") 744 if heartbeat: 745 connector.heartbeat = heartbeat 746 if reconnect: 747 connector.reconnect = reconnect 748 elif reconnect is None: 749 connector.reconnect = Backoff() 750 # use container's default client domain if none specified. This is 751 # only necessary of the URL specifies the "amqps:" scheme 752 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 753 conn._session_policy = SessionPerConnection() #todo: make configurable 754 conn.open() 755 return conn
756
757 - def _get_id(self, container, remote, local):
758 if local and remote: "%s-%s-%s" % (container, remote, local) 759 elif local: return "%s-%s" % (container, local) 760 elif remote: return "%s-%s" % (container, remote) 761 else: return "%s-%s" % (container, str(generate_uuid()))
762
763 - def _get_session(self, context):
764 if isinstance(context, Url): 765 return self._get_session(self.connect(url=context)) 766 elif isinstance(context, Session): 767 return context 768 elif isinstance(context, Connection): 769 if hasattr(context, '_session_policy'): 770 return context._session_policy.session(context) 771 else: 772 return _create_session(context) 773 else: 774 return context.session()
775
776 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
777 """ 778 Initiates the establishment of a link over which messages can 779 be sent. Returns an instance of proton.Sender. 780 781 There are two patterns of use. (1) A connection can be passed 782 as the first argument, in which case the link is established 783 on that connection. In this case the target address can be 784 specified as the second argument (or as a keyword 785 argument). The source address can also be specified if 786 desired. (2) Alternatively a URL can be passed as the first 787 argument. In this case a new connection will be establised on 788 which the link will be attached. If a path is specified and 789 the target is not, then the path of the URL is used as the 790 target address. 791 792 The name of the link may be specified if desired, otherwise a 793 unique name will be generated. 794 795 Various LinkOptions can be specified to further control the 796 attachment. 797 """ 798 if isinstance(context, _compat.STRING_TYPES): 799 context = Url(context) 800 if isinstance(context, Url) and not target: 801 target = context.path 802 session = self._get_session(context) 803 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 804 if source: 805 snd.source.address = source 806 if target: 807 snd.target.address = target 808 if handler != None: 809 snd.handler = handler 810 if tags: 811 snd.tag_generator = tags 812 _apply_link_options(options, snd) 813 snd.open() 814 return snd
815
816 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
817 """ 818 Initiates the establishment of a link over which messages can 819 be received (aka a subscription). Returns an instance of 820 proton.Receiver. 821 822 There are two patterns of use. (1) A connection can be passed 823 as the first argument, in which case the link is established 824 on that connection. In this case the source address can be 825 specified as the second argument (or as a keyword 826 argument). The target address can also be specified if 827 desired. (2) Alternatively a URL can be passed as the first 828 argument. In this case a new connection will be establised on 829 which the link will be attached. If a path is specified and 830 the source is not, then the path of the URL is used as the 831 target address. 832 833 The name of the link may be specified if desired, otherwise a 834 unique name will be generated. 835 836 Various LinkOptions can be specified to further control the 837 attachment. 838 """ 839 if isinstance(context, _compat.STRING_TYPES): 840 context = Url(context) 841 if isinstance(context, Url) and not source: 842 source = context.path 843 session = self._get_session(context) 844 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 845 if source: 846 rcv.source.address = source 847 if dynamic: 848 rcv.source.dynamic = True 849 if target: 850 rcv.target.address = target 851 if handler != None: 852 rcv.handler = handler 853 _apply_link_options(options, rcv) 854 rcv.open() 855 return rcv
856
857 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
858 if not _get_attr(context, '_txn_ctrl'): 859 class InternalTransactionHandler(OutgoingMessageHandler): 860 def __init__(self): 861 super(InternalTransactionHandler, self).__init__(auto_settle=True)
862 863 def on_settled(self, event): 864 if hasattr(event.delivery, "transaction"): 865 event.transaction = event.delivery.transaction 866 event.delivery.transaction.handle_outcome(event)
867 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 868 context._txn_ctrl.target.type = Terminus.COORDINATOR 869 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 870 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 871
872 - def listen(self, url, ssl_domain=None):
873 """ 874 Initiates a server socket, accepting incoming AMQP connections 875 on the interface and port specified. 876 """ 877 url = Url(url) 878 acceptor = self.acceptor(url.host, url.port) 879 ssl_config = ssl_domain 880 if not ssl_config and url.scheme == 'amqps': 881 # use container's default server domain 882 if self.ssl: 883 ssl_config = self.ssl.server 884 else: 885 raise SSLUnavailable("amqps: SSL libraries not found") 886 if ssl_config: 887 acceptor.set_ssl_domain(ssl_config) 888 return acceptor
889
890 - def do_work(self, timeout=None):
891 if timeout: 892 self.timeout = timeout 893 return self.process()
894