1*1157Smax.romanov@nginx.com /* 2*1157Smax.romanov@nginx.com * Licensed to the Apache Software Foundation (ASF) under one or more 3*1157Smax.romanov@nginx.com * contributor license agreements. See the NOTICE file distributed with 4*1157Smax.romanov@nginx.com * this work for additional information regarding copyright ownership. 5*1157Smax.romanov@nginx.com * The ASF licenses this file to You under the Apache License, Version 2.0 6*1157Smax.romanov@nginx.com * (the "License"); you may not use this file except in compliance with 7*1157Smax.romanov@nginx.com * the License. You may obtain a copy of the License at 8*1157Smax.romanov@nginx.com * 9*1157Smax.romanov@nginx.com * http://www.apache.org/licenses/LICENSE-2.0 10*1157Smax.romanov@nginx.com * 11*1157Smax.romanov@nginx.com * Unless required by applicable law or agreed to in writing, software 12*1157Smax.romanov@nginx.com * distributed under the License is distributed on an "AS IS" BASIS, 13*1157Smax.romanov@nginx.com * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*1157Smax.romanov@nginx.com * See the License for the specific language governing permissions and 15*1157Smax.romanov@nginx.com * limitations under the License. 16*1157Smax.romanov@nginx.com */ 17*1157Smax.romanov@nginx.com package nginx.unit.websocket; 18*1157Smax.romanov@nginx.com 19*1157Smax.romanov@nginx.com import java.io.EOFException; 20*1157Smax.romanov@nginx.com import java.io.IOException; 21*1157Smax.romanov@nginx.com import java.nio.ByteBuffer; 22*1157Smax.romanov@nginx.com import java.nio.channels.CompletionHandler; 23*1157Smax.romanov@nginx.com 24*1157Smax.romanov@nginx.com import javax.websocket.CloseReason; 25*1157Smax.romanov@nginx.com import javax.websocket.CloseReason.CloseCodes; 26*1157Smax.romanov@nginx.com 27*1157Smax.romanov@nginx.com import org.apache.juli.logging.Log; 28*1157Smax.romanov@nginx.com import org.apache.juli.logging.LogFactory; 29*1157Smax.romanov@nginx.com import org.apache.tomcat.util.res.StringManager; 30*1157Smax.romanov@nginx.com 31*1157Smax.romanov@nginx.com public class WsFrameClient extends WsFrameBase { 32*1157Smax.romanov@nginx.com 33*1157Smax.romanov@nginx.com private final Log log = LogFactory.getLog(WsFrameClient.class); // must not be static 34*1157Smax.romanov@nginx.com private static final StringManager sm = StringManager.getManager(WsFrameClient.class); 35*1157Smax.romanov@nginx.com 36*1157Smax.romanov@nginx.com private final AsyncChannelWrapper channel; 37*1157Smax.romanov@nginx.com private final CompletionHandler<Integer, Void> handler; 38*1157Smax.romanov@nginx.com // Not final as it may need to be re-sized 39*1157Smax.romanov@nginx.com private volatile ByteBuffer response; 40*1157Smax.romanov@nginx.com WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession, Transformation transformation)41*1157Smax.romanov@nginx.com public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession, 42*1157Smax.romanov@nginx.com Transformation transformation) { 43*1157Smax.romanov@nginx.com super(wsSession, transformation); 44*1157Smax.romanov@nginx.com this.response = response; 45*1157Smax.romanov@nginx.com this.channel = channel; 46*1157Smax.romanov@nginx.com this.handler = new WsFrameClientCompletionHandler(); 47*1157Smax.romanov@nginx.com } 48*1157Smax.romanov@nginx.com 49*1157Smax.romanov@nginx.com startInputProcessing()50*1157Smax.romanov@nginx.com void startInputProcessing() { 51*1157Smax.romanov@nginx.com try { 52*1157Smax.romanov@nginx.com processSocketRead(); 53*1157Smax.romanov@nginx.com } catch (IOException e) { 54*1157Smax.romanov@nginx.com close(e); 55*1157Smax.romanov@nginx.com } 56*1157Smax.romanov@nginx.com } 57*1157Smax.romanov@nginx.com 58*1157Smax.romanov@nginx.com processSocketRead()59*1157Smax.romanov@nginx.com private void processSocketRead() throws IOException { 60*1157Smax.romanov@nginx.com while (true) { 61*1157Smax.romanov@nginx.com switch (getReadState()) { 62*1157Smax.romanov@nginx.com case WAITING: 63*1157Smax.romanov@nginx.com if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) { 64*1157Smax.romanov@nginx.com continue; 65*1157Smax.romanov@nginx.com } 66*1157Smax.romanov@nginx.com while (response.hasRemaining()) { 67*1157Smax.romanov@nginx.com if (isSuspended()) { 68*1157Smax.romanov@nginx.com if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) { 69*1157Smax.romanov@nginx.com continue; 70*1157Smax.romanov@nginx.com } 71*1157Smax.romanov@nginx.com // There is still data available in the response buffer 72*1157Smax.romanov@nginx.com // Return here so that the response buffer will not be 73*1157Smax.romanov@nginx.com // cleared and there will be no data read from the 74*1157Smax.romanov@nginx.com // socket. Thus when the read operation is resumed first 75*1157Smax.romanov@nginx.com // the data left in the response buffer will be consumed 76*1157Smax.romanov@nginx.com // and then a new socket read will be performed 77*1157Smax.romanov@nginx.com return; 78*1157Smax.romanov@nginx.com } 79*1157Smax.romanov@nginx.com inputBuffer.mark(); 80*1157Smax.romanov@nginx.com inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity()); 81*1157Smax.romanov@nginx.com 82*1157Smax.romanov@nginx.com int toCopy = Math.min(response.remaining(), inputBuffer.remaining()); 83*1157Smax.romanov@nginx.com 84*1157Smax.romanov@nginx.com // Copy remaining bytes read in HTTP phase to input buffer used by 85*1157Smax.romanov@nginx.com // frame processing 86*1157Smax.romanov@nginx.com 87*1157Smax.romanov@nginx.com int orgLimit = response.limit(); 88*1157Smax.romanov@nginx.com response.limit(response.position() + toCopy); 89*1157Smax.romanov@nginx.com inputBuffer.put(response); 90*1157Smax.romanov@nginx.com response.limit(orgLimit); 91*1157Smax.romanov@nginx.com 92*1157Smax.romanov@nginx.com inputBuffer.limit(inputBuffer.position()).reset(); 93*1157Smax.romanov@nginx.com 94*1157Smax.romanov@nginx.com // Process the data we have 95*1157Smax.romanov@nginx.com processInputBuffer(); 96*1157Smax.romanov@nginx.com } 97*1157Smax.romanov@nginx.com response.clear(); 98*1157Smax.romanov@nginx.com 99*1157Smax.romanov@nginx.com // Get some more data 100*1157Smax.romanov@nginx.com if (isOpen()) { 101*1157Smax.romanov@nginx.com channel.read(response, null, handler); 102*1157Smax.romanov@nginx.com } else { 103*1157Smax.romanov@nginx.com changeReadState(ReadState.CLOSING); 104*1157Smax.romanov@nginx.com } 105*1157Smax.romanov@nginx.com return; 106*1157Smax.romanov@nginx.com case SUSPENDING_WAIT: 107*1157Smax.romanov@nginx.com if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) { 108*1157Smax.romanov@nginx.com continue; 109*1157Smax.romanov@nginx.com } 110*1157Smax.romanov@nginx.com return; 111*1157Smax.romanov@nginx.com default: 112*1157Smax.romanov@nginx.com throw new IllegalStateException( 113*1157Smax.romanov@nginx.com sm.getString("wsFrameServer.illegalReadState", getReadState())); 114*1157Smax.romanov@nginx.com } 115*1157Smax.romanov@nginx.com } 116*1157Smax.romanov@nginx.com } 117*1157Smax.romanov@nginx.com 118*1157Smax.romanov@nginx.com close(Throwable t)119*1157Smax.romanov@nginx.com private final void close(Throwable t) { 120*1157Smax.romanov@nginx.com changeReadState(ReadState.CLOSING); 121*1157Smax.romanov@nginx.com CloseReason cr; 122*1157Smax.romanov@nginx.com if (t instanceof WsIOException) { 123*1157Smax.romanov@nginx.com cr = ((WsIOException) t).getCloseReason(); 124*1157Smax.romanov@nginx.com } else { 125*1157Smax.romanov@nginx.com cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage()); 126*1157Smax.romanov@nginx.com } 127*1157Smax.romanov@nginx.com 128*1157Smax.romanov@nginx.com try { 129*1157Smax.romanov@nginx.com wsSession.close(cr); 130*1157Smax.romanov@nginx.com } catch (IOException ignore) { 131*1157Smax.romanov@nginx.com // Ignore 132*1157Smax.romanov@nginx.com } 133*1157Smax.romanov@nginx.com } 134*1157Smax.romanov@nginx.com 135*1157Smax.romanov@nginx.com 136*1157Smax.romanov@nginx.com @Override isMasked()137*1157Smax.romanov@nginx.com protected boolean isMasked() { 138*1157Smax.romanov@nginx.com // Data is from the server so it is not masked 139*1157Smax.romanov@nginx.com return false; 140*1157Smax.romanov@nginx.com } 141*1157Smax.romanov@nginx.com 142*1157Smax.romanov@nginx.com 143*1157Smax.romanov@nginx.com @Override getLog()144*1157Smax.romanov@nginx.com protected Log getLog() { 145*1157Smax.romanov@nginx.com return log; 146*1157Smax.romanov@nginx.com } 147*1157Smax.romanov@nginx.com 148*1157Smax.romanov@nginx.com private class WsFrameClientCompletionHandler implements CompletionHandler<Integer, Void> { 149*1157Smax.romanov@nginx.com 150*1157Smax.romanov@nginx.com @Override completed(Integer result, Void attachment)151*1157Smax.romanov@nginx.com public void completed(Integer result, Void attachment) { 152*1157Smax.romanov@nginx.com if (result.intValue() == -1) { 153*1157Smax.romanov@nginx.com // BZ 57762. A dropped connection will get reported as EOF 154*1157Smax.romanov@nginx.com // rather than as an error so handle it here. 155*1157Smax.romanov@nginx.com if (isOpen()) { 156*1157Smax.romanov@nginx.com // No close frame was received 157*1157Smax.romanov@nginx.com close(new EOFException()); 158*1157Smax.romanov@nginx.com } 159*1157Smax.romanov@nginx.com // No data to process 160*1157Smax.romanov@nginx.com return; 161*1157Smax.romanov@nginx.com } 162*1157Smax.romanov@nginx.com response.flip(); 163*1157Smax.romanov@nginx.com doResumeProcessing(true); 164*1157Smax.romanov@nginx.com } 165*1157Smax.romanov@nginx.com 166*1157Smax.romanov@nginx.com @Override failed(Throwable exc, Void attachment)167*1157Smax.romanov@nginx.com public void failed(Throwable exc, Void attachment) { 168*1157Smax.romanov@nginx.com if (exc instanceof ReadBufferOverflowException) { 169*1157Smax.romanov@nginx.com // response will be empty if this exception is thrown 170*1157Smax.romanov@nginx.com response = ByteBuffer 171*1157Smax.romanov@nginx.com .allocate(((ReadBufferOverflowException) exc).getMinBufferSize()); 172*1157Smax.romanov@nginx.com response.flip(); 173*1157Smax.romanov@nginx.com doResumeProcessing(false); 174*1157Smax.romanov@nginx.com } else { 175*1157Smax.romanov@nginx.com close(exc); 176*1157Smax.romanov@nginx.com } 177*1157Smax.romanov@nginx.com } 178*1157Smax.romanov@nginx.com doResumeProcessing(boolean checkOpenOnError)179*1157Smax.romanov@nginx.com private void doResumeProcessing(boolean checkOpenOnError) { 180*1157Smax.romanov@nginx.com while (true) { 181*1157Smax.romanov@nginx.com switch (getReadState()) { 182*1157Smax.romanov@nginx.com case PROCESSING: 183*1157Smax.romanov@nginx.com if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) { 184*1157Smax.romanov@nginx.com continue; 185*1157Smax.romanov@nginx.com } 186*1157Smax.romanov@nginx.com resumeProcessing(checkOpenOnError); 187*1157Smax.romanov@nginx.com return; 188*1157Smax.romanov@nginx.com case SUSPENDING_PROCESS: 189*1157Smax.romanov@nginx.com if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) { 190*1157Smax.romanov@nginx.com continue; 191*1157Smax.romanov@nginx.com } 192*1157Smax.romanov@nginx.com return; 193*1157Smax.romanov@nginx.com default: 194*1157Smax.romanov@nginx.com throw new IllegalStateException( 195*1157Smax.romanov@nginx.com sm.getString("wsFrame.illegalReadState", getReadState())); 196*1157Smax.romanov@nginx.com } 197*1157Smax.romanov@nginx.com } 198*1157Smax.romanov@nginx.com } 199*1157Smax.romanov@nginx.com } 200*1157Smax.romanov@nginx.com 201*1157Smax.romanov@nginx.com 202*1157Smax.romanov@nginx.com @Override resumeProcessing()203*1157Smax.romanov@nginx.com protected void resumeProcessing() { 204*1157Smax.romanov@nginx.com resumeProcessing(true); 205*1157Smax.romanov@nginx.com } 206*1157Smax.romanov@nginx.com resumeProcessing(boolean checkOpenOnError)207*1157Smax.romanov@nginx.com private void resumeProcessing(boolean checkOpenOnError) { 208*1157Smax.romanov@nginx.com try { 209*1157Smax.romanov@nginx.com processSocketRead(); 210*1157Smax.romanov@nginx.com } catch (IOException e) { 211*1157Smax.romanov@nginx.com if (checkOpenOnError) { 212*1157Smax.romanov@nginx.com // Only send a close message on an IOException if the client 213*1157Smax.romanov@nginx.com // has not yet received a close control message from the server 214*1157Smax.romanov@nginx.com // as the IOException may be in response to the client 215*1157Smax.romanov@nginx.com // continuing to send a message after the server sent a close 216*1157Smax.romanov@nginx.com // control message. 217*1157Smax.romanov@nginx.com if (isOpen()) { 218*1157Smax.romanov@nginx.com if (log.isDebugEnabled()) { 219*1157Smax.romanov@nginx.com log.debug(sm.getString("wsFrameClient.ioe"), e); 220*1157Smax.romanov@nginx.com } 221*1157Smax.romanov@nginx.com close(e); 222*1157Smax.romanov@nginx.com } 223*1157Smax.romanov@nginx.com } else { 224*1157Smax.romanov@nginx.com close(e); 225*1157Smax.romanov@nginx.com } 226*1157Smax.romanov@nginx.com } 227*1157Smax.romanov@nginx.com } 228*1157Smax.romanov@nginx.com } 229