1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Associate an SSL domain with this acceptor.

    @param ssl_domain: object whose C{_domain} attribute is handed to
    pn_acceptor_set_ssl_domain() together with this acceptor's
    underlying implementation object.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
92
93
94
97 self.reactor_impl = reactor._impl
101
104
106 self.errors.append(info)
107 self.yield_()
108
111
113 impl = _chandler(handler, self.on_error_delegate())
114 pn_reactor_set_global_handler(self._impl, impl)
115 pn_decref(impl)
116
117 global_handler = property(_get_global, _set_global)
118
120 return millis2timeout(pn_reactor_get_timeout(self._impl))
121
123 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
124
125 timeout = property(_get_timeout, _set_timeout)
126
128 pn_reactor_yield(self._impl)
129
131 return pn_reactor_mark(self._impl)
132
135
137 impl = _chandler(handler, self.on_error_delegate())
138 pn_reactor_set_handler(self._impl, impl)
139 pn_decref(impl)
140
141 handler = property(_get_handler, _set_handler)
142
151
153 n = pn_reactor_wakeup(self._impl)
154 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
155
157 pn_reactor_start(self._impl)
158
159 @property
161 return pn_reactor_quiesced(self._impl)
162
164 if self.errors:
165 for exc, value, tb in self.errors[:-1]:
166 traceback.print_exception(exc, value, tb)
167 exc, value, tb = self.errors[-1]
168 _compat.raise_(exc, value, tb)
169
171 result = pn_reactor_process(self._impl)
172 self._check_errors()
173 return result
174
176 pn_reactor_stop(self._impl)
177 self._check_errors()
178
180 impl = _chandler(task, self.on_error_delegate())
181 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
182 pn_decref(impl)
183 return task
184
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor on the given host and port.

    @param host: host/interface name; converted with unicode2utf8()
    before being passed to pn_reactor_acceptor().

    @param port: port to listen on; converted with str().

    @param handler: optional handler, wrapped via _chandler() and
    passed to pn_reactor_acceptor().

    @return: an Acceptor wrapping the newly created acceptor.

    @raise IOError: if pn_reactor_acceptor() does not return an
    acceptor; the message carries the reactor's error text followed by
    the host and port.
    """
    impl = _chandler(handler, self.on_error_delegate())
    aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
    # Drop the local reference to the wrapped handler taken above.
    pn_decref(impl)
    if aimpl:
        return Acceptor(aimpl)
    # All three values must be supplied to the format operator as one
    # tuple; otherwise formatting itself fails with a TypeError and the
    # intended IOError is never raised.
    raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
193
195 """Deprecated: use connection_to_host() instead
196 """
197 impl = _chandler(handler, self.on_error_delegate())
198 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
199 if impl: pn_decref(impl)
200 return result
201
203 """Create an outgoing Connection that will be managed by the reactor.
204 The reactor's pn_iohandler will create a socket connection to the host
205 once the connection is opened.
206 """
207 conn = self.connection(handler)
208 self.set_connection_host(conn, host, port)
209 return conn
210
212 """Change the address used by the connection. The address is
213 used by the reactor's iohandler to create an outgoing socket
214 connection. This must be set prior to opening the connection.
215 """
216 pn_reactor_set_connection_host(self._impl,
217 connection._impl,
218 unicode2utf8(str(host)),
219 unicode2utf8(str(port)))
220
222 """This may be used to retrieve the remote peer address.
223 @return: string containing the address in URL format or None if no
224 address is available. Use the proton.Url class to create a Url object
225 from the returned value.
226 """
227 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
228 return utf82unicode(_url)
229
231 impl = _chandler(handler, self.on_error_delegate())
232 result = Selectable.wrap(pn_reactor_selectable(self._impl))
233 if impl:
234 record = pn_selectable_attachments(result._impl)
235 pn_record_set_handler(record, impl)
236 pn_decref(impl)
237 return result
238
240 pn_reactor_update(self._impl, sel._impl)
241
243 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
244
245 from proton import wrappers as _wrappers
246 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
247 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
251 """
252 Can be added to a reactor to allow events to be triggered by an
253 external thread but handled on the event thread associated with
254 the reactor. An instance of this class can be passed to the
255 Reactor.selectable() method of the reactor in order to activate
256 it. The close() method should be called when it is no longer
257 needed, to allow the event loop to end if needed.
258 """
260 self.queue = Queue.Queue()
261 self.pipe = os.pipe()
262 self._closed = False
263
265 """
266 Request that the given event be dispatched on the event thread
267 of the reactor to which this EventInjector was added.
268 """
269 self.queue.put(event)
270 os.write(self.pipe[1], _compat.str2bin("!"))
271
273 """
274 Request that this EventInjector be closed. Existing events
275 will be dispatched on the reactor's event dispatch thread,
276 then this will be removed from the set of interest.
277 """
278 self._closed = True
279 os.write(self.pipe[1], _compat.str2bin("!"))
280
283
289
299
302 """
303 Application defined event, which can optionally be associated with
304 an engine object and/or an arbitrary subject
305 """
306 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
319
323
325 """
326 Class to track state of an AMQP 1.0 transaction.
327 """
328 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
329 self.txn_ctrl = txn_ctrl
330 self.handler = handler
331 self.id = None
332 self._declare = None
333 self._discharge = None
334 self.failed = False
335 self._pending = []
336 self.settle_before_discharge = settle_before_discharge
337 self.declare()
338
341
344
346 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
347
351
356
357 - def send(self, sender, msg, tag=None):
362
369
370 - def update(self, delivery, state=None):
374
380
383
406
408 """
409 Abstract interface for link configuration options
410 """
412 """
413 Subclasses will implement any configuration logic in this
414 method
415 """
416 pass
417 - def test(self, link):
418 """
419 Subclasses can override this to selectively apply an option
420 e.g. based on some link criteria
421 """
422 return True
423
427
432
def apply(self, sender):
    """Configuration hook taking a sender; this implementation does nothing."""
    return None
436
def apply(self, receiver):
    """Configuration hook taking a receiver; this implementation does nothing."""
    return None
440
455
458 self.filter_set = filter_set
459
460 - def apply(self, receiver):
462
464 """
465 Configures a link with a message selector filter
466 """
467 - def __init__(self, value, name='selector'):
469
471 - def apply(self, receiver):
474
475 -class Move(ReceiverOption):
476 - def apply(self, receiver):
478
479 -class Copy(ReceiverOption):
480 - def apply(self, receiver):
482
490
495
502
505 self._default_session = None
506
508 if not self._default_session:
509 self._default_session = _create_session(connection)
510 return self._default_session
511
513 """
514 Internal handler that triggers the necessary socket connect for an
515 opened connection.
516 """
519
521 if not self._override(event):
522 event.dispatch(self.base)
523
525 conn = event.connection
526 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
527
529 """
530 Internal handler that triggers the necessary socket connect for an
531 opened connection.
532 """
546
547 - def _connect(self, connection, reactor):
578
581
587
590
609
612
614 """
615 A reconnect strategy involving an increasing delay between
616 retries, up to a maximum of 10 seconds.
617 """
620
623
631
634 self.values = [Url(v) for v in values]
635 self.i = iter(self.values)
636
639
641 try:
642 return next(self.i)
643 except StopIteration:
644 self.i = iter(self.values)
645 return next(self.i)
646
659
662 """A representation of the AMQP concept of a 'container', which
663 loosely speaking is something that establishes links to or from
664 another container, over which messages are transferred. This is
665 an extension to the Reactor class that adds convenience methods
666 for creating connections and sender- or receiver- links.
667 """
668 - def __init__(self, *handlers, **kwargs):
684
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    Exactly one of url, urls or address is used, checked in that
    order; a ValueError is raised if none is given.

    @param url: URL string of the process to connect to.

    @param urls: list of URL strings of processes to try to connect to.

    @param address: used as the connector's address as-is when neither
    url nor urls is given.

    @param handler: a connection scoped handler that will be called
    to process any events in the scope of this connection or its
    child links.

    @param reconnect: a reconnect strategy to use. If left as None a
    Backoff() strategy is installed. A value of False prevents the
    library from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.

    @param heartbeat: desired frequency of heartbeats used to test
    that the underlying socket is alive (historically documented as
    milliseconds; NOTE(review): confirm the unit against the code that
    consumes connector.heartbeat).

    @param ssl_domain: SSL configuration in the form of an instance
    of proton.SSLDomain. If not given, self.ssl.client is used when
    self.ssl is set.

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection; 'allowed_mechs', an optional
    list of SASL mechanisms to allow if sasl is enabled;
    'allow_insecure_mechs', a flag indicating whether insecure
    mechanisms, such as PLAIN over a non-encrypted socket, are
    allowed; 'user' and 'password'; 'virtual_host', the hostname to
    set in the Open performative used by the peer to determine the
    correct back-end service for the client (if not supplied the host
    field from the URL is used instead); 'sni'; and
    'offered_capabilities', 'desired_capabilities' and 'properties',
    which are copied onto the connection.
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    for field in ('offered_capabilities', 'desired_capabilities', 'properties'):
        setattr(conn, field, kwargs.get(field))

    connector = Connector(conn)
    # Per-connection keyword arguments override the defaults held on self.
    for option in ('allow_insecure_mechs', 'allowed_mechs', 'sasl_enabled', 'user', 'password'):
        setattr(connector, option, kwargs.get(option, getattr(self, option)))
    connector.virtual_host = kwargs.get('virtual_host')
    if connector.virtual_host:
        # Only a non-empty virtual host replaces the connection's hostname.
        conn.hostname = connector.virtual_host
    connector.ssl_sni = kwargs.get('sni')

    conn._overrides = connector
    if url:
        destination = Urls([url])
    elif urls:
        destination = Urls(urls)
    elif address:
        destination = address
    else:
        raise ValueError("One of url, urls or address required")
    connector.address = destination

    if heartbeat:
        connector.heartbeat = heartbeat

    if reconnect is None:
        # No preference expressed: install the Backoff strategy.
        connector.reconnect = Backoff()
    elif reconnect:
        connector.reconnect = reconnect

    # Fall back to the default client SSL configuration when the caller
    # supplied none.
    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
756
757 - def _get_id(self, container, remote, local):
758 if local and remote: "%s-%s-%s" % (container, remote, local)
759 elif local: return "%s-%s" % (container, local)
760 elif remote: return "%s-%s" % (container, remote)
761 else: return "%s-%s" % (container, str(generate_uuid()))
762
775
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string.

    @param handler: optional handler assigned to the sender; only
    skipped when it is None.

    @param tags: optional value assigned to the sender's
    tag_generator.

    @param options: passed to _apply_link_options() together with
    the sender.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        # No explicit target: default to the path component of the URL.
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test rather than `!=`, so a handler with custom
    # comparison operators cannot influence the check.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
815
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string.

    @param dynamic: when true, the receiver's source is marked as
    dynamic.

    @param handler: optional handler assigned to the receiver; only
    skipped when it is None.

    @param options: passed to _apply_link_options() together with
    the receiver.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        # No explicit source: default to the path component of the URL.
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test rather than `!=`, so a handler with custom
    # comparison operators cannot influence the check.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
856
858 if not _get_attr(context, '_txn_ctrl'):
859 class InternalTransactionHandler(OutgoingMessageHandler):
860 def __init__(self):
861 super(InternalTransactionHandler, self).__init__(auto_settle=True)
862
863 def on_settled(self, event):
864 if hasattr(event.delivery, "transaction"):
865 event.transaction = event.delivery.transaction
866 event.delivery.transaction.handle_outcome(event)
867 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
868 context._txn_ctrl.target.type = Terminus.COORDINATOR
869 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
870 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
871
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or Url) giving the host and port to
    listen on. With the 'amqps' scheme and no explicit ssl_domain,
    self.ssl.server is used as the SSL configuration.

    @param ssl_domain: optional SSL configuration applied to the
    acceptor via set_ssl_domain().

    @return: the acceptor created by self.acceptor().

    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain was
    given and self.ssl is not set.
    """
    url = Url(url)
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    # The SSL configuration is resolved before the acceptor is opened so
    # that a missing SSL setup cannot leave behind an open acceptor that
    # is never returned to the caller.
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
889
894