xref: /unit/src/java/nginx/unit/websocket/WsFrameClient.java (revision 1157:7ae152bda303)
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package nginx.unit.websocket;
18 
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.CompletionHandler;
23 
24 import javax.websocket.CloseReason;
25 import javax.websocket.CloseReason.CloseCodes;
26 
27 import org.apache.juli.logging.Log;
28 import org.apache.juli.logging.LogFactory;
29 import org.apache.tomcat.util.res.StringManager;
30 
31 public class WsFrameClient extends WsFrameBase {
32 
33     private final Log log = LogFactory.getLog(WsFrameClient.class); // must not be static
34     private static final StringManager sm = StringManager.getManager(WsFrameClient.class);
35 
36     private final AsyncChannelWrapper channel;
37     private final CompletionHandler<Integer, Void> handler;
38     // Not final as it may need to be re-sized
39     private volatile ByteBuffer response;
40 
WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession, Transformation transformation)41     public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession,
42             Transformation transformation) {
43         super(wsSession, transformation);
44         this.response = response;
45         this.channel = channel;
46         this.handler = new WsFrameClientCompletionHandler();
47     }
48 
49 
startInputProcessing()50     void startInputProcessing() {
51         try {
52             processSocketRead();
53         } catch (IOException e) {
54             close(e);
55         }
56     }
57 
58 
processSocketRead()59     private void processSocketRead() throws IOException {
60         while (true) {
61             switch (getReadState()) {
62             case WAITING:
63                 if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) {
64                     continue;
65                 }
66                 while (response.hasRemaining()) {
67                     if (isSuspended()) {
68                         if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
69                             continue;
70                         }
71                         // There is still data available in the response buffer
72                         // Return here so that the response buffer will not be
73                         // cleared and there will be no data read from the
74                         // socket. Thus when the read operation is resumed first
75                         // the data left in the response buffer will be consumed
76                         // and then a new socket read will be performed
77                         return;
78                     }
79                     inputBuffer.mark();
80                     inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());
81 
82                     int toCopy = Math.min(response.remaining(), inputBuffer.remaining());
83 
84                     // Copy remaining bytes read in HTTP phase to input buffer used by
85                     // frame processing
86 
87                     int orgLimit = response.limit();
88                     response.limit(response.position() + toCopy);
89                     inputBuffer.put(response);
90                     response.limit(orgLimit);
91 
92                     inputBuffer.limit(inputBuffer.position()).reset();
93 
94                     // Process the data we have
95                     processInputBuffer();
96                 }
97                 response.clear();
98 
99                 // Get some more data
100                 if (isOpen()) {
101                     channel.read(response, null, handler);
102                 } else {
103                     changeReadState(ReadState.CLOSING);
104                 }
105                 return;
106             case SUSPENDING_WAIT:
107                 if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) {
108                     continue;
109                 }
110                 return;
111             default:
112                 throw new IllegalStateException(
113                         sm.getString("wsFrameServer.illegalReadState", getReadState()));
114             }
115         }
116     }
117 
118 
close(Throwable t)119     private final void close(Throwable t) {
120         changeReadState(ReadState.CLOSING);
121         CloseReason cr;
122         if (t instanceof WsIOException) {
123             cr = ((WsIOException) t).getCloseReason();
124         } else {
125             cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage());
126         }
127 
128         try {
129             wsSession.close(cr);
130         } catch (IOException ignore) {
131             // Ignore
132         }
133     }
134 
135 
136     @Override
isMasked()137     protected boolean isMasked() {
138         // Data is from the server so it is not masked
139         return false;
140     }
141 
142 
143     @Override
getLog()144     protected Log getLog() {
145         return log;
146     }
147 
148     private class WsFrameClientCompletionHandler implements CompletionHandler<Integer, Void> {
149 
150         @Override
completed(Integer result, Void attachment)151         public void completed(Integer result, Void attachment) {
152             if (result.intValue() == -1) {
153                 // BZ 57762. A dropped connection will get reported as EOF
154                 // rather than as an error so handle it here.
155                 if (isOpen()) {
156                     // No close frame was received
157                     close(new EOFException());
158                 }
159                 // No data to process
160                 return;
161             }
162             response.flip();
163             doResumeProcessing(true);
164         }
165 
166         @Override
failed(Throwable exc, Void attachment)167         public void failed(Throwable exc, Void attachment) {
168             if (exc instanceof ReadBufferOverflowException) {
169                 // response will be empty if this exception is thrown
170                 response = ByteBuffer
171                         .allocate(((ReadBufferOverflowException) exc).getMinBufferSize());
172                 response.flip();
173                 doResumeProcessing(false);
174             } else {
175                 close(exc);
176             }
177         }
178 
doResumeProcessing(boolean checkOpenOnError)179         private void doResumeProcessing(boolean checkOpenOnError) {
180             while (true) {
181                 switch (getReadState()) {
182                 case PROCESSING:
183                     if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) {
184                         continue;
185                     }
186                     resumeProcessing(checkOpenOnError);
187                     return;
188                 case SUSPENDING_PROCESS:
189                     if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
190                         continue;
191                     }
192                     return;
193                 default:
194                     throw new IllegalStateException(
195                             sm.getString("wsFrame.illegalReadState", getReadState()));
196                 }
197             }
198         }
199     }
200 
201 
202     @Override
resumeProcessing()203     protected void resumeProcessing() {
204         resumeProcessing(true);
205     }
206 
resumeProcessing(boolean checkOpenOnError)207     private void resumeProcessing(boolean checkOpenOnError) {
208         try {
209             processSocketRead();
210         } catch (IOException e) {
211             if (checkOpenOnError) {
212                 // Only send a close message on an IOException if the client
213                 // has not yet received a close control message from the server
214                 // as the IOException may be in response to the client
215                 // continuing to send a message after the server sent a close
216                 // control message.
217                 if (isOpen()) {
218                     if (log.isDebugEnabled()) {
219                         log.debug(sm.getString("wsFrameClient.ioe"), e);
220                     }
221                     close(e);
222                 }
223             } else {
224                 close(e);
225             }
226         }
227     }
228 }
229