/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package nginx.unit.websocket;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;

import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

/**
 * Client-side WebSocket frame reader.
 * <p>
 * Bytes are read asynchronously from an {@link AsyncChannelWrapper} into the
 * {@code response} buffer, copied into the {@code inputBuffer} inherited from
 * {@link WsFrameBase} and handed to {@code processInputBuffer()}. Reading is
 * driven by the read-state machine of the base class
 * ({@code getReadState()} / {@code changeReadState(...)}).
 */
public class WsFrameClient extends WsFrameBase {

    private final Log log = LogFactory.getLog(WsFrameClient.class); // must not be static
    private static final StringManager sm = StringManager.getManager(WsFrameClient.class);

    private final AsyncChannelWrapper channel;
    private final CompletionHandler<Integer, Void> handler;
    // Not final as it may need to be re-sized
    private volatile ByteBuffer response;

    /**
     * Creates a frame reader for a client session.
     *
     * @param response       buffer that holds the bytes already read from the
     *                       server (during the HTTP phase) and that is reused
     *                       for subsequent channel reads
     * @param channel        channel from which further data is read
     * @param wsSession      session passed to the base class; it is closed by
     *                       this reader when a read error occurs
     * @param transformation transformation passed to the base class
     */
    public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession,
            Transformation transformation) {
        super(wsSession, transformation);
        this.response = response;
        this.channel = channel;
        this.handler = new WsFrameClientCompletionHandler();
    }


    /**
     * Starts consuming input. Any {@link IOException} raised while processing
     * results in the session being closed via {@link #close(Throwable)}.
     */
    void startInputProcessing() {
        try {
            processSocketRead();
        } catch (IOException e) {
            close(e);
        }
    }


    /**
     * Drains the {@code response} buffer into the frame processor and then
     * requests more data from the channel.
     * <p>
     * The outer loop only repeats when a read-state transition fails (the
     * state was changed concurrently) so that the new state is re-evaluated.
     *
     * @throws IOException           if processing the buffered input fails
     * @throws IllegalStateException if the read state is neither
     *                               {@code WAITING} nor {@code SUSPENDING_WAIT}
     */
    private void processSocketRead() throws IOException {
        while (true) {
            switch (getReadState()) {
                case WAITING:
                    if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) {
                        continue;
                    }
                    while (response.hasRemaining()) {
                        if (isSuspended()) {
                            if (!changeReadState(ReadState.SUSPENDING_PROCESS,
                                    ReadState.SUSPENDED)) {
                                continue;
                            }
                            // There is still data available in the response buffer
                            // Return here so that the response buffer will not be
                            // cleared and there will be no data read from the
                            // socket. Thus when the read operation is resumed first
                            // the data left in the response buffer will be consumed
                            // and then a new socket read will be performed
                            return;
                        }
                        transferResponseToInputBuffer();

                        // Process the data we have
                        processInputBuffer();
                    }
                    response.clear();

                    // Get some more data
                    if (isOpen()) {
                        channel.read(response, null, handler);
                    } else {
                        changeReadState(ReadState.CLOSING);
                    }
                    return;
                case SUSPENDING_WAIT:
                    if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) {
                        continue;
                    }
                    return;
                default:
                    // Same message key as the equivalent branch in
                    // WsFrameClientCompletionHandler.doResumeProcessing(); the
                    // previous "wsFrameServer." key was inconsistent with it.
                    throw new IllegalStateException(
                            sm.getString("wsFrame.illegalReadState", getReadState()));
            }
        }
    }


    /**
     * Copies as many bytes as fit from {@code response} to the end of
     * {@code inputBuffer}, leaving {@code inputBuffer} positioned where it was
     * before the copy and with its limit at the end of the appended data.
     */
    private void transferResponseToInputBuffer() {
        // Remember the current read position and switch the input buffer to
        // the region after the data it already holds
        inputBuffer.mark();
        inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());

        int toCopy = Math.min(response.remaining(), inputBuffer.remaining());

        // Copy remaining bytes read in HTTP phase to input buffer used by
        // frame processing. The limit of the response buffer is reduced
        // temporarily so that put() never copies more than toCopy bytes.
        int orgLimit = response.limit();
        response.limit(response.position() + toCopy);
        inputBuffer.put(response);
        response.limit(orgLimit);

        // Limit is now the end of the appended data; reset() restores the
        // position saved by mark()
        inputBuffer.limit(inputBuffer.position()).reset();
    }


    /**
     * Moves the reader to the {@code CLOSING} state and closes the session.
     *
     * @param t the cause of the close. The close reason carried by a
     *          {@link WsIOException} is used as is; any other throwable is
     *          reported as {@link CloseCodes#CLOSED_ABNORMALLY} with the
     *          throwable's message as reason phrase.
     */
    private void close(Throwable t) {
        changeReadState(ReadState.CLOSING);
        CloseReason cr;
        if (t instanceof WsIOException) {
            cr = ((WsIOException) t).getCloseReason();
        } else {
            cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage());
        }

        try {
            wsSession.close(cr);
        } catch (IOException ignored) {
            // Ignore - the session is being closed because of an earlier error
        }
    }


    @Override
    protected boolean isMasked() {
        // Data is from the server so it is not masked
        return false;
    }


    @Override
    protected Log getLog() {
        return log;
    }


    /**
     * Completion handler for the asynchronous channel reads started by
     * {@code processSocketRead()}.
     */
    private class WsFrameClientCompletionHandler implements CompletionHandler<Integer, Void> {

        /**
         * Called when a channel read finished.
         *
         * @param result     number of bytes read, or {@code -1} on end of stream
         * @param attachment unused
         */
        @Override
        public void completed(Integer result, Void attachment) {
            if (result.intValue() == -1) {
                // BZ 57762. A dropped connection will get reported as EOF
                // rather than as an error so handle it here.
                if (isOpen()) {
                    // No close frame was received
                    close(new EOFException());
                }
                // No data to process
                return;
            }
            // Make the bytes that were just read available for consumption
            response.flip();
            doResumeProcessing(true);
        }


        /**
         * Called when a channel read failed.
         *
         * @param exc        the failure; a {@link ReadBufferOverflowException}
         *                   triggers a re-sized buffer and another read attempt,
         *                   anything else closes the session
         * @param attachment unused
         */
        @Override
        public void failed(Throwable exc, Void attachment) {
            if (exc instanceof ReadBufferOverflowException) {
                // response will be empty if this exception is thrown
                response = ByteBuffer
                        .allocate(((ReadBufferOverflowException) exc).getMinBufferSize());
                // Flipping the fresh buffer leaves nothing remaining, so the
                // resumed processing goes straight to the next channel read
                response.flip();
                doResumeProcessing(false);
            } else {
                close(exc);
            }
        }


        /**
         * Returns the state machine from {@code PROCESSING} to {@code WAITING}
         * and resumes reading, or completes a pending suspend request.
         *
         * @param checkOpenOnError forwarded to
         *                         {@link WsFrameClient#resumeProcessing(boolean)}
         *
         * @throws IllegalStateException if the read state is neither
         *                               {@code PROCESSING} nor
         *                               {@code SUSPENDING_PROCESS}
         */
        private void doResumeProcessing(boolean checkOpenOnError) {
            while (true) {
                switch (getReadState()) {
                    case PROCESSING:
                        if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) {
                            continue;
                        }
                        resumeProcessing(checkOpenOnError);
                        return;
                    case SUSPENDING_PROCESS:
                        if (!changeReadState(ReadState.SUSPENDING_PROCESS,
                                ReadState.SUSPENDED)) {
                            continue;
                        }
                        return;
                    default:
                        throw new IllegalStateException(
                                sm.getString("wsFrame.illegalReadState", getReadState()));
                }
            }
        }
    }


    @Override
    protected void resumeProcessing() {
        resumeProcessing(true);
    }


    /**
     * Continues reading and closes the session if that fails.
     *
     * @param checkOpenOnError if {@code true} the session is only closed (and
     *                         the error logged at debug level) while it is
     *                         still open; if {@code false} it is closed
     *                         unconditionally
     */
    private void resumeProcessing(boolean checkOpenOnError) {
        try {
            processSocketRead();
        } catch (IOException e) {
            if (checkOpenOnError) {
                // Only send a close message on an IOException if the client
                // has not yet received a close control message from the server
                // as the IOException may be in response to the client
                // continuing to send a message after the server sent a close
                // control message.
                if (isOpen()) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("wsFrameClient.ioe"), e);
                    }
                    close(e);
                }
            } else {
                close(e);
            }
        }
    }
}