1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, re, socket, time, types, weakref
20
21 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
22 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
23 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
24 from select import select
28 """
29 A utility for simpler and more intuitive handling of delivery
30 events related to outgoing i.e. sent messages.
31 """
32 - def __init__(self, auto_settle=True, delegate=None):
33 self.auto_settle = auto_settle
34 self.delegate = delegate
35
41
55
57 """
58 Called when the sender link has credit and messages can
59 therefore be transferred.
60 """
61 if self.delegate != None:
62 dispatch(self.delegate, 'on_sendable', event)
63
65 """
66 Called when the remote peer accepts an outgoing message.
67 """
68 if self.delegate != None:
69 dispatch(self.delegate, 'on_accepted', event)
70
72 """
73 Called when the remote peer rejects an outgoing message.
74 """
75 if self.delegate != None:
76 dispatch(self.delegate, 'on_rejected', event)
77
79 """
80 Called when the remote peer releases an outgoing message. Note
81 that this may be in response to either the RELEASE or MODIFIED
82 state as defined by the AMQP specification.
83 """
84 if self.delegate != None:
85 dispatch(self.delegate, 'on_released', event)
86
88 """
89 Called when the remote peer has settled the outgoing
90 message. This is the point at which it shouod never be
91 retransmitted.
92 """
93 if self.delegate != None:
94 dispatch(self.delegate, 'on_settled', event)
95
101
102 -class Reject(ProtonException):
103 """
104 An exception that indicate a message should be rejected
105 """
106 pass
107
109 """
110 An exception that indicate a message should be rejected
111 """
112 pass
113
120
127
128 - def release(self, delivery, delivered=True):
129 """
130 Releases a received message, making it available at the source
131 for any (other) interested receiver. The ``delivered``
132 parameter indicates whether this should be considered a
133 delivery attempt (and the delivery count updated) or not.
134 """
135 if delivered:
136 self.settle(delivery, Delivery.MODIFIED)
137 else:
138 self.settle(delivery, Delivery.RELEASED)
139
140 - def settle(self, delivery, state=None):
144
146 """
147 A utility for simpler and more intuitive handling of delivery
148 events related to incoming i.e. received messages.
149 """
150
151 - def __init__(self, auto_accept=True, delegate=None):
152 self.delegate = delegate
153 self.auto_accept = auto_accept
154
178
180 """
181 Called when a message is received. The message itself can be
182 obtained as a property on the event. For the purpose of
183 refering to this message in further actions (e.g. if
184 explicitly accepting it, the ``delivery`` should be used, also
185 obtainable via a property on the event.
186 """
187 if self.delegate != None:
188 dispatch(self.delegate, 'on_message', event)
189
191 if self.delegate != None:
192 dispatch(self.delegate, 'on_settled', event)
193
195 """
196 A utility that exposes 'endpoint' events i.e. the open/close for
197 links, sessions and connections in a more intuitive manner. A
198 XXX_opened method will be called when both local and remote peers
199 have opened the link, session or connection. This can be used to
200 confirm a locally initiated action for example. A XXX_opening
201 method will be called when the remote peer has requested an open
202 that was not initiated locally. By default this will simply open
203 locally, which then triggers the XXX_opened call. The same applies
204 to close.
205 """
206
207 - def __init__(self, peer_close_is_error=False, delegate=None):
208 self.delegate = delegate
209 self.peer_close_is_error = peer_close_is_error
210
211 @classmethod
214
215 @classmethod
218
219 @classmethod
222
223 @classmethod
226
227 @classmethod
230
231 @classmethod
237
246
255
268
272
279
283
290
294
301
303 if self.delegate != None:
304 dispatch(self.delegate, 'on_connection_opened', event)
305
307 if self.delegate != None:
308 dispatch(self.delegate, 'on_session_opened', event)
309
311 if self.delegate != None:
312 dispatch(self.delegate, 'on_link_opened', event)
313
315 if self.delegate != None:
316 dispatch(self.delegate, 'on_connection_opening', event)
317
319 if self.delegate != None:
320 dispatch(self.delegate, 'on_session_opening', event)
321
323 if self.delegate != None:
324 dispatch(self.delegate, 'on_link_opening', event)
325
327 if self.delegate != None:
328 dispatch(self.delegate, 'on_connection_error', event)
329 else:
330 self.log_error(event.connection, "connection")
331
333 if self.delegate != None:
334 dispatch(self.delegate, 'on_session_error', event)
335 else:
336 self.log_error(event.session, "session")
337 event.connection.close()
338
340 if self.delegate != None:
341 dispatch(self.delegate, 'on_link_error', event)
342 else:
343 self.log_error(event.link, "link")
344 event.connection.close()
345
347 if self.delegate != None:
348 dispatch(self.delegate, 'on_connection_closed', event)
349
351 if self.delegate != None:
352 dispatch(self.delegate, 'on_session_closed', event)
353
355 if self.delegate != None:
356 dispatch(self.delegate, 'on_link_closed', event)
357
359 if self.delegate != None:
360 dispatch(self.delegate, 'on_connection_closing', event)
361 elif self.peer_close_is_error:
362 self.on_connection_error(event)
363
365 if self.delegate != None:
366 dispatch(self.delegate, 'on_session_closing', event)
367 elif self.peer_close_is_error:
368 self.on_session_error(event)
369
371 if self.delegate != None:
372 dispatch(self.delegate, 'on_link_closing', event)
373 elif self.peer_close_is_error:
374 self.on_link_error(event)
375
378
382
384 """
385 A general purpose handler that makes the proton-c events somewhat
386 simpler to deal with and/or avoids repetitive tasks for common use
387 cases.
388 """
389 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
397
413
419
426
433
435 """
436 Called when the event loop - the reactor - starts.
437 """
438 if hasattr(event.reactor, 'subclass'):
439 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
440 self.on_start(event)
441
443 """
444 Called when the event loop starts. (Just an alias for on_reactor_init)
445 """
446 pass
448 """
449 Called when the connection is closed.
450 """
451 pass
453 """
454 Called when the session is closed.
455 """
456 pass
458 """
459 Called when the link is closed.
460 """
461 pass
463 """
464 Called when the peer initiates the closing of the connection.
465 """
466 pass
468 """
469 Called when the peer initiates the closing of the session.
470 """
471 pass
473 """
474 Called when the peer initiates the closing of the link.
475 """
476 pass
478 """
479 Called when the socket is disconnected.
480 """
481 pass
482
484 """
485 Called when the sender link has credit and messages can
486 therefore be transferred.
487 """
488 pass
489
491 """
492 Called when the remote peer accepts an outgoing message.
493 """
494 pass
495
497 """
498 Called when the remote peer rejects an outgoing message.
499 """
500 pass
501
503 """
504 Called when the remote peer releases an outgoing message. Note
505 that this may be in response to either the RELEASE or MODIFIED
506 state as defined by the AMQP specification.
507 """
508 pass
509
511 """
512 Called when the remote peer has settled the outgoing
513 message. This is the point at which it shouod never be
514 retransmitted.
515 """
516 pass
518 """
519 Called when a message is received. The message itself can be
520 obtained as a property on the event. For the purpose of
521 refering to this message in further actions (e.g. if
522 explicitly accepting it, the ``delivery`` should be used, also
523 obtainable via a property on the event.
524 """
525 pass
526
528 """
529 The interface for transaction handlers, i.e. objects that want to
530 be notified of state changes related to a transaction.
531 """
534
537
540
543
546
548 """
549 An extension to the MessagingHandler for applications using
550 transactions.
551 """
552
553 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
555
556 - def accept(self, delivery, transaction=None):
561
562 from proton import WrappedHandler
563 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
566
568 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
569
571
573 WrappedHandler.__init__(self, pn_handshaker)
574
576
578 WrappedHandler.__init__(self, pn_iohandler)
579
581
583 self.selectables = []
584 self.delegate = IOHandler()
585
588
590 self.selectables.append(event.context)
591
594
596 sel = event.context
597 if sel.is_terminal:
598 self.selectables.remove(sel)
599 sel.release()
600
642