xref: /unit/src/java/nginx/unit/websocket/WsFrameClient.java (revision 1157:7ae152bda303)
1*1157Smax.romanov@nginx.com /*
2*1157Smax.romanov@nginx.com  * Licensed to the Apache Software Foundation (ASF) under one or more
3*1157Smax.romanov@nginx.com  * contributor license agreements.  See the NOTICE file distributed with
4*1157Smax.romanov@nginx.com  * this work for additional information regarding copyright ownership.
5*1157Smax.romanov@nginx.com  * The ASF licenses this file to You under the Apache License, Version 2.0
6*1157Smax.romanov@nginx.com  * (the "License"); you may not use this file except in compliance with
7*1157Smax.romanov@nginx.com  * the License.  You may obtain a copy of the License at
8*1157Smax.romanov@nginx.com  *
9*1157Smax.romanov@nginx.com  *     http://www.apache.org/licenses/LICENSE-2.0
10*1157Smax.romanov@nginx.com  *
11*1157Smax.romanov@nginx.com  * Unless required by applicable law or agreed to in writing, software
12*1157Smax.romanov@nginx.com  * distributed under the License is distributed on an "AS IS" BASIS,
13*1157Smax.romanov@nginx.com  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*1157Smax.romanov@nginx.com  * See the License for the specific language governing permissions and
15*1157Smax.romanov@nginx.com  * limitations under the License.
16*1157Smax.romanov@nginx.com  */
17*1157Smax.romanov@nginx.com package nginx.unit.websocket;
18*1157Smax.romanov@nginx.com 
19*1157Smax.romanov@nginx.com import java.io.EOFException;
20*1157Smax.romanov@nginx.com import java.io.IOException;
21*1157Smax.romanov@nginx.com import java.nio.ByteBuffer;
22*1157Smax.romanov@nginx.com import java.nio.channels.CompletionHandler;
23*1157Smax.romanov@nginx.com 
24*1157Smax.romanov@nginx.com import javax.websocket.CloseReason;
25*1157Smax.romanov@nginx.com import javax.websocket.CloseReason.CloseCodes;
26*1157Smax.romanov@nginx.com 
27*1157Smax.romanov@nginx.com import org.apache.juli.logging.Log;
28*1157Smax.romanov@nginx.com import org.apache.juli.logging.LogFactory;
29*1157Smax.romanov@nginx.com import org.apache.tomcat.util.res.StringManager;
30*1157Smax.romanov@nginx.com 
31*1157Smax.romanov@nginx.com public class WsFrameClient extends WsFrameBase {
32*1157Smax.romanov@nginx.com 
33*1157Smax.romanov@nginx.com     private final Log log = LogFactory.getLog(WsFrameClient.class); // must not be static
34*1157Smax.romanov@nginx.com     private static final StringManager sm = StringManager.getManager(WsFrameClient.class);
35*1157Smax.romanov@nginx.com 
36*1157Smax.romanov@nginx.com     private final AsyncChannelWrapper channel;
37*1157Smax.romanov@nginx.com     private final CompletionHandler<Integer, Void> handler;
38*1157Smax.romanov@nginx.com     // Not final as it may need to be re-sized
39*1157Smax.romanov@nginx.com     private volatile ByteBuffer response;
40*1157Smax.romanov@nginx.com 
WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession, Transformation transformation)41*1157Smax.romanov@nginx.com     public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession,
42*1157Smax.romanov@nginx.com             Transformation transformation) {
43*1157Smax.romanov@nginx.com         super(wsSession, transformation);
44*1157Smax.romanov@nginx.com         this.response = response;
45*1157Smax.romanov@nginx.com         this.channel = channel;
46*1157Smax.romanov@nginx.com         this.handler = new WsFrameClientCompletionHandler();
47*1157Smax.romanov@nginx.com     }
48*1157Smax.romanov@nginx.com 
49*1157Smax.romanov@nginx.com 
startInputProcessing()50*1157Smax.romanov@nginx.com     void startInputProcessing() {
51*1157Smax.romanov@nginx.com         try {
52*1157Smax.romanov@nginx.com             processSocketRead();
53*1157Smax.romanov@nginx.com         } catch (IOException e) {
54*1157Smax.romanov@nginx.com             close(e);
55*1157Smax.romanov@nginx.com         }
56*1157Smax.romanov@nginx.com     }
57*1157Smax.romanov@nginx.com 
58*1157Smax.romanov@nginx.com 
processSocketRead()59*1157Smax.romanov@nginx.com     private void processSocketRead() throws IOException {
60*1157Smax.romanov@nginx.com         while (true) {
61*1157Smax.romanov@nginx.com             switch (getReadState()) {
62*1157Smax.romanov@nginx.com             case WAITING:
63*1157Smax.romanov@nginx.com                 if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) {
64*1157Smax.romanov@nginx.com                     continue;
65*1157Smax.romanov@nginx.com                 }
66*1157Smax.romanov@nginx.com                 while (response.hasRemaining()) {
67*1157Smax.romanov@nginx.com                     if (isSuspended()) {
68*1157Smax.romanov@nginx.com                         if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
69*1157Smax.romanov@nginx.com                             continue;
70*1157Smax.romanov@nginx.com                         }
71*1157Smax.romanov@nginx.com                         // There is still data available in the response buffer
72*1157Smax.romanov@nginx.com                         // Return here so that the response buffer will not be
73*1157Smax.romanov@nginx.com                         // cleared and there will be no data read from the
74*1157Smax.romanov@nginx.com                         // socket. Thus when the read operation is resumed first
75*1157Smax.romanov@nginx.com                         // the data left in the response buffer will be consumed
76*1157Smax.romanov@nginx.com                         // and then a new socket read will be performed
77*1157Smax.romanov@nginx.com                         return;
78*1157Smax.romanov@nginx.com                     }
79*1157Smax.romanov@nginx.com                     inputBuffer.mark();
80*1157Smax.romanov@nginx.com                     inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());
81*1157Smax.romanov@nginx.com 
82*1157Smax.romanov@nginx.com                     int toCopy = Math.min(response.remaining(), inputBuffer.remaining());
83*1157Smax.romanov@nginx.com 
84*1157Smax.romanov@nginx.com                     // Copy remaining bytes read in HTTP phase to input buffer used by
85*1157Smax.romanov@nginx.com                     // frame processing
86*1157Smax.romanov@nginx.com 
87*1157Smax.romanov@nginx.com                     int orgLimit = response.limit();
88*1157Smax.romanov@nginx.com                     response.limit(response.position() + toCopy);
89*1157Smax.romanov@nginx.com                     inputBuffer.put(response);
90*1157Smax.romanov@nginx.com                     response.limit(orgLimit);
91*1157Smax.romanov@nginx.com 
92*1157Smax.romanov@nginx.com                     inputBuffer.limit(inputBuffer.position()).reset();
93*1157Smax.romanov@nginx.com 
94*1157Smax.romanov@nginx.com                     // Process the data we have
95*1157Smax.romanov@nginx.com                     processInputBuffer();
96*1157Smax.romanov@nginx.com                 }
97*1157Smax.romanov@nginx.com                 response.clear();
98*1157Smax.romanov@nginx.com 
99*1157Smax.romanov@nginx.com                 // Get some more data
100*1157Smax.romanov@nginx.com                 if (isOpen()) {
101*1157Smax.romanov@nginx.com                     channel.read(response, null, handler);
102*1157Smax.romanov@nginx.com                 } else {
103*1157Smax.romanov@nginx.com                     changeReadState(ReadState.CLOSING);
104*1157Smax.romanov@nginx.com                 }
105*1157Smax.romanov@nginx.com                 return;
106*1157Smax.romanov@nginx.com             case SUSPENDING_WAIT:
107*1157Smax.romanov@nginx.com                 if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) {
108*1157Smax.romanov@nginx.com                     continue;
109*1157Smax.romanov@nginx.com                 }
110*1157Smax.romanov@nginx.com                 return;
111*1157Smax.romanov@nginx.com             default:
112*1157Smax.romanov@nginx.com                 throw new IllegalStateException(
113*1157Smax.romanov@nginx.com                         sm.getString("wsFrameServer.illegalReadState", getReadState()));
114*1157Smax.romanov@nginx.com             }
115*1157Smax.romanov@nginx.com         }
116*1157Smax.romanov@nginx.com     }
117*1157Smax.romanov@nginx.com 
118*1157Smax.romanov@nginx.com 
close(Throwable t)119*1157Smax.romanov@nginx.com     private final void close(Throwable t) {
120*1157Smax.romanov@nginx.com         changeReadState(ReadState.CLOSING);
121*1157Smax.romanov@nginx.com         CloseReason cr;
122*1157Smax.romanov@nginx.com         if (t instanceof WsIOException) {
123*1157Smax.romanov@nginx.com             cr = ((WsIOException) t).getCloseReason();
124*1157Smax.romanov@nginx.com         } else {
125*1157Smax.romanov@nginx.com             cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage());
126*1157Smax.romanov@nginx.com         }
127*1157Smax.romanov@nginx.com 
128*1157Smax.romanov@nginx.com         try {
129*1157Smax.romanov@nginx.com             wsSession.close(cr);
130*1157Smax.romanov@nginx.com         } catch (IOException ignore) {
131*1157Smax.romanov@nginx.com             // Ignore
132*1157Smax.romanov@nginx.com         }
133*1157Smax.romanov@nginx.com     }
134*1157Smax.romanov@nginx.com 
135*1157Smax.romanov@nginx.com 
136*1157Smax.romanov@nginx.com     @Override
isMasked()137*1157Smax.romanov@nginx.com     protected boolean isMasked() {
138*1157Smax.romanov@nginx.com         // Data is from the server so it is not masked
139*1157Smax.romanov@nginx.com         return false;
140*1157Smax.romanov@nginx.com     }
141*1157Smax.romanov@nginx.com 
142*1157Smax.romanov@nginx.com 
143*1157Smax.romanov@nginx.com     @Override
getLog()144*1157Smax.romanov@nginx.com     protected Log getLog() {
145*1157Smax.romanov@nginx.com         return log;
146*1157Smax.romanov@nginx.com     }
147*1157Smax.romanov@nginx.com 
148*1157Smax.romanov@nginx.com     private class WsFrameClientCompletionHandler implements CompletionHandler<Integer, Void> {
149*1157Smax.romanov@nginx.com 
150*1157Smax.romanov@nginx.com         @Override
completed(Integer result, Void attachment)151*1157Smax.romanov@nginx.com         public void completed(Integer result, Void attachment) {
152*1157Smax.romanov@nginx.com             if (result.intValue() == -1) {
153*1157Smax.romanov@nginx.com                 // BZ 57762. A dropped connection will get reported as EOF
154*1157Smax.romanov@nginx.com                 // rather than as an error so handle it here.
155*1157Smax.romanov@nginx.com                 if (isOpen()) {
156*1157Smax.romanov@nginx.com                     // No close frame was received
157*1157Smax.romanov@nginx.com                     close(new EOFException());
158*1157Smax.romanov@nginx.com                 }
159*1157Smax.romanov@nginx.com                 // No data to process
160*1157Smax.romanov@nginx.com                 return;
161*1157Smax.romanov@nginx.com             }
162*1157Smax.romanov@nginx.com             response.flip();
163*1157Smax.romanov@nginx.com             doResumeProcessing(true);
164*1157Smax.romanov@nginx.com         }
165*1157Smax.romanov@nginx.com 
166*1157Smax.romanov@nginx.com         @Override
failed(Throwable exc, Void attachment)167*1157Smax.romanov@nginx.com         public void failed(Throwable exc, Void attachment) {
168*1157Smax.romanov@nginx.com             if (exc instanceof ReadBufferOverflowException) {
169*1157Smax.romanov@nginx.com                 // response will be empty if this exception is thrown
170*1157Smax.romanov@nginx.com                 response = ByteBuffer
171*1157Smax.romanov@nginx.com                         .allocate(((ReadBufferOverflowException) exc).getMinBufferSize());
172*1157Smax.romanov@nginx.com                 response.flip();
173*1157Smax.romanov@nginx.com                 doResumeProcessing(false);
174*1157Smax.romanov@nginx.com             } else {
175*1157Smax.romanov@nginx.com                 close(exc);
176*1157Smax.romanov@nginx.com             }
177*1157Smax.romanov@nginx.com         }
178*1157Smax.romanov@nginx.com 
doResumeProcessing(boolean checkOpenOnError)179*1157Smax.romanov@nginx.com         private void doResumeProcessing(boolean checkOpenOnError) {
180*1157Smax.romanov@nginx.com             while (true) {
181*1157Smax.romanov@nginx.com                 switch (getReadState()) {
182*1157Smax.romanov@nginx.com                 case PROCESSING:
183*1157Smax.romanov@nginx.com                     if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) {
184*1157Smax.romanov@nginx.com                         continue;
185*1157Smax.romanov@nginx.com                     }
186*1157Smax.romanov@nginx.com                     resumeProcessing(checkOpenOnError);
187*1157Smax.romanov@nginx.com                     return;
188*1157Smax.romanov@nginx.com                 case SUSPENDING_PROCESS:
189*1157Smax.romanov@nginx.com                     if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
190*1157Smax.romanov@nginx.com                         continue;
191*1157Smax.romanov@nginx.com                     }
192*1157Smax.romanov@nginx.com                     return;
193*1157Smax.romanov@nginx.com                 default:
194*1157Smax.romanov@nginx.com                     throw new IllegalStateException(
195*1157Smax.romanov@nginx.com                             sm.getString("wsFrame.illegalReadState", getReadState()));
196*1157Smax.romanov@nginx.com                 }
197*1157Smax.romanov@nginx.com             }
198*1157Smax.romanov@nginx.com         }
199*1157Smax.romanov@nginx.com     }
200*1157Smax.romanov@nginx.com 
201*1157Smax.romanov@nginx.com 
202*1157Smax.romanov@nginx.com     @Override
resumeProcessing()203*1157Smax.romanov@nginx.com     protected void resumeProcessing() {
204*1157Smax.romanov@nginx.com         resumeProcessing(true);
205*1157Smax.romanov@nginx.com     }
206*1157Smax.romanov@nginx.com 
resumeProcessing(boolean checkOpenOnError)207*1157Smax.romanov@nginx.com     private void resumeProcessing(boolean checkOpenOnError) {
208*1157Smax.romanov@nginx.com         try {
209*1157Smax.romanov@nginx.com             processSocketRead();
210*1157Smax.romanov@nginx.com         } catch (IOException e) {
211*1157Smax.romanov@nginx.com             if (checkOpenOnError) {
212*1157Smax.romanov@nginx.com                 // Only send a close message on an IOException if the client
213*1157Smax.romanov@nginx.com                 // has not yet received a close control message from the server
214*1157Smax.romanov@nginx.com                 // as the IOException may be in response to the client
215*1157Smax.romanov@nginx.com                 // continuing to send a message after the server sent a close
216*1157Smax.romanov@nginx.com                 // control message.
217*1157Smax.romanov@nginx.com                 if (isOpen()) {
218*1157Smax.romanov@nginx.com                     if (log.isDebugEnabled()) {
219*1157Smax.romanov@nginx.com                         log.debug(sm.getString("wsFrameClient.ioe"), e);
220*1157Smax.romanov@nginx.com                     }
221*1157Smax.romanov@nginx.com                     close(e);
222*1157Smax.romanov@nginx.com                 }
223*1157Smax.romanov@nginx.com             } else {
224*1157Smax.romanov@nginx.com                 close(e);
225*1157Smax.romanov@nginx.com             }
226*1157Smax.romanov@nginx.com         }
227*1157Smax.romanov@nginx.com     }
228*1157Smax.romanov@nginx.com }
229