Package proton :: Module handlers
[frames] | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, re, socket, time, types, weakref 
 20   
 21  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 22  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 23  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 24  from select import select 
25 26 27 -class OutgoingMessageHandler(Handler):
28 """ 29 A utility for simpler and more intuitive handling of delivery 30 events related to outgoing i.e. sent messages. 31 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the handler's configuration.

    ``auto_settle`` is the flag ``on_delivery`` checks before calling
    ``settle()`` on a delivery; ``delegate`` is the object the ``on_*``
    callbacks are dispatched to (nothing is dispatched when it is None).
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
35 41
def on_delivery(self, event):
    """
    Translate a delivery update on a sender link into the matching
    ``on_accepted`` / ``on_rejected`` / ``on_released`` / ``on_settled``
    callback, then settle the delivery when ``auto_settle`` is set.

    Deliveries that are not on a sender link, or that carry no update,
    are ignored.
    """
    delivery = event.delivery
    if not (delivery.link.is_sender and delivery.updated):
        return

    outcome = delivery.remote_state
    if outcome == Delivery.ACCEPTED:
        self.on_accepted(event)
    elif outcome == Delivery.REJECTED:
        self.on_rejected(event)
    elif outcome in (Delivery.RELEASED, Delivery.MODIFIED):
        # RELEASED and MODIFIED are both reported as "released".
        self.on_released(event)

    if delivery.settled:
        self.on_settled(event)
    if self.auto_settle:
        delivery.settle()
55
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can therefore
    be transferred. Dispatches ``on_sendable`` to the delegate when one
    is set; otherwise does nothing.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_sendable', event)
63
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Dispatches ``on_accepted`` to the delegate when one is set;
    otherwise does nothing.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_accepted', event)
70
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Dispatches ``on_rejected`` to the delegate when one is set;
    otherwise does nothing.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_rejected', event)
77
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASE or MODIFIED
    state as defined by the AMQP specification. Dispatches
    ``on_released`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_released', event)
86
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing message.
    This is the point at which it should never be retransmitted.
    Dispatches ``on_settled`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_settled', event)
95
def recv_msg(delivery):
    """
    Read the pending bytes of ``delivery`` from its link, decode them
    into a new ``Message``, advance the link and return the message.
    """
    message = Message()
    link = delivery.link
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    link.advance()
    return message
101
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
107
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
    pass
113
114 -class Acking(object):
def accept(self, delivery):
    """
    Accepts a received message by settling its delivery with the
    ACCEPTED state.
    """
    outcome = Delivery.ACCEPTED
    self.settle(delivery, outcome)
120
def reject(self, delivery):
    """
    Rejects a received message that is considered invalid or
    unprocessable, by settling its delivery with the REJECTED state.
    """
    outcome = Delivery.REJECTED
    self.settle(delivery, outcome)
127
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver. The ``delivered`` parameter
    indicates whether this should be considered a delivery attempt
    (and the delivery count updated) or not: the delivery is settled
    as MODIFIED when it is true and as RELEASED otherwise.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
139
140 - def settle(self, delivery, state=None):
144
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        Store the ``auto_accept`` flag consulted by ``on_delivery`` and
        the ``delegate`` that callbacks are dispatched to.
        """
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Turn a delivery event on a receiver link into ``on_message`` or
        ``on_settled``.

        A fully received, readable delivery is decoded into
        ``event.message``. If the link is already locally closed the
        message is released (when ``auto_accept`` is set); otherwise
        ``on_message`` is called and the delivery is accepted (when
        ``auto_accept`` is set), rejected on ``Reject`` or settled as
        MODIFIED on ``Release``.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        if not (delivery.readable and not delivery.partial):
            # Nothing complete to read: only a settlement update matters.
            if delivery.updated and delivery.settled:
                self.on_settled(event)
            return

        event.message = recv_msg(delivery)
        if event.link.state & Endpoint.LOCAL_CLOSED:
            if self.auto_accept:
                delivery.update(Delivery.RELEASED)
                delivery.settle()
            return

        try:
            self.on_message(event)
            if self.auto_accept:
                delivery.update(Delivery.ACCEPTED)
                delivery.settle()
        except Reject:
            delivery.update(Delivery.REJECTED)
            delivery.settle()
        except Release:
            delivery.update(Delivery.MODIFIED)
            delivery.settle()

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
        target = self.delegate
        if target != None:
            dispatch(target, 'on_message', event)

    def on_settled(self, event):
        """
        Dispatch ``on_settled`` to the delegate when one is set.
        """
        target = self.delegate
        if target != None:
            dispatch(target, 'on_settled', event)
193
194 -class EndpointStateHandler(Handler):
195 """ 196 A utility that exposes 'endpoint' events i.e. the open/close for 197 links, sessions and connections in a more intuitive manner. A 198 XXX_opened method will be called when both local and remote peers 199 have opened the link, session or connection. This can be used to 200 confirm a locally initiated action for example. A XXX_opening 201 method will be called when the remote peer has requested an open 202 that was not initiated locally. By default this will simply open 203 locally, which then triggers the XXX_opened call. The same applies 204 to close. 205 """ 206
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Store the ``delegate`` that callbacks are dispatched to and the
    ``peer_close_is_error`` flag checked by the ``on_*_closing``
    callbacks when no delegate is set.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
@classmethod
def is_local_open(cls, endpoint):
    """
    Return the endpoint's state masked with ``Endpoint.LOCAL_ACTIVE``
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_ACTIVE
@classmethod
def is_local_uninitialised(cls, endpoint):
    """
    Return the endpoint's state masked with ``Endpoint.LOCAL_UNINIT``
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_UNINIT
@classmethod
def is_local_closed(cls, endpoint):
    """
    Return the endpoint's state masked with ``Endpoint.LOCAL_CLOSED``
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_CLOSED
@classmethod
def is_remote_open(cls, endpoint):
    """
    Return the endpoint's state masked with ``Endpoint.REMOTE_ACTIVE``
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_ACTIVE
@classmethod
def is_remote_closed(cls, endpoint):
    """
    Return the endpoint's state masked with ``Endpoint.REMOTE_CLOSED``
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_CLOSED
@classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Log an error for ``endpoint``.

    When the endpoint has a remote condition, its description (or its
    name when the description is empty) is logged. Otherwise, if the
    endpoint is locally open but remotely closed, a
    "<endpoint_type> closed by peer" message is logged.
    """
    condition = endpoint.remote_condition
    if condition:
        logging.error(condition.description or condition.name)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        logging.error("%s closed by peer" % endpoint_type)
237 246
def on_session_remote_close(self, event):
    """
    React to the peer closing a session.

    Calls ``on_session_error`` when the peer supplied a condition,
    ``on_session_closed`` when the session was already closed locally,
    and ``on_session_closing`` otherwise. The session is then closed
    locally in every case.
    """
    session = event.session
    if session.remote_condition:
        callback = self.on_session_error
    elif self.is_local_closed(session):
        callback = self.on_session_closed
    else:
        callback = self.on_session_closing
    callback(event)
    session.close()
255
def on_connection_remote_close(self, event):
    """
    React to the peer closing the connection.

    A remote condition named "amqp:connection:forced" is ignored
    entirely (nothing is called and the connection is not closed
    locally). Any other condition triggers ``on_connection_error``;
    without a condition ``on_connection_closed`` or
    ``on_connection_closing`` is called depending on whether the
    connection was already closed locally. The connection is then
    closed locally.
    """
    connection = event.connection
    condition = connection.remote_condition
    if condition:
        if condition.name == "amqp:connection:forced":
            # Treat this the same as just having the transport closed by the peer without
            # sending any events. Allow reconnection to happen transparently.
            return
        self.on_connection_error(event)
    elif self.is_local_closed(connection):
        self.on_connection_closed(event)
    else:
        self.on_connection_closing(event)
    connection.close()
268
def on_connection_local_open(self, event):
    """
    Call ``on_connection_opened`` when the connection has been opened
    locally and the remote side is already open.
    """
    if not self.is_remote_open(event.connection):
        return
    self.on_connection_opened(event)
272
def on_connection_remote_open(self, event):
    """
    React to the peer opening the connection.

    If the connection is already open locally, ``on_connection_opened``
    is called. If it is still locally uninitialised,
    ``on_connection_opening`` is called and the connection is then
    opened locally.
    """
    connection = event.connection
    if self.is_local_open(connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(connection):
        self.on_connection_opening(event)
        connection.open()
279
def on_session_local_open(self, event):
    """
    Call ``on_session_opened`` when the session has been opened
    locally and the remote side is already open.
    """
    if not self.is_remote_open(event.session):
        return
    self.on_session_opened(event)
283
def on_session_remote_open(self, event):
    """
    React to the peer opening a session.

    If the session is already open locally, ``on_session_opened`` is
    called. If it is still locally uninitialised, ``on_session_opening``
    is called and the session is then opened locally.
    """
    session = event.session
    if self.is_local_open(session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(session):
        self.on_session_opening(event)
        session.open()
290 294 301
def on_connection_opened(self, event):
    """
    Dispatch ``on_connection_opened`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_connection_opened', event)
305
def on_session_opened(self, event):
    """
    Dispatch ``on_session_opened`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_session_opened', event)
309 313
def on_connection_opening(self, event):
    """
    Dispatch ``on_connection_opening`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_connection_opening', event)
317
def on_session_opening(self, event):
    """
    Dispatch ``on_session_opening`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_session_opening', event)
321 325
def on_connection_error(self, event):
    """
    Handle a connection closed by the peer with an error condition.

    Dispatches ``on_connection_error`` to the delegate when one is set;
    otherwise the error is logged through ``print_error``.
    """
    if self.delegate != None:
        dispatch(self.delegate, 'on_connection_error', event)
    else:
        # print_error is the error-reporting classmethod this handler
        # provides for endpoints.
        self.print_error(event.connection, "connection")
331
def on_session_error(self, event):
    """
    Handle a session closed by the peer with an error condition.

    Dispatches ``on_session_error`` to the delegate when one is set;
    otherwise the error is logged through ``print_error`` and the
    event's connection is closed.
    """
    if self.delegate != None:
        dispatch(self.delegate, 'on_session_error', event)
    else:
        # print_error is the error-reporting classmethod this handler
        # provides for endpoints.
        self.print_error(event.session, "session")
        event.connection.close()
338 345
def on_connection_closed(self, event):
    """
    Dispatch ``on_connection_closed`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_connection_closed', event)
349
def on_session_closed(self, event):
    """
    Dispatch ``on_session_closed`` to the delegate when one is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_session_closed', event)
353 357
def on_connection_closing(self, event):
    """
    Called when the peer initiates closing of the connection.

    Dispatches ``on_connection_closing`` to the delegate when one is
    set. Without a delegate, the close is routed to
    ``on_connection_error`` if ``peer_close_is_error`` is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_connection_closing', event)
        return
    if self.peer_close_is_error:
        self.on_connection_error(event)
363
def on_session_closing(self, event):
    """
    Called when the peer initiates closing of a session.

    Dispatches ``on_session_closing`` to the delegate when one is set.
    Without a delegate, the close is routed to ``on_session_error`` if
    ``peer_close_is_error`` is set.
    """
    target = self.delegate
    if target != None:
        dispatch(target, 'on_session_closing', event)
        return
    if self.peer_close_is_error:
        self.on_session_error(event)
369 375
def on_transport_tail_closed(self, event):
    """
    Handle the transport's tail closing exactly like a full transport
    close, by delegating to ``on_transport_closed``.
    """
    handle_close = self.on_transport_closed
    handle_close(event)
378
def on_transport_closed(self, event):
    """
    Dispatch ``on_disconnected`` to the delegate when the transport
    closes while the event's connection is still open locally.

    Nothing happens when there is no delegate, the event has no
    connection, or the connection is not locally open.
    """
    if self.delegate == None:
        return
    if not event.connection:
        return
    if self.is_local_open(event.connection):
        dispatch(self.delegate, 'on_disconnected', event)
382
383 -class MessagingHandler(Handler, Acking):
384 """ 385 A general purpose handler that makes the proton-c events somewhat 386 simpler to deal with and/or avoids repetitive tasks for common use 387 cases. 388 """
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Assemble the chain of child handlers.

    A ``CFlowController`` is added only when ``prefetch`` is truthy.
    The endpoint-state, incoming-message and outgoing-message handlers
    are always added, each given a weak proxy to this object as its
    delegate. ``fatal_conditions`` lists the transport condition names
    that make ``on_transport_error`` close the connection.
    """
    chain = self.handlers = []
    if prefetch:
        chain.append(CFlowController(prefetch))
    chain.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
    chain.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
    chain.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
    self.fatal_conditions = ["amqp:unauthorized-access"]
397
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    The transport condition is logged (name, description and, when
    present, info). If the condition's name is listed in
    ``self.fatal_conditions`` the event's connection is closed. When
    the transport has no condition, a generic error is logged.
    """
    condition = event.transport.condition
    if not condition:
        logging.error("Unspecified transport error")
        return

    if condition.info:
        # Three values need three placeholders; with only two, the
        # formatting itself raised TypeError.
        logging.error("%s: %s: %s" % (condition.name, condition.description, condition.info))
    else:
        logging.error("%s: %s" % (condition.name, condition.description))

    if condition.name in self.fatal_conditions:
        event.connection.close()
413
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error
    condition. Logs the error via ``EndpointStateHandler.print_error``.
    """
    connection = event.connection
    EndpointStateHandler.print_error(connection, "connection")
419
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition.
    Logs the error via ``EndpointStateHandler.print_error`` and then
    closes the event's connection.
    """
    session = event.session
    EndpointStateHandler.print_error(session, "session")
    event.connection.close()
426 433
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    If the reactor has a ``subclass`` attribute, the reactor is also
    attached to the event under that class's lower-cased name. Then
    ``on_start`` is invoked.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = reactor.subclass.__name__.lower()
        setattr(event, alias, reactor)
    self.on_start(event)
441
def on_start(self, event):
    """
    Called when the event loop starts. (Just an alias for
    on_reactor_init.) This implementation does nothing.
    """
    return None
def on_connection_closed(self, event):
    """
    Called when the connection is closed. This implementation does
    nothing.
    """
    return None
def on_session_closed(self, event):
    """
    Called when the session is closed. This implementation does
    nothing.
    """
    return None
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    This implementation does nothing.
    """
    return None
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.
    This implementation does nothing.
    """
    return None
def on_disconnected(self, event):
    """
    Called when the socket is disconnected. This implementation does
    nothing.
    """
    return None
482
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can therefore
    be transferred. This implementation does nothing.
    """
    return None
489
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message. This
    implementation does nothing.
    """
    return None
495
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message. This
    implementation does nothing.
    """
    return None
501
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASE or MODIFIED
    state as defined by the AMQP specification. This implementation
    does nothing.
    """
    return None
509
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing message.
    This is the point at which it should never be retransmitted.
    This implementation does nothing.
    """
    return None
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of referring
    to this message in further actions (e.g. if explicitly accepting
    it), the ``delivery`` should be used, also obtainable via a
    property on the event. This implementation does nothing.
    """
    return None
526
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction. Every
    callback here is a no-op.
    """

    def on_transaction_declared(self, event):
        """No-op hook for the 'transaction declared' notification."""
        return None

    def on_transaction_committed(self, event):
        """No-op hook for the 'transaction committed' notification."""
        return None

    def on_transaction_aborted(self, event):
        """No-op hook for the 'transaction aborted' notification."""
        return None

    def on_transaction_declare_failed(self, event):
        """No-op hook for the 'transaction declare failed' notification."""
        return None

    def on_transaction_commit_failed(self, event):
        """No-op hook for the 'transaction commit failed' notification."""
        return None
546
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Initialise the underlying handler with the given settings.
        Unlike the base signature, ``auto_accept`` defaults to False
        here.
        """
        base = super(TransactionalClientHandler, self)
        base.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery``: through ``transaction.accept`` when a
        transaction is given, otherwise through the inherited
        ``accept``.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
561 562 from proton import WrappedHandler 563 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
class CFlowController(WrappedHandler):
    """
    Wrapped handler whose implementation is created by
    ``pn_flowcontroller`` with the configured ``window``.
    """

    def __init__(self, window=1024):
        def make_impl():
            # Deferred so the implementation is built only when the
            # wrapper asks for it.
            return pn_flowcontroller(window)

        WrappedHandler.__init__(self, make_impl)
569
class CHandshaker(WrappedHandler):
    """
    Wrapped handler constructed around ``pn_handshaker``.
    """

    def __init__(self):
        factory = pn_handshaker
        WrappedHandler.__init__(self, factory)
574
class IOHandler(WrappedHandler):
    """
    Wrapped handler constructed around ``pn_iohandler``.
    """

    def __init__(self):
        factory = pn_iohandler
        WrappedHandler.__init__(self, factory)
579
class PythonIO:
    """
    Handler that tracks the reactor's selectables and waits on them
    with ``select`` each time the reactor is quiesced. Events it has no
    method for are dispatched to an ``IOHandler``.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Dispatch any event without a dedicated method to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do: selectable state is re-read on every quiesce."""
        return None

    def on_selectable_final(self, event):
        """Stop tracking and release a selectable once it is terminal."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait for I/O on the tracked selectables and notify them.

        The wait lasts until the earliest selectable deadline, capped
        by the reactor's timeout. Afterwards the reactor is marked,
        readable/writable selectables are notified, selectables whose
        deadline has passed are expired, and the reactor is yielded.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        want_read = []
        want_write = []
        earliest = None
        for sel in self.selectables:
            if sel.reading:
                want_read.append(sel)
            if sel.writing:
                want_write.append(sel)
            if sel.deadline:
                earliest = sel.deadline if earliest is None else min(sel.deadline, earliest)

        timeout = reactor.timeout if earliest is None else earliest - time.time()
        if timeout < 0:
            timeout = 0
        timeout = min(timeout, reactor.timeout)
        can_read, can_write, _ = select(want_read, want_write, [], timeout)

        reactor.mark()

        now = time.time()

        for sel in can_read:
            sel.readable()
        for sel in can_write:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()
642