Project JXTA

net.jxta.endpoint
Class ThreadedMessenger

java.lang.Object
  extended by net.jxta.util.AbstractSimpleSelectable
      extended by net.jxta.endpoint.AbstractMessenger
          extended by net.jxta.endpoint.ThreadedMessenger
All Implemented Interfaces:
Runnable, Messenger, SimpleSelectable

public abstract class ThreadedMessenger
extends AbstractMessenger
implements Runnable

This is a messenger meant to be shared by multiple channels; it automatically distributes the available bandwidth among the channels. This one is implemented with a dedicated background thread.
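As an illustration of sharing one messenger among several channels, the following is a minimal sketch that assumes a round-robin policy over bounded per-channel queues. The round-robin policy, the class name ChannelSharingSketch, and its methods are assumptions made for this example; the page does not say how ThreadedMessenger actually divides bandwidth. Strings stand in for Message objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, not JXTA code: one shared sender serving several channel queues.
final class ChannelSharingSketch {
    private final List<Queue<String>> channels = new ArrayList<>();
    private final int channelQueueSize;   // plays the role of the channelQueueSize parameter
    private int next = 0;                 // index of the channel to serve first on the next turn

    ChannelSharingSketch(int channelQueueSize) {
        this.channelQueueSize = channelQueueSize;
    }

    // Adds a channel and returns its index.
    int openChannel() {
        channels.add(new ArrayDeque<>());
        return channels.size() - 1;
    }

    // Non-blocking offer: refused when that channel's own queue is full.
    boolean offer(int channel, String msg) {
        Queue<String> q = channels.get(channel);
        if (q.size() >= channelQueueSize) {
            return false;
        }
        return q.add(msg);
    }

    // One turn of the background sender: take one message from the next non-empty channel.
    String pollNext() {
        for (int i = 0; i < channels.size(); i++) {
            int index = (next + i) % channels.size();
            Queue<String> q = channels.get(index);
            if (!q.isEmpty()) {
                next = (index + 1) % channels.size();
                return q.remove();
            }
        }
        return null;   // nothing left to send
    }

    public static void main(String[] args) {
        ChannelSharingSketch shared = new ChannelSharingSketch(2);
        int a = shared.openChannel();
        int b = shared.openChannel();
        shared.offer(a, "a1");
        shared.offer(a, "a2");
        shared.offer(b, "b1");
        for (String m = shared.pollNext(); m != null; m = shared.pollNext()) {
            System.out.println(m);
        }
    }
}
```

In this sketch a busy channel cannot starve a quiet one, because each turn moves on to the next channel that has something queued.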


Nested Class Summary
 
Nested classes/interfaces inherited from interface net.jxta.util.SimpleSelectable
SimpleSelectable.IdentityReference
 
Field Summary
 
Fields inherited from class net.jxta.endpoint.AbstractMessenger
DEFAULT_MTU, dstAddress
 
Fields inherited from class net.jxta.util.AbstractSimpleSelectable
identityReference
 
Fields inherited from interface net.jxta.endpoint.Messenger
ANYSTATE, BREAKING, BROKEN, CLOSED, CLOSING, CONNECTED, DISCONNECTED, DISCONNECTING, IDLE, RECONCLOSING, RECONNECTING, RECONSATURATED, RESOLCLOSING, RESOLPENDING, RESOLSATURATED, RESOLVED, RESOLVING, SATURATED, SENDING, SENDINGSATURATED, TERMINAL, UNRESOLVABLE, UNRESOLVED, UNRESOLVING, USABLE
 
Constructor Summary
ThreadedMessenger(PeerGroupID homeGroupID, EndpointAddress destination, EndpointAddress logicalDestination, int channelQueueSize)
          Create a new ThreadedMessenger.
 
Method Summary
 void close()
          Close this messenger after processing any pending messages.
protected abstract  void closeImpl()
          Close underlying connection.
protected abstract  boolean connectImpl()
          Make underlying connection.
 Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam)
          If applicable, returns another messenger that will send messages to the same destination address as this one, but with the specified default service and serviceParam, possibly rewriting addresses to ensure delivery through the specified redirection.
 EndpointAddress getLogicalDestinationAddress()
          Returns the logical destination of this messenger.
protected abstract  EndpointAddress getLogicalDestinationImpl()
          Obtain the logical destination address from the implementer (which likely gets it from the transport messenger).
 int getState()
          Returns the current state.
 void resolve()
          Force the messenger to start resolving if it is not resolved yet.
 void run()
          Runs the state machine until there's nothing left to do.
 void sendMessageB(Message msg, String service, String serviceParam)
          Simple sending: blocks until the message was accepted for sending or the messenger is not Messenger.USABLE; whichever occurs first.
protected abstract  void sendMessageBImpl(Message msg, String service, String param)
          Sends message through underlying connection.
 boolean sendMessageN(Message msg, String service, String serviceParam)
          Sends a message to the destination specified at construction.

In this case, this method is here out of principle but is not really expected to be invoked.

protected  void shutdown()
          The endpoint service may call this to cause an orderly closure of its messengers.
 
Methods inherited from class net.jxta.endpoint.AbstractMessenger
flush, getDestinationAddress, getDestinationAddressObject, getMTU, isClosed, isIdle, isSynchronous, itemChanged, sendMessage, sendMessage, sendMessage, setStateLock, waitState
 
Methods inherited from class net.jxta.util.AbstractSimpleSelectable
getIdentityReference, haveListeners, notifyChange, register, registerListener, unregister, unregisterListener
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface net.jxta.util.SimpleSelectable
getIdentityReference, register, unregister
 

Constructor Detail

ThreadedMessenger

public ThreadedMessenger(PeerGroupID homeGroupID,
                         EndpointAddress destination,
                         EndpointAddress logicalDestination,
                         int channelQueueSize)
Create a new ThreadedMessenger.

Parameters:
homeGroupID - the group that this messenger works for. This is the group of the endpoint service or transport that created this messenger.
destination - where messages should be addressed to
logicalDestination - the expected logical address of the destination. Pass null if unknown/irrelevant
channelQueueSize - the queue size that channels should have.
Method Detail

run

public void run()
Runs the state machine until there's nothing left to do.

Three exposed methods may need to inject new events into the system: sendMessageN, close, and shutdown. Since all three can cause actions, and since connectAction and startAction are deferred, it might seem possible for one of the actions caused by send, close, or shutdown to be called while connectAction or startAction is in progress. However, the state machine gives us a few guarantees:

All the actions except closeInput and closeOutput have an *end* event.
No state transition that results in an action other than closeInput or closeOutput may occur until the end event for an ongoing action has been called.

We perform closeInput and closeOutput on the fly, so none of the exposed methods can produce a deferred action while an action is already deferred. So, there is at most one deferred action after returning from an event method, regardless of the number of concurrent threads invoking the exposed methods, and it can only happen once per deferred action performed.
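The "at most one deferred action" guarantee can be illustrated with a much reduced model. This is a sketch under stated assumptions, not the JXTA state machine: the class DeferredActionSketch, its Action values, and its method names are hypothetical, and only two actions (a connect and a send) are modelled. Events are applied under a lock, a new action is chosen only when none is deferred or in progress, and the action itself would run outside the lock.

```java
// Hypothetical, simplified model of deferred actions; not the real ThreadedMessenger logic.
final class DeferredActionSketch {
    enum Action { NONE, CONNECT, SEND }

    private Action deferred = Action.NONE;  // at most one deferred action at any time
    private boolean busy = false;           // true from taking an action until its end event
    private int pending = 0;                // messages queued so far
    private boolean connected = false;

    // Event injected by a send: queue a message, then see whether an action is needed.
    synchronized void msgsEvent() {
        pending++;
        schedule();
    }

    // Chooses the next action only if none is already deferred or in progress.
    private void schedule() {
        if (busy || deferred != Action.NONE || pending == 0) {
            return;
        }
        deferred = connected ? Action.SEND : Action.CONNECT;
    }

    // Background thread: take the deferred action, if any, and mark it in progress.
    synchronized Action takeDeferred() {
        Action a = deferred;
        if (a != Action.NONE) {
            deferred = Action.NONE;
            busy = true;
        }
        return a;
    }

    // End event for the action that was in progress; may defer the next one.
    synchronized void endEvent(Action a) {
        busy = false;
        if (a == Action.CONNECT) {
            connected = true;
        } else if (a == Action.SEND) {
            pending = 0;
        }
        schedule();
    }

    // Runs until there is nothing left to do and returns how many actions were performed.
    int runUntilIdle() {
        int performed = 0;
        for (Action a = takeDeferred(); a != Action.NONE; a = takeDeferred()) {
            performed++;   // the real work (connecting, sending) would happen here, outside the lock
            endEvent(a);
        }
        return performed;
    }

    public static void main(String[] args) {
        DeferredActionSketch sm = new DeferredActionSketch();
        sm.msgsEvent();
        sm.msgsEvent();
        System.out.println("actions performed: " + sm.runUntilIdle());
    }
}
```

However many threads call msgsEvent, only the first call after an idle period sets the deferred action; later calls only add to the pending count.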

Specified by:
run in interface Runnable

shutdown

protected final void shutdown()
The endpoint service may call this to cause an orderly closure of its messengers.


getLogicalDestinationAddress

public EndpointAddress getLogicalDestinationAddress()
Returns the logical destination of this messenger. This may be a different address from the one returned by getDestinationAddress and refers to the entity which is located at the destination address.

By analogy, a telephone number would be the destination address, and the owner of that telephone number would be the logical destination. Each logical destination may be known by one or more destination addresses.
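The telephone analogy can be sketched as a small directory in which one logical destination is bound to several destination addresses. Plain strings stand in for EndpointAddress here, and the class LogicalDestinationDirectory as well as the example addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: strings replace EndpointAddress so the sketch runs without JXTA.
final class LogicalDestinationDirectory {
    // Logical destination (the "owner") -> destination addresses (the "telephone numbers").
    private final Map<String, List<String>> addressesByLogical = new LinkedHashMap<>();

    void bind(String logicalDestination, String destinationAddress) {
        addressesByLogical
                .computeIfAbsent(logicalDestination, k -> new ArrayList<>())
                .add(destinationAddress);
    }

    // All destination addresses known for a logical destination (empty if unknown).
    List<String> addressesOf(String logicalDestination) {
        List<String> found = addressesByLogical.get(logicalDestination);
        return found == null ? Collections.<String>emptyList() : found;
    }

    // Reverse lookup: the logical destination behind a destination address, or null.
    String logicalOf(String destinationAddress) {
        for (Map.Entry<String, List<String>> e : addressesByLogical.entrySet()) {
            if (e.getValue().contains(destinationAddress)) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        LogicalDestinationDirectory dir = new LogicalDestinationDirectory();
        // Made-up addresses, for illustration only.
        dir.bind("jxta://peer-A", "tcp://192.0.2.10:9701");
        dir.bind("jxta://peer-A", "http://192.0.2.10:9700");
        System.out.println(dir.addressesOf("jxta://peer-A"));
        System.out.println(dir.logicalOf("http://192.0.2.10:9700"));
    }
}
```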

Specified by:
getLogicalDestinationAddress in interface Messenger
Returns:
EndpointAddress the logical destination address of this messenger.
See Also:
Messenger.getDestinationAddress()

close

public void close()
Close this messenger after processing any pending messages. This method is not blocking. Upon return, the messenger will be in one of the non Messenger.USABLE states, which means that no message may be sent through it. Any other effect of this method, such as an underlying connection being closed, or all pending messages being processed, may be deferred indefinitely. When the messenger has completely processed the closure request, it will be in one of the Messenger.TERMINAL states (which are also Messenger.IDLE states). Therefore, if one is interested in the outcome of the closure, one may wait for the messenger to be in a Messenger.TERMINAL or Messenger.IDLE state, and check which it is. Messenger.CLOSED denotes success (all outstanding messages have been sent), as opposed to Messenger.UNRESOLVABLE or Messenger.BROKEN.
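The "wait for a terminal state, then check which one" pattern might look like the following sketch. It uses a hypothetical stand-in class so that it runs without JXTA: the numeric values of the state constants are invented, and the waitState signature shown is an assumption rather than the documented one. Only the constant names and the idea of TERMINAL being a set of states come from the text above.

```java
// Hypothetical stand-in for the messenger state API; constant values are invented.
final class CloseOutcomeSketch {
    static final int CLOSED = 1;
    static final int UNRESOLVABLE = 2;
    static final int BROKEN = 4;
    static final int CLOSING = 8;
    static final int TERMINAL = CLOSED | UNRESOLVABLE | BROKEN;   // composite mask

    private final Object lock = new Object();
    private int state = CLOSING;   // as if close() had just returned

    // Called by the (simulated) background thread once closure completes.
    void setState(int newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll();
        }
    }

    // Blocks until the state is one of wantedStates or the timeout expires; returns the state seen.
    int waitState(int wantedStates, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            while ((state & wantedStates) == 0) {
                long left = deadline - System.currentTimeMillis();
                if (left <= 0) {
                    break;
                }
                lock.wait(left);
            }
            return state;
        }
    }

    // Interprets the outcome of a closure as described above.
    static String describe(int state) {
        if ((state & TERMINAL) == 0) {
            return "still closing";
        }
        return state == CLOSED ? "all outstanding messages sent" : "closed with failure";
    }

    public static void main(String[] args) throws InterruptedException {
        CloseOutcomeSketch messenger = new CloseOutcomeSketch();
        new Thread(() -> messenger.setState(CLOSED)).start();   // simulated deferred closure
        System.out.println(describe(messenger.waitState(TERMINAL, 1000)));
    }
}
```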

Specified by:
close in interface Messenger

sendMessageN

public final boolean sendMessageN(Message msg,
                                  String service,
                                  String serviceParam)
Sends a message to the destination specified at construction. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

This method is identical to Messenger.sendMessage(Message, String, String), except that it does not throw an exception. The invoker has to retrieve a detailed status from the message if needed.

Error Handling:

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to Messenger.sendMessageN(net.jxta.endpoint.Message, java.lang.String, java.lang.String):

     messenger.sendMessageN( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

In this case, this method is here out of principle but is not really expected to be invoked. The normal way of using a ThreadedMessenger is through its channels. We do provide a default channel that all invokers that go around channels will share. That could be useful for sending rare out-of-band messages, for example.

Specified by:
sendMessageN in interface Messenger
Parameters:
msg - The message to send.
service - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
serviceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
Returns:
boolean true if the message has been accepted for sending, otherwise false.
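The three cases for service and serviceParam (null, empty string, any other value) can be summarised in a small helper. This is only a sketch of the rule as worded above; the class ServiceOverrideSketch and its method effective are hypothetical, and null is used in the result to mean "nothing is included in the destination address".

```java
// Hypothetical helper illustrating the override rule for service and serviceParam.
final class ServiceOverrideSketch {
    // null       -> the destination address's default is used
    // ""         -> nothing is included (represented here by a null result)
    // other text -> replaces the default for this message only
    static String effective(String defaultValue, String override) {
        if (override == null) {
            return defaultValue;
        }
        if (override.isEmpty()) {
            return null;
        }
        return override;
    }

    public static void main(String[] args) {
        String defaultService = "defaultService";   // made-up default for the example
        System.out.println(effective(defaultService, null));
        System.out.println(effective(defaultService, ""));
        System.out.println(effective(defaultService, "otherService"));
    }
}
```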

sendMessageB

public final void sendMessageB(Message msg,
                               String service,
                               String serviceParam)
                        throws IOException
Description copied from interface: Messenger
Simple sending: blocks until the message was accepted for sending or the messenger is not Messenger.USABLE; whichever occurs first. If a service name and service param are specified, they will replace those specified at construction for the purpose of sending this message only.

Error Handling:

WARNING: The Message object should not be reused or modified until completely processed. Concurrent modification of a message while a messenger is sending the message will produce incorrect and unpredictable results. If completion is not monitored, the message should never be reused. If necessary, a clone of the message may be provided to Messenger.sendMessageB(net.jxta.endpoint.Message, java.lang.String, java.lang.String):

     messenger.sendMessageB( (Message) myMessage.clone(), theService, theParam );
 

There is no guarantee that a message successfully sent will actually be received.

Specified by:
sendMessageB in interface Messenger
Parameters:
msg - The message to send.
service - Optionally replaces the service in the destination address. If null then the destination address's default service will be used. If the empty string ("") is used then no service is included in the destination address.
serviceParam - Optionally replaces the service param in the destination address. If null then the destination address's default service parameter will be used. If the empty string ("") is used then no service param is included in the destination address.
Throws:
IOException - Thrown if the message cannot be sent.
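The difference in error reporting between sendMessageN (boolean result) and sendMessageB (IOException) can be sketched as follows. The Sender interface and the fake implementation are hypothetical stand-ins that only mirror the two method shapes shown on this page; a String replaces Message so that the example runs without JXTA.

```java
import java.io.IOException;

// Hypothetical stand-ins mirroring the two sending styles; not JXTA types.
final class SendStylesSketch {
    interface Sender {
        boolean sendMessageN(String msg, String service, String serviceParam);

        void sendMessageB(String msg, String service, String serviceParam) throws IOException;
    }

    // A fake sender that is either usable or not, to exercise both outcomes.
    static Sender fake(final boolean usable) {
        return new Sender() {
            @Override
            public boolean sendMessageN(String msg, String service, String serviceParam) {
                return usable;
            }

            @Override
            public void sendMessageB(String msg, String service, String serviceParam) throws IOException {
                if (!usable) {
                    throw new IOException("messenger is not usable");
                }
            }
        };
    }

    // Non-blocking style: the caller inspects the boolean result.
    static String tryNonBlocking(Sender sender, String msg) {
        return sender.sendMessageN(msg, null, null) ? "accepted" : "refused";
    }

    // Blocking style: failure surfaces as an IOException.
    static String tryBlocking(Sender sender, String msg) {
        try {
            sender.sendMessageB(msg, null, null);
            return "accepted";
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNonBlocking(fake(false), "hello"));
        System.out.println(tryBlocking(fake(false), "hello"));
        System.out.println(tryBlocking(fake(true), "hello"));
    }
}
```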

resolve

public final void resolve()
Description copied from interface: Messenger
Force the messenger to start resolving if it is not resolved yet. Any attempt at sending a message has the same effect, but the message may fail as a result, depending upon the method used.

Specified by:
resolve in interface Messenger

getState

public final int getState()
Description copied from interface: Messenger
Returns the current state.

Specified by:
getState in interface Messenger
Returns:
one of the legal discrete state values.

getChannelMessenger

public Messenger getChannelMessenger(PeerGroupID redirection,
                                     String service,
                                     String serviceParam)
Description copied from interface: Messenger
If applicable, returns another messenger that will send messages to the same destination address as this one, but with the specified default service and serviceParam, possibly rewriting addresses to ensure delivery through the specified redirection. This is not generally useful to applications and most messengers will return null. This method is needed by the EndpointService when interacting with Messengers provided by Transport modules. If you are not implementing a Transport module, then you can ignore this method. Important: The channel so obtained is not configured to support the Messenger.sendMessage(Message, String, String, OutgoingMessageEventListener) legacy method. If use of this method is desired, ChannelMessenger.setMessageWatcher(net.jxta.endpoint.ListenerAdaptor) must be used first.

Specified by:
getChannelMessenger in interface Messenger
Parameters:
redirection - The requested redirection. The resulting channel messenger will use this to force delivery of the message only in the specified group (or possibly descendants, but not parents). If null the local group is assumed. This redirection is applied only to messages that are sent to a service name and service param that do not imply a group redirection.
service - The service to which the resulting channel will send messages, when they are not sent to a specified service.
serviceParam - The service parameter that the resulting channel will use to send messages, when no parameter is specified.
Returns:
a channelMessenger as specified.
See Also:
MessageSender.getMessenger(EndpointAddress, Object)

closeImpl

protected abstract void closeImpl()
Close underlying connection. May fail current send.


connectImpl

protected abstract boolean connectImpl()
Make underlying connection.


sendMessageBImpl

protected abstract void sendMessageBImpl(Message msg,
                                         String service,
                                         String param)
                                  throws IOException
Sends message through underlying connection.

Throws:
IOException

getLogicalDestinationImpl

protected abstract EndpointAddress getLogicalDestinationImpl()
Obtain the logical destination address from the implementer (which likely gets it from the transport messenger). Might not work if unresolved, so use with care.
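The four abstract hooks (closeImpl, connectImpl, sendMessageBImpl, getLogicalDestinationImpl) could be filled in along the following lines. This is a sketch against a hypothetical stand-in base class, HooksSketch, not against ThreadedMessenger itself: Strings replace Message and EndpointAddress, the in-memory "wire" is invented, and treating a true result from connectImpl as success and returning null while unconnected are both assumptions.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in declaring the same four hooks, with simplified types.
abstract class HooksSketch {
    protected abstract boolean connectImpl();

    protected abstract void closeImpl();

    protected abstract void sendMessageBImpl(String msg, String service, String param) throws IOException;

    protected abstract String getLogicalDestinationImpl();
}

// An in-memory "transport" used only to show where each hook's logic would go.
final class InMemoryHooks extends HooksSketch {
    final List<String> wire = new ArrayList<>();   // messages that "left" through the connection
    private boolean open = false;

    @Override
    protected boolean connectImpl() {
        open = true;     // a real transport would open its connection here
        return true;     // assumed to mean that the connection was made
    }

    @Override
    protected void closeImpl() {
        open = false;    // a send in progress may fail after this point
    }

    @Override
    protected void sendMessageBImpl(String msg, String service, String param) throws IOException {
        if (!open) {
            throw new IOException("not connected");
        }
        wire.add(service + "/" + param + ": " + msg);
    }

    @Override
    protected String getLogicalDestinationImpl() {
        // Might not be known before the connection exists, hence null in that case (assumption).
        return open ? "logical://example-peer" : null;
    }

    public static void main(String[] args) throws IOException {
        InMemoryHooks hooks = new InMemoryHooks();
        hooks.connectImpl();
        hooks.sendMessageBImpl("hello", "svc", "param");
        System.out.println(hooks.wire);
        hooks.closeImpl();
    }
}
```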


JXTA J2SE