001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.safehaus.asyncweb.service.pipeline;
021:
022: import java.util.Iterator;
023: import java.util.LinkedHashMap;
024: import java.util.Map;
025:
026: import org.safehaus.asyncweb.common.HttpResponse;
027: import org.safehaus.asyncweb.service.HttpServiceContext;
028: import org.slf4j.Logger;
029: import org.slf4j.LoggerFactory;
030:
031: public class StandardRequestPipeline implements RequestPipeline {
032:
033: private static final Logger LOG = LoggerFactory
034: .getLogger(StandardRequestPipeline.class);
035:
036: private int maxPipelinedRequests;
037: private RequestPipelineListener listener;
038: private Runnable emptyCommand;
039: private Map<HttpServiceContext, HttpResponse> entryMap = new LinkedHashMap<HttpServiceContext, HttpResponse>();
040:
041: public StandardRequestPipeline(int maxPipelinedRequests) {
042: this .maxPipelinedRequests = maxPipelinedRequests;
043: }
044:
045: public boolean addRequest(HttpServiceContext context) {
046: boolean added = false;
047: synchronized (entryMap) {
048: if (entryMap.size() < maxPipelinedRequests) {
049: entryMap.put(context, null);
050: added = true;
051: }
052: }
053: if (added && LOG.isDebugEnabled()) {
054: LOG.debug("Request added to pipeline ok");
055: }
056: return added;
057: }
058:
059: public void releaseResponse(HttpServiceContext context) {
060: if (context.getCommittedResponse() == null) {
061: throw new IllegalStateException(
062: "response is not committed.");
063: }
064: synchronized (entryMap) {
065: entryMap.put(context, context.getCommittedResponse());
066: releaseRequests();
067: }
068: }
069:
070: public void disposeAll() {
071: synchronized (entryMap) {
072: entryMap.clear();
073: }
074: }
075:
076: public void runWhenEmpty(Runnable command) {
077: synchronized (entryMap) {
078: if (entryMap.isEmpty()) {
079: command.run();
080: } else {
081: emptyCommand = command;
082: }
083: }
084: }
085:
086: /**
087: * Sets the pipeline listener associated with this pipeline
088: *
089: * @param listener The listener
090: */
091: public void setPipelineListener(RequestPipelineListener listener) {
092: this .listener = listener;
093: }
094:
095: /**
096: * Releases any requests which can be freed as a result of a request
097: * being freed.
098: * We simply iterate through the list (in insertion order) - freeing
099: * all responses until we arive at one which has not yet been completed
100: */
101: private void releaseRequests() {
102: for (Iterator<Map.Entry<HttpServiceContext, HttpResponse>> iter = entryMap
103: .entrySet().iterator(); iter.hasNext();) {
104: Map.Entry<HttpServiceContext, HttpResponse> entry = iter
105: .next();
106: HttpResponse response = entry.getValue();
107: if (response != null) {
108: if (LOG.isDebugEnabled()) {
109: LOG
110: .debug("Response freed from pipeline. Notifying");
111: }
112: listener.responseReleased(entry.getKey());
113: iter.remove();
114: } else {
115: break;
116: }
117: }
118: if (emptyCommand != null && entryMap.isEmpty()) {
119: emptyCommand.run();
120: }
121: }
122:
123: }
|