001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.connector.work;
019    
020    import java.util.concurrent.Executor;
021    
022    import javax.resource.spi.work.ExecutionContext;
023    import javax.resource.spi.work.Work;
024    import javax.resource.spi.work.WorkCompletedException;
025    import javax.resource.spi.work.WorkException;
026    import javax.resource.spi.work.WorkListener;
027    import javax.resource.spi.work.WorkManager;
028    
029    import org.apache.geronimo.connector.work.pool.ScheduleWorkExecutor;
030    import org.apache.geronimo.connector.work.pool.StartWorkExecutor;
031    import org.apache.geronimo.connector.work.pool.SyncWorkExecutor;
032    import org.apache.geronimo.connector.work.pool.WorkExecutor;
033    import org.apache.geronimo.transaction.manager.XAWork;
034    
035    /**
 * WorkManager implementation which uses three executor pools under the
 * covers - one for each synchronization policy - in order to dispatch the
 * submitted Work instances.
039     * <P>
040     * A WorkManager is a component of the JCA specifications, which allows a
041     * Resource Adapter to submit tasks to an Application Server for execution.
042     *
043     * @version $Rev: 550546 $ $Date: 2007-06-25 18:52:11 +0200 (Mon, 25 Jun 2007) $
044     */
045    public class GeronimoWorkManager implements WorkManager {
046    
047    //    private final static int DEFAULT_POOL_SIZE = 10;
048    
049        /**
050         * Pool of threads used by this WorkManager in order to process
051         * the Work instances submitted via the doWork methods.
052         */
053        private Executor syncWorkExecutorPool;
054    
055        /**
056         * Pool of threads used by this WorkManager in order to process
057         * the Work instances submitted via the startWork methods.
058         */
059        private Executor startWorkExecutorPool;
060    
061        /**
062         * Pool of threads used by this WorkManager in order to process
063         * the Work instances submitted via the scheduleWork methods.
064         */
065        private Executor scheduledWorkExecutorPool;
066    
067        private final XAWork transactionManager;
068    
069        private final WorkExecutor scheduleWorkExecutor = new ScheduleWorkExecutor();
070        private final WorkExecutor startWorkExecutor = new StartWorkExecutor();
071        private final WorkExecutor syncWorkExecutor = new SyncWorkExecutor();
072    
073        /**
074         * Create a WorkManager.
075         */
076        public GeronimoWorkManager() {
077            this(null, null, null, null);
078        }
079    
080        public GeronimoWorkManager(Executor sync, Executor start, Executor sched, XAWork xaWork) {
081            syncWorkExecutorPool = sync;
082            startWorkExecutorPool = start;
083            scheduledWorkExecutorPool = sched;
084            this.transactionManager = xaWork;
085        }
086    
087        public void doStart() throws Exception {
088        }
089    
090        public void doStop() throws Exception {
091        }
092    
093        public void doFail() {
094            try {
095                doStop();
096            } catch (Exception e) {
097                //TODO what to do?
098            }
099        }
100    
101        public Executor getSyncWorkExecutorPool() {
102            return syncWorkExecutorPool;
103        }
104    
105        public Executor getStartWorkExecutorPool() {
106            return startWorkExecutorPool;
107        }
108    
109        public Executor getScheduledWorkExecutorPool() {
110            return scheduledWorkExecutorPool;
111        }
112    
113        /* (non-Javadoc)
114        * @see javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work)
115        */
116        public void doWork(Work work) throws WorkException {
117            executeWork(new WorkerContext(work, transactionManager), syncWorkExecutor, syncWorkExecutorPool);
118        }
119    
120        /* (non-Javadoc)
121         * @see javax.resource.spi.work.WorkManager#doWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener)
122         */
123        public void doWork(
124                Work work,
125                long startTimeout,
126                ExecutionContext execContext,
127                WorkListener workListener)
128                throws WorkException {
129            WorkerContext workWrapper =
130                    new WorkerContext(work, startTimeout, execContext, transactionManager, workListener);
131            workWrapper.setThreadPriority(Thread.currentThread().getPriority());
132            executeWork(workWrapper, syncWorkExecutor, syncWorkExecutorPool);
133        }
134    
135        /* (non-Javadoc)
136         * @see javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work)
137         */
138        public long startWork(Work work) throws WorkException {
139            WorkerContext workWrapper = new WorkerContext(work, transactionManager);
140            workWrapper.setThreadPriority(Thread.currentThread().getPriority());
141            executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool);
142            return System.currentTimeMillis() - workWrapper.getAcceptedTime();
143        }
144    
145        /* (non-Javadoc)
146         * @see javax.resource.spi.work.WorkManager#startWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener)
147         */
148        public long startWork(
149                Work work,
150                long startTimeout,
151                ExecutionContext execContext,
152                WorkListener workListener)
153                throws WorkException {
154            WorkerContext workWrapper =
155                    new WorkerContext(work, startTimeout, execContext, transactionManager, workListener);
156            workWrapper.setThreadPriority(Thread.currentThread().getPriority());
157            executeWork(workWrapper, startWorkExecutor, startWorkExecutorPool);
158            return System.currentTimeMillis() - workWrapper.getAcceptedTime();
159        }
160    
161        /* (non-Javadoc)
162         * @see javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work)
163         */
164        public void scheduleWork(Work work) throws WorkException {
165            WorkerContext workWrapper = new WorkerContext(work, transactionManager);
166            workWrapper.setThreadPriority(Thread.currentThread().getPriority());
167            executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool);
168        }
169    
170        /* (non-Javadoc)
171         * @see javax.resource.spi.work.WorkManager#scheduleWork(javax.resource.spi.work.Work, long, javax.resource.spi.work.ExecutionContext, javax.resource.spi.work.WorkListener)
172         */
173        public void scheduleWork(
174                Work work,
175                long startTimeout,
176                ExecutionContext execContext,
177                WorkListener workListener)
178                throws WorkException {
179            WorkerContext workWrapper =
180                    new WorkerContext(work, startTimeout, execContext, transactionManager, workListener);
181            workWrapper.setThreadPriority(Thread.currentThread().getPriority());
182            executeWork(workWrapper, scheduleWorkExecutor, scheduledWorkExecutorPool);
183        }
184    
185        /**
186         * Execute the specified Work.
187         *
188         * @param work Work to be executed.
189         *
190         * @exception WorkException Indicates that the Work execution has been
191         * unsuccessful.
192         */
193        private void executeWork(WorkerContext work, WorkExecutor workExecutor, Executor pooledExecutor) throws WorkException {
194            work.workAccepted(this);
195            try {
196                workExecutor.doExecute(work, pooledExecutor);
197                WorkException exception = work.getWorkException();
198                if (null != exception) {
199                    throw exception;
200                }
201            } catch (InterruptedException e) {
202                WorkCompletedException wcj = new WorkCompletedException(
203                        "The execution has been interrupted.", e);
204                wcj.setErrorCode(WorkException.INTERNAL);
205                throw wcj;
206            }
207        }
208    
209    }