1*1157Smax.romanov@nginx.com /* 2*1157Smax.romanov@nginx.com * Licensed to the Apache Software Foundation (ASF) under one or more 3*1157Smax.romanov@nginx.com * contributor license agreements. See the NOTICE file distributed with 4*1157Smax.romanov@nginx.com * this work for additional information regarding copyright ownership. 5*1157Smax.romanov@nginx.com * The ASF licenses this file to You under the Apache License, Version 2.0 6*1157Smax.romanov@nginx.com * (the "License"); you may not use this file except in compliance with 7*1157Smax.romanov@nginx.com * the License. You may obtain a copy of the License at 8*1157Smax.romanov@nginx.com * 9*1157Smax.romanov@nginx.com * http://www.apache.org/licenses/LICENSE-2.0 10*1157Smax.romanov@nginx.com * 11*1157Smax.romanov@nginx.com * Unless required by applicable law or agreed to in writing, software 12*1157Smax.romanov@nginx.com * distributed under the License is distributed on an "AS IS" BASIS, 13*1157Smax.romanov@nginx.com * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*1157Smax.romanov@nginx.com * See the License for the specific language governing permissions and 15*1157Smax.romanov@nginx.com * limitations under the License. 16*1157Smax.romanov@nginx.com */ 17*1157Smax.romanov@nginx.com package nginx.unit.websocket; 18*1157Smax.romanov@nginx.com 19*1157Smax.romanov@nginx.com import java.io.IOException; 20*1157Smax.romanov@nginx.com import java.nio.ByteBuffer; 21*1157Smax.romanov@nginx.com import java.nio.CharBuffer; 22*1157Smax.romanov@nginx.com import java.nio.charset.CharsetDecoder; 23*1157Smax.romanov@nginx.com import java.nio.charset.CoderResult; 24*1157Smax.romanov@nginx.com import java.nio.charset.CodingErrorAction; 25*1157Smax.romanov@nginx.com import java.util.List; 26*1157Smax.romanov@nginx.com import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 27*1157Smax.romanov@nginx.com 28*1157Smax.romanov@nginx.com import javax.websocket.CloseReason; 29*1157Smax.romanov@nginx.com import javax.websocket.CloseReason.CloseCodes; 30*1157Smax.romanov@nginx.com import javax.websocket.Extension; 31*1157Smax.romanov@nginx.com import javax.websocket.MessageHandler; 32*1157Smax.romanov@nginx.com import javax.websocket.PongMessage; 33*1157Smax.romanov@nginx.com 34*1157Smax.romanov@nginx.com import org.apache.juli.logging.Log; 35*1157Smax.romanov@nginx.com import org.apache.tomcat.util.ExceptionUtils; 36*1157Smax.romanov@nginx.com import org.apache.tomcat.util.buf.Utf8Decoder; 37*1157Smax.romanov@nginx.com import org.apache.tomcat.util.res.StringManager; 38*1157Smax.romanov@nginx.com 39*1157Smax.romanov@nginx.com /** 40*1157Smax.romanov@nginx.com * Takes the ServletInputStream, processes the WebSocket frames it contains and 41*1157Smax.romanov@nginx.com * extracts the messages. WebSocket Pings received will be responded to 42*1157Smax.romanov@nginx.com * automatically without any action required by the application. 43*1157Smax.romanov@nginx.com */ 44*1157Smax.romanov@nginx.com public abstract class WsFrameBase { 45*1157Smax.romanov@nginx.com 46*1157Smax.romanov@nginx.com private static final StringManager sm = StringManager.getManager(WsFrameBase.class); 47*1157Smax.romanov@nginx.com 48*1157Smax.romanov@nginx.com // Connection level attributes 49*1157Smax.romanov@nginx.com protected final WsSession wsSession; 50*1157Smax.romanov@nginx.com protected final ByteBuffer inputBuffer; 51*1157Smax.romanov@nginx.com private final Transformation transformation; 52*1157Smax.romanov@nginx.com 53*1157Smax.romanov@nginx.com // Attributes for control messages 54*1157Smax.romanov@nginx.com // Control messages can appear in the middle of other messages so need 55*1157Smax.romanov@nginx.com // separate attributes 56*1157Smax.romanov@nginx.com private final ByteBuffer controlBufferBinary = ByteBuffer.allocate(125); 57*1157Smax.romanov@nginx.com private final CharBuffer controlBufferText = CharBuffer.allocate(125); 58*1157Smax.romanov@nginx.com 59*1157Smax.romanov@nginx.com // Attributes of the current message 60*1157Smax.romanov@nginx.com private final CharsetDecoder utf8DecoderControl = new Utf8Decoder(). 61*1157Smax.romanov@nginx.com onMalformedInput(CodingErrorAction.REPORT). 62*1157Smax.romanov@nginx.com onUnmappableCharacter(CodingErrorAction.REPORT); 63*1157Smax.romanov@nginx.com private final CharsetDecoder utf8DecoderMessage = new Utf8Decoder(). 64*1157Smax.romanov@nginx.com onMalformedInput(CodingErrorAction.REPORT). 65*1157Smax.romanov@nginx.com onUnmappableCharacter(CodingErrorAction.REPORT); 66*1157Smax.romanov@nginx.com private boolean continuationExpected = false; 67*1157Smax.romanov@nginx.com private boolean textMessage = false; 68*1157Smax.romanov@nginx.com private ByteBuffer messageBufferBinary; 69*1157Smax.romanov@nginx.com private CharBuffer messageBufferText; 70*1157Smax.romanov@nginx.com // Cache the message handler in force when the message starts so it is used 71*1157Smax.romanov@nginx.com // consistently for the entire message 72*1157Smax.romanov@nginx.com private MessageHandler binaryMsgHandler = null; 73*1157Smax.romanov@nginx.com private MessageHandler textMsgHandler = null; 74*1157Smax.romanov@nginx.com 75*1157Smax.romanov@nginx.com // Attributes of the current frame 76*1157Smax.romanov@nginx.com private boolean fin = false; 77*1157Smax.romanov@nginx.com private int rsv = 0; 78*1157Smax.romanov@nginx.com private byte opCode = 0; 79*1157Smax.romanov@nginx.com private final byte[] mask = new byte[4]; 80*1157Smax.romanov@nginx.com private int maskIndex = 0; 81*1157Smax.romanov@nginx.com private long payloadLength = 0; 82*1157Smax.romanov@nginx.com private volatile long payloadWritten = 0; 83*1157Smax.romanov@nginx.com 84*1157Smax.romanov@nginx.com // Attributes tracking state 85*1157Smax.romanov@nginx.com private volatile State state = State.NEW_FRAME; 86*1157Smax.romanov@nginx.com private volatile boolean open = true; 87*1157Smax.romanov@nginx.com 88*1157Smax.romanov@nginx.com private static final AtomicReferenceFieldUpdater<WsFrameBase, ReadState> READ_STATE_UPDATER = 89*1157Smax.romanov@nginx.com AtomicReferenceFieldUpdater.newUpdater(WsFrameBase.class, ReadState.class, "readState"); 90*1157Smax.romanov@nginx.com private volatile ReadState readState = ReadState.WAITING; 91*1157Smax.romanov@nginx.com WsFrameBase(WsSession wsSession, Transformation transformation)92*1157Smax.romanov@nginx.com public WsFrameBase(WsSession wsSession, Transformation transformation) { 93*1157Smax.romanov@nginx.com inputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); 94*1157Smax.romanov@nginx.com inputBuffer.position(0).limit(0); 95*1157Smax.romanov@nginx.com messageBufferBinary = ByteBuffer.allocate(wsSession.getMaxBinaryMessageBufferSize()); 96*1157Smax.romanov@nginx.com messageBufferText = CharBuffer.allocate(wsSession.getMaxTextMessageBufferSize()); 97*1157Smax.romanov@nginx.com this.wsSession = wsSession; 98*1157Smax.romanov@nginx.com Transformation finalTransformation; 99*1157Smax.romanov@nginx.com if (isMasked()) { 100*1157Smax.romanov@nginx.com finalTransformation = new UnmaskTransformation(); 101*1157Smax.romanov@nginx.com } else { 102*1157Smax.romanov@nginx.com finalTransformation = new NoopTransformation(); 103*1157Smax.romanov@nginx.com } 104*1157Smax.romanov@nginx.com if (transformation == null) { 105*1157Smax.romanov@nginx.com this.transformation = finalTransformation; 106*1157Smax.romanov@nginx.com } else { 107*1157Smax.romanov@nginx.com transformation.setNext(finalTransformation); 108*1157Smax.romanov@nginx.com this.transformation = transformation; 109*1157Smax.romanov@nginx.com } 110*1157Smax.romanov@nginx.com } 111*1157Smax.romanov@nginx.com 112*1157Smax.romanov@nginx.com processInputBuffer()113*1157Smax.romanov@nginx.com protected void processInputBuffer() throws IOException { 114*1157Smax.romanov@nginx.com while (!isSuspended()) { 115*1157Smax.romanov@nginx.com wsSession.updateLastActive(); 116*1157Smax.romanov@nginx.com if (state == State.NEW_FRAME) { 117*1157Smax.romanov@nginx.com if (!processInitialHeader()) { 118*1157Smax.romanov@nginx.com break; 119*1157Smax.romanov@nginx.com } 120*1157Smax.romanov@nginx.com // If a close frame has been received, no further data should 121*1157Smax.romanov@nginx.com // have seen 122*1157Smax.romanov@nginx.com if (!open) { 123*1157Smax.romanov@nginx.com throw new IOException(sm.getString("wsFrame.closed")); 124*1157Smax.romanov@nginx.com } 125*1157Smax.romanov@nginx.com } 126*1157Smax.romanov@nginx.com if (state == State.PARTIAL_HEADER) { 127*1157Smax.romanov@nginx.com if (!processRemainingHeader()) { 128*1157Smax.romanov@nginx.com break; 129*1157Smax.romanov@nginx.com } 130*1157Smax.romanov@nginx.com } 131*1157Smax.romanov@nginx.com if (state == State.DATA) { 132*1157Smax.romanov@nginx.com if (!processData()) { 133*1157Smax.romanov@nginx.com break; 134*1157Smax.romanov@nginx.com } 135*1157Smax.romanov@nginx.com } 136*1157Smax.romanov@nginx.com } 137*1157Smax.romanov@nginx.com } 138*1157Smax.romanov@nginx.com 139*1157Smax.romanov@nginx.com 140*1157Smax.romanov@nginx.com /** 141*1157Smax.romanov@nginx.com * @return <code>true</code> if sufficient data was present to process all 142*1157Smax.romanov@nginx.com * of the initial header 143*1157Smax.romanov@nginx.com */ processInitialHeader()144*1157Smax.romanov@nginx.com private boolean processInitialHeader() throws IOException { 145*1157Smax.romanov@nginx.com // Need at least two bytes of data to do this 146*1157Smax.romanov@nginx.com if (inputBuffer.remaining() < 2) { 147*1157Smax.romanov@nginx.com return false; 148*1157Smax.romanov@nginx.com } 149*1157Smax.romanov@nginx.com int b = inputBuffer.get(); 150*1157Smax.romanov@nginx.com fin = (b & 0x80) != 0; 151*1157Smax.romanov@nginx.com rsv = (b & 0x70) >>> 4; 152*1157Smax.romanov@nginx.com opCode = (byte) (b & 0x0F); 153*1157Smax.romanov@nginx.com if (!transformation.validateRsv(rsv, opCode)) { 154*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 155*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 156*1157Smax.romanov@nginx.com sm.getString("wsFrame.wrongRsv", Integer.valueOf(rsv), Integer.valueOf(opCode)))); 157*1157Smax.romanov@nginx.com } 158*1157Smax.romanov@nginx.com 159*1157Smax.romanov@nginx.com if (Util.isControl(opCode)) { 160*1157Smax.romanov@nginx.com if (!fin) { 161*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 162*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 163*1157Smax.romanov@nginx.com sm.getString("wsFrame.controlFragmented"))); 164*1157Smax.romanov@nginx.com } 165*1157Smax.romanov@nginx.com if (opCode != Constants.OPCODE_PING && 166*1157Smax.romanov@nginx.com opCode != Constants.OPCODE_PONG && 167*1157Smax.romanov@nginx.com opCode != Constants.OPCODE_CLOSE) { 168*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 169*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 170*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidOpCode", Integer.valueOf(opCode)))); 171*1157Smax.romanov@nginx.com } 172*1157Smax.romanov@nginx.com } else { 173*1157Smax.romanov@nginx.com if (continuationExpected) { 174*1157Smax.romanov@nginx.com if (!Util.isContinuation(opCode)) { 175*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 176*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 177*1157Smax.romanov@nginx.com sm.getString("wsFrame.noContinuation"))); 178*1157Smax.romanov@nginx.com } 179*1157Smax.romanov@nginx.com } else { 180*1157Smax.romanov@nginx.com try { 181*1157Smax.romanov@nginx.com if (opCode == Constants.OPCODE_BINARY) { 182*1157Smax.romanov@nginx.com // New binary message 183*1157Smax.romanov@nginx.com textMessage = false; 184*1157Smax.romanov@nginx.com int size = wsSession.getMaxBinaryMessageBufferSize(); 185*1157Smax.romanov@nginx.com if (size != messageBufferBinary.capacity()) { 186*1157Smax.romanov@nginx.com messageBufferBinary = ByteBuffer.allocate(size); 187*1157Smax.romanov@nginx.com } 188*1157Smax.romanov@nginx.com binaryMsgHandler = wsSession.getBinaryMessageHandler(); 189*1157Smax.romanov@nginx.com textMsgHandler = null; 190*1157Smax.romanov@nginx.com } else if (opCode == Constants.OPCODE_TEXT) { 191*1157Smax.romanov@nginx.com // New text message 192*1157Smax.romanov@nginx.com textMessage = true; 193*1157Smax.romanov@nginx.com int size = wsSession.getMaxTextMessageBufferSize(); 194*1157Smax.romanov@nginx.com if (size != messageBufferText.capacity()) { 195*1157Smax.romanov@nginx.com messageBufferText = CharBuffer.allocate(size); 196*1157Smax.romanov@nginx.com } 197*1157Smax.romanov@nginx.com binaryMsgHandler = null; 198*1157Smax.romanov@nginx.com textMsgHandler = wsSession.getTextMessageHandler(); 199*1157Smax.romanov@nginx.com } else { 200*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 201*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 202*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidOpCode", Integer.valueOf(opCode)))); 203*1157Smax.romanov@nginx.com } 204*1157Smax.romanov@nginx.com } catch (IllegalStateException ise) { 205*1157Smax.romanov@nginx.com // Thrown if the session is already closed 206*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 207*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 208*1157Smax.romanov@nginx.com sm.getString("wsFrame.sessionClosed"))); 209*1157Smax.romanov@nginx.com } 210*1157Smax.romanov@nginx.com } 211*1157Smax.romanov@nginx.com continuationExpected = !fin; 212*1157Smax.romanov@nginx.com } 213*1157Smax.romanov@nginx.com b = inputBuffer.get(); 214*1157Smax.romanov@nginx.com // Client data must be masked 215*1157Smax.romanov@nginx.com if ((b & 0x80) == 0 && isMasked()) { 216*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 217*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 218*1157Smax.romanov@nginx.com sm.getString("wsFrame.notMasked"))); 219*1157Smax.romanov@nginx.com } 220*1157Smax.romanov@nginx.com payloadLength = b & 0x7F; 221*1157Smax.romanov@nginx.com state = State.PARTIAL_HEADER; 222*1157Smax.romanov@nginx.com if (getLog().isDebugEnabled()) { 223*1157Smax.romanov@nginx.com getLog().debug(sm.getString("wsFrame.partialHeaderComplete", Boolean.toString(fin), 224*1157Smax.romanov@nginx.com Integer.toString(rsv), Integer.toString(opCode), Long.toString(payloadLength))); 225*1157Smax.romanov@nginx.com } 226*1157Smax.romanov@nginx.com return true; 227*1157Smax.romanov@nginx.com } 228*1157Smax.romanov@nginx.com 229*1157Smax.romanov@nginx.com isMasked()230*1157Smax.romanov@nginx.com protected abstract boolean isMasked(); getLog()231*1157Smax.romanov@nginx.com protected abstract Log getLog(); 232*1157Smax.romanov@nginx.com 233*1157Smax.romanov@nginx.com 234*1157Smax.romanov@nginx.com /** 235*1157Smax.romanov@nginx.com * @return <code>true</code> if sufficient data was present to complete the 236*1157Smax.romanov@nginx.com * processing of the header 237*1157Smax.romanov@nginx.com */ processRemainingHeader()238*1157Smax.romanov@nginx.com private boolean processRemainingHeader() throws IOException { 239*1157Smax.romanov@nginx.com // Ignore the 2 bytes already read. 4 for the mask 240*1157Smax.romanov@nginx.com int headerLength; 241*1157Smax.romanov@nginx.com if (isMasked()) { 242*1157Smax.romanov@nginx.com headerLength = 4; 243*1157Smax.romanov@nginx.com } else { 244*1157Smax.romanov@nginx.com headerLength = 0; 245*1157Smax.romanov@nginx.com } 246*1157Smax.romanov@nginx.com // Add additional bytes depending on length 247*1157Smax.romanov@nginx.com if (payloadLength == 126) { 248*1157Smax.romanov@nginx.com headerLength += 2; 249*1157Smax.romanov@nginx.com } else if (payloadLength == 127) { 250*1157Smax.romanov@nginx.com headerLength += 8; 251*1157Smax.romanov@nginx.com } 252*1157Smax.romanov@nginx.com if (inputBuffer.remaining() < headerLength) { 253*1157Smax.romanov@nginx.com return false; 254*1157Smax.romanov@nginx.com } 255*1157Smax.romanov@nginx.com // Calculate new payload length if necessary 256*1157Smax.romanov@nginx.com if (payloadLength == 126) { 257*1157Smax.romanov@nginx.com payloadLength = byteArrayToLong(inputBuffer.array(), 258*1157Smax.romanov@nginx.com inputBuffer.arrayOffset() + inputBuffer.position(), 2); 259*1157Smax.romanov@nginx.com inputBuffer.position(inputBuffer.position() + 2); 260*1157Smax.romanov@nginx.com } else if (payloadLength == 127) { 261*1157Smax.romanov@nginx.com payloadLength = byteArrayToLong(inputBuffer.array(), 262*1157Smax.romanov@nginx.com inputBuffer.arrayOffset() + inputBuffer.position(), 8); 263*1157Smax.romanov@nginx.com inputBuffer.position(inputBuffer.position() + 8); 264*1157Smax.romanov@nginx.com } 265*1157Smax.romanov@nginx.com if (Util.isControl(opCode)) { 266*1157Smax.romanov@nginx.com if (payloadLength > 125) { 267*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 268*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 269*1157Smax.romanov@nginx.com sm.getString("wsFrame.controlPayloadTooBig", Long.valueOf(payloadLength)))); 270*1157Smax.romanov@nginx.com } 271*1157Smax.romanov@nginx.com if (!fin) { 272*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 273*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 274*1157Smax.romanov@nginx.com sm.getString("wsFrame.controlNoFin"))); 275*1157Smax.romanov@nginx.com } 276*1157Smax.romanov@nginx.com } 277*1157Smax.romanov@nginx.com if (isMasked()) { 278*1157Smax.romanov@nginx.com inputBuffer.get(mask, 0, 4); 279*1157Smax.romanov@nginx.com } 280*1157Smax.romanov@nginx.com state = State.DATA; 281*1157Smax.romanov@nginx.com return true; 282*1157Smax.romanov@nginx.com } 283*1157Smax.romanov@nginx.com 284*1157Smax.romanov@nginx.com processData()285*1157Smax.romanov@nginx.com private boolean processData() throws IOException { 286*1157Smax.romanov@nginx.com boolean result; 287*1157Smax.romanov@nginx.com if (Util.isControl(opCode)) { 288*1157Smax.romanov@nginx.com result = processDataControl(); 289*1157Smax.romanov@nginx.com } else if (textMessage) { 290*1157Smax.romanov@nginx.com if (textMsgHandler == null) { 291*1157Smax.romanov@nginx.com result = swallowInput(); 292*1157Smax.romanov@nginx.com } else { 293*1157Smax.romanov@nginx.com result = processDataText(); 294*1157Smax.romanov@nginx.com } 295*1157Smax.romanov@nginx.com } else { 296*1157Smax.romanov@nginx.com if (binaryMsgHandler == null) { 297*1157Smax.romanov@nginx.com result = swallowInput(); 298*1157Smax.romanov@nginx.com } else { 299*1157Smax.romanov@nginx.com result = processDataBinary(); 300*1157Smax.romanov@nginx.com } 301*1157Smax.romanov@nginx.com } 302*1157Smax.romanov@nginx.com checkRoomPayload(); 303*1157Smax.romanov@nginx.com return result; 304*1157Smax.romanov@nginx.com } 305*1157Smax.romanov@nginx.com 306*1157Smax.romanov@nginx.com processDataControl()307*1157Smax.romanov@nginx.com private boolean processDataControl() throws IOException { 308*1157Smax.romanov@nginx.com TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, controlBufferBinary); 309*1157Smax.romanov@nginx.com if (TransformationResult.UNDERFLOW.equals(tr)) { 310*1157Smax.romanov@nginx.com return false; 311*1157Smax.romanov@nginx.com } 312*1157Smax.romanov@nginx.com // Control messages have fixed message size so 313*1157Smax.romanov@nginx.com // TransformationResult.OVERFLOW is not possible here 314*1157Smax.romanov@nginx.com 315*1157Smax.romanov@nginx.com controlBufferBinary.flip(); 316*1157Smax.romanov@nginx.com if (opCode == Constants.OPCODE_CLOSE) { 317*1157Smax.romanov@nginx.com open = false; 318*1157Smax.romanov@nginx.com String reason = null; 319*1157Smax.romanov@nginx.com int code = CloseCodes.NORMAL_CLOSURE.getCode(); 320*1157Smax.romanov@nginx.com if (controlBufferBinary.remaining() == 1) { 321*1157Smax.romanov@nginx.com controlBufferBinary.clear(); 322*1157Smax.romanov@nginx.com // Payload must be zero or 2+ bytes long 323*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 324*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 325*1157Smax.romanov@nginx.com sm.getString("wsFrame.oneByteCloseCode"))); 326*1157Smax.romanov@nginx.com } 327*1157Smax.romanov@nginx.com if (controlBufferBinary.remaining() > 1) { 328*1157Smax.romanov@nginx.com code = controlBufferBinary.getShort(); 329*1157Smax.romanov@nginx.com if (controlBufferBinary.remaining() > 0) { 330*1157Smax.romanov@nginx.com CoderResult cr = utf8DecoderControl.decode(controlBufferBinary, 331*1157Smax.romanov@nginx.com controlBufferText, true); 332*1157Smax.romanov@nginx.com if (cr.isError()) { 333*1157Smax.romanov@nginx.com controlBufferBinary.clear(); 334*1157Smax.romanov@nginx.com controlBufferText.clear(); 335*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 336*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 337*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidUtf8Close"))); 338*1157Smax.romanov@nginx.com } 339*1157Smax.romanov@nginx.com // There will be no overflow as the output buffer is big 340*1157Smax.romanov@nginx.com // enough. There will be no underflow as all the data is 341*1157Smax.romanov@nginx.com // passed to the decoder in a single call. 342*1157Smax.romanov@nginx.com controlBufferText.flip(); 343*1157Smax.romanov@nginx.com reason = controlBufferText.toString(); 344*1157Smax.romanov@nginx.com } 345*1157Smax.romanov@nginx.com } 346*1157Smax.romanov@nginx.com wsSession.onClose(new CloseReason(Util.getCloseCode(code), reason)); 347*1157Smax.romanov@nginx.com } else if (opCode == Constants.OPCODE_PING) { 348*1157Smax.romanov@nginx.com if (wsSession.isOpen()) { 349*1157Smax.romanov@nginx.com wsSession.getBasicRemote().sendPong(controlBufferBinary); 350*1157Smax.romanov@nginx.com } 351*1157Smax.romanov@nginx.com } else if (opCode == Constants.OPCODE_PONG) { 352*1157Smax.romanov@nginx.com MessageHandler.Whole<PongMessage> mhPong = wsSession.getPongMessageHandler(); 353*1157Smax.romanov@nginx.com if (mhPong != null) { 354*1157Smax.romanov@nginx.com try { 355*1157Smax.romanov@nginx.com mhPong.onMessage(new WsPongMessage(controlBufferBinary)); 356*1157Smax.romanov@nginx.com } catch (Throwable t) { 357*1157Smax.romanov@nginx.com handleThrowableOnSend(t); 358*1157Smax.romanov@nginx.com } finally { 359*1157Smax.romanov@nginx.com controlBufferBinary.clear(); 360*1157Smax.romanov@nginx.com } 361*1157Smax.romanov@nginx.com } 362*1157Smax.romanov@nginx.com } else { 363*1157Smax.romanov@nginx.com // Should have caught this earlier but just in case... 364*1157Smax.romanov@nginx.com controlBufferBinary.clear(); 365*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 366*1157Smax.romanov@nginx.com CloseCodes.PROTOCOL_ERROR, 367*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidOpCode", Integer.valueOf(opCode)))); 368*1157Smax.romanov@nginx.com } 369*1157Smax.romanov@nginx.com controlBufferBinary.clear(); 370*1157Smax.romanov@nginx.com newFrame(); 371*1157Smax.romanov@nginx.com return true; 372*1157Smax.romanov@nginx.com } 373*1157Smax.romanov@nginx.com 374*1157Smax.romanov@nginx.com 375*1157Smax.romanov@nginx.com @SuppressWarnings("unchecked") sendMessageText(boolean last)376*1157Smax.romanov@nginx.com protected void sendMessageText(boolean last) throws WsIOException { 377*1157Smax.romanov@nginx.com if (textMsgHandler instanceof WrappedMessageHandler) { 378*1157Smax.romanov@nginx.com long maxMessageSize = ((WrappedMessageHandler) textMsgHandler).getMaxMessageSize(); 379*1157Smax.romanov@nginx.com if (maxMessageSize > -1 && messageBufferText.remaining() > maxMessageSize) { 380*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG, 381*1157Smax.romanov@nginx.com sm.getString("wsFrame.messageTooBig", 382*1157Smax.romanov@nginx.com Long.valueOf(messageBufferText.remaining()), 383*1157Smax.romanov@nginx.com Long.valueOf(maxMessageSize)))); 384*1157Smax.romanov@nginx.com } 385*1157Smax.romanov@nginx.com } 386*1157Smax.romanov@nginx.com 387*1157Smax.romanov@nginx.com try { 388*1157Smax.romanov@nginx.com if (textMsgHandler instanceof MessageHandler.Partial<?>) { 389*1157Smax.romanov@nginx.com ((MessageHandler.Partial<String>) textMsgHandler) 390*1157Smax.romanov@nginx.com .onMessage(messageBufferText.toString(), last); 391*1157Smax.romanov@nginx.com } else { 392*1157Smax.romanov@nginx.com // Caller ensures last == true if this branch is used 393*1157Smax.romanov@nginx.com ((MessageHandler.Whole<String>) textMsgHandler) 394*1157Smax.romanov@nginx.com .onMessage(messageBufferText.toString()); 395*1157Smax.romanov@nginx.com } 396*1157Smax.romanov@nginx.com } catch (Throwable t) { 397*1157Smax.romanov@nginx.com handleThrowableOnSend(t); 398*1157Smax.romanov@nginx.com } finally { 399*1157Smax.romanov@nginx.com messageBufferText.clear(); 400*1157Smax.romanov@nginx.com } 401*1157Smax.romanov@nginx.com } 402*1157Smax.romanov@nginx.com 403*1157Smax.romanov@nginx.com processDataText()404*1157Smax.romanov@nginx.com private boolean processDataText() throws IOException { 405*1157Smax.romanov@nginx.com // Copy the available data to the buffer 406*1157Smax.romanov@nginx.com TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary); 407*1157Smax.romanov@nginx.com while (!TransformationResult.END_OF_FRAME.equals(tr)) { 408*1157Smax.romanov@nginx.com // Frame not complete - we ran out of something 409*1157Smax.romanov@nginx.com // Convert bytes to UTF-8 410*1157Smax.romanov@nginx.com messageBufferBinary.flip(); 411*1157Smax.romanov@nginx.com while (true) { 412*1157Smax.romanov@nginx.com CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText, 413*1157Smax.romanov@nginx.com false); 414*1157Smax.romanov@nginx.com if (cr.isError()) { 415*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 416*1157Smax.romanov@nginx.com CloseCodes.NOT_CONSISTENT, 417*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidUtf8"))); 418*1157Smax.romanov@nginx.com } else if (cr.isOverflow()) { 419*1157Smax.romanov@nginx.com // Ran out of space in text buffer - flush it 420*1157Smax.romanov@nginx.com if (usePartial()) { 421*1157Smax.romanov@nginx.com messageBufferText.flip(); 422*1157Smax.romanov@nginx.com sendMessageText(false); 423*1157Smax.romanov@nginx.com messageBufferText.clear(); 424*1157Smax.romanov@nginx.com } else { 425*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 426*1157Smax.romanov@nginx.com CloseCodes.TOO_BIG, 427*1157Smax.romanov@nginx.com sm.getString("wsFrame.textMessageTooBig"))); 428*1157Smax.romanov@nginx.com } 429*1157Smax.romanov@nginx.com } else if (cr.isUnderflow()) { 430*1157Smax.romanov@nginx.com // Compact what we have to create as much space as possible 431*1157Smax.romanov@nginx.com messageBufferBinary.compact(); 432*1157Smax.romanov@nginx.com 433*1157Smax.romanov@nginx.com // Need more input 434*1157Smax.romanov@nginx.com // What did we run out of? 435*1157Smax.romanov@nginx.com if (TransformationResult.OVERFLOW.equals(tr)) { 436*1157Smax.romanov@nginx.com // Ran out of message buffer - exit inner loop and 437*1157Smax.romanov@nginx.com // refill 438*1157Smax.romanov@nginx.com break; 439*1157Smax.romanov@nginx.com } else { 440*1157Smax.romanov@nginx.com // TransformationResult.UNDERFLOW 441*1157Smax.romanov@nginx.com // Ran out of input data - get some more 442*1157Smax.romanov@nginx.com return false; 443*1157Smax.romanov@nginx.com } 444*1157Smax.romanov@nginx.com } 445*1157Smax.romanov@nginx.com } 446*1157Smax.romanov@nginx.com // Read more input data 447*1157Smax.romanov@nginx.com tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary); 448*1157Smax.romanov@nginx.com } 449*1157Smax.romanov@nginx.com 450*1157Smax.romanov@nginx.com messageBufferBinary.flip(); 451*1157Smax.romanov@nginx.com boolean last = false; 452*1157Smax.romanov@nginx.com // Frame is fully received 453*1157Smax.romanov@nginx.com // Convert bytes to UTF-8 454*1157Smax.romanov@nginx.com while (true) { 455*1157Smax.romanov@nginx.com CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText, 456*1157Smax.romanov@nginx.com last); 457*1157Smax.romanov@nginx.com if (cr.isError()) { 458*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 459*1157Smax.romanov@nginx.com CloseCodes.NOT_CONSISTENT, 460*1157Smax.romanov@nginx.com sm.getString("wsFrame.invalidUtf8"))); 461*1157Smax.romanov@nginx.com } else if (cr.isOverflow()) { 462*1157Smax.romanov@nginx.com // Ran out of space in text buffer - flush it 463*1157Smax.romanov@nginx.com if (usePartial()) { 464*1157Smax.romanov@nginx.com messageBufferText.flip(); 465*1157Smax.romanov@nginx.com sendMessageText(false); 466*1157Smax.romanov@nginx.com messageBufferText.clear(); 467*1157Smax.romanov@nginx.com } else { 468*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason( 469*1157Smax.romanov@nginx.com CloseCodes.TOO_BIG, 470*1157Smax.romanov@nginx.com sm.getString("wsFrame.textMessageTooBig"))); 471*1157Smax.romanov@nginx.com } 472*1157Smax.romanov@nginx.com } else if (cr.isUnderflow() && !last) { 473*1157Smax.romanov@nginx.com // End of frame and possible message as well. 474*1157Smax.romanov@nginx.com 475*1157Smax.romanov@nginx.com if (continuationExpected) { 476*1157Smax.romanov@nginx.com // If partial messages are supported, send what we have 477*1157Smax.romanov@nginx.com // managed to decode 478*1157Smax.romanov@nginx.com if (usePartial()) { 479*1157Smax.romanov@nginx.com messageBufferText.flip(); 480*1157Smax.romanov@nginx.com sendMessageText(false); 481*1157Smax.romanov@nginx.com messageBufferText.clear(); 482*1157Smax.romanov@nginx.com } 483*1157Smax.romanov@nginx.com messageBufferBinary.compact(); 484*1157Smax.romanov@nginx.com newFrame(); 485*1157Smax.romanov@nginx.com // Process next frame 486*1157Smax.romanov@nginx.com return true; 487*1157Smax.romanov@nginx.com } else { 488*1157Smax.romanov@nginx.com // Make sure coder has flushed all output 489*1157Smax.romanov@nginx.com last = true; 490*1157Smax.romanov@nginx.com } 491*1157Smax.romanov@nginx.com } else { 492*1157Smax.romanov@nginx.com // End of message 493*1157Smax.romanov@nginx.com messageBufferText.flip(); 494*1157Smax.romanov@nginx.com sendMessageText(true); 495*1157Smax.romanov@nginx.com 496*1157Smax.romanov@nginx.com newMessage(); 497*1157Smax.romanov@nginx.com return true; 498*1157Smax.romanov@nginx.com } 499*1157Smax.romanov@nginx.com } 500*1157Smax.romanov@nginx.com } 501*1157Smax.romanov@nginx.com 502*1157Smax.romanov@nginx.com processDataBinary()503*1157Smax.romanov@nginx.com private boolean processDataBinary() throws IOException { 504*1157Smax.romanov@nginx.com // Copy the available data to the buffer 505*1157Smax.romanov@nginx.com TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary); 506*1157Smax.romanov@nginx.com while (!TransformationResult.END_OF_FRAME.equals(tr)) { 507*1157Smax.romanov@nginx.com // Frame not complete - what did we run out of? 508*1157Smax.romanov@nginx.com if (TransformationResult.UNDERFLOW.equals(tr)) { 509*1157Smax.romanov@nginx.com // Ran out of input data - get some more 510*1157Smax.romanov@nginx.com return false; 511*1157Smax.romanov@nginx.com } 512*1157Smax.romanov@nginx.com 513*1157Smax.romanov@nginx.com // Ran out of message buffer - flush it 514*1157Smax.romanov@nginx.com if (!usePartial()) { 515*1157Smax.romanov@nginx.com CloseReason cr = new CloseReason(CloseCodes.TOO_BIG, 516*1157Smax.romanov@nginx.com sm.getString("wsFrame.bufferTooSmall", 517*1157Smax.romanov@nginx.com Integer.valueOf(messageBufferBinary.capacity()), 518*1157Smax.romanov@nginx.com Long.valueOf(payloadLength))); 519*1157Smax.romanov@nginx.com throw new WsIOException(cr); 520*1157Smax.romanov@nginx.com } 521*1157Smax.romanov@nginx.com messageBufferBinary.flip(); 522*1157Smax.romanov@nginx.com ByteBuffer copy = ByteBuffer.allocate(messageBufferBinary.limit()); 523*1157Smax.romanov@nginx.com copy.put(messageBufferBinary); 524*1157Smax.romanov@nginx.com copy.flip(); 525*1157Smax.romanov@nginx.com sendMessageBinary(copy, false); 526*1157Smax.romanov@nginx.com messageBufferBinary.clear(); 527*1157Smax.romanov@nginx.com // Read more data 528*1157Smax.romanov@nginx.com tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary); 529*1157Smax.romanov@nginx.com } 530*1157Smax.romanov@nginx.com 531*1157Smax.romanov@nginx.com // Frame is fully received 532*1157Smax.romanov@nginx.com // Send the message if either: 533*1157Smax.romanov@nginx.com // - partial messages are supported 534*1157Smax.romanov@nginx.com // - the message is complete 535*1157Smax.romanov@nginx.com if (usePartial() || !continuationExpected) { 536*1157Smax.romanov@nginx.com messageBufferBinary.flip(); 537*1157Smax.romanov@nginx.com ByteBuffer copy = ByteBuffer.allocate(messageBufferBinary.limit()); 538*1157Smax.romanov@nginx.com copy.put(messageBufferBinary); 539*1157Smax.romanov@nginx.com copy.flip(); 540*1157Smax.romanov@nginx.com sendMessageBinary(copy, !continuationExpected); 541*1157Smax.romanov@nginx.com messageBufferBinary.clear(); 542*1157Smax.romanov@nginx.com } 543*1157Smax.romanov@nginx.com 544*1157Smax.romanov@nginx.com if (continuationExpected) { 545*1157Smax.romanov@nginx.com // More data for this message expected, start a new frame 546*1157Smax.romanov@nginx.com newFrame(); 547*1157Smax.romanov@nginx.com } else { 548*1157Smax.romanov@nginx.com // Message is complete, start a new message 549*1157Smax.romanov@nginx.com newMessage(); 550*1157Smax.romanov@nginx.com } 551*1157Smax.romanov@nginx.com 552*1157Smax.romanov@nginx.com return true; 553*1157Smax.romanov@nginx.com } 554*1157Smax.romanov@nginx.com 555*1157Smax.romanov@nginx.com handleThrowableOnSend(Throwable t)556*1157Smax.romanov@nginx.com private void handleThrowableOnSend(Throwable t) throws WsIOException { 557*1157Smax.romanov@nginx.com ExceptionUtils.handleThrowable(t); 558*1157Smax.romanov@nginx.com wsSession.getLocal().onError(wsSession, t); 559*1157Smax.romanov@nginx.com CloseReason cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, 560*1157Smax.romanov@nginx.com sm.getString("wsFrame.ioeTriggeredClose")); 561*1157Smax.romanov@nginx.com throw new WsIOException(cr); 562*1157Smax.romanov@nginx.com } 563*1157Smax.romanov@nginx.com 564*1157Smax.romanov@nginx.com 565*1157Smax.romanov@nginx.com @SuppressWarnings("unchecked") sendMessageBinary(ByteBuffer msg, boolean last)566*1157Smax.romanov@nginx.com protected void sendMessageBinary(ByteBuffer msg, boolean last) throws WsIOException { 567*1157Smax.romanov@nginx.com if (binaryMsgHandler instanceof WrappedMessageHandler) { 568*1157Smax.romanov@nginx.com long maxMessageSize = ((WrappedMessageHandler) binaryMsgHandler).getMaxMessageSize(); 569*1157Smax.romanov@nginx.com if (maxMessageSize > -1 && msg.remaining() > maxMessageSize) { 570*1157Smax.romanov@nginx.com throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG, 571*1157Smax.romanov@nginx.com sm.getString("wsFrame.messageTooBig", 572*1157Smax.romanov@nginx.com Long.valueOf(msg.remaining()), 573*1157Smax.romanov@nginx.com Long.valueOf(maxMessageSize)))); 574*1157Smax.romanov@nginx.com } 575*1157Smax.romanov@nginx.com } 576*1157Smax.romanov@nginx.com try { 577*1157Smax.romanov@nginx.com if (binaryMsgHandler instanceof MessageHandler.Partial<?>) { 578*1157Smax.romanov@nginx.com ((MessageHandler.Partial<ByteBuffer>) binaryMsgHandler).onMessage(msg, last); 579*1157Smax.romanov@nginx.com } else { 580*1157Smax.romanov@nginx.com // Caller ensures last == true if this branch is used 581*1157Smax.romanov@nginx.com ((MessageHandler.Whole<ByteBuffer>) binaryMsgHandler).onMessage(msg); 582*1157Smax.romanov@nginx.com } 583*1157Smax.romanov@nginx.com } catch (Throwable t) { 584*1157Smax.romanov@nginx.com handleThrowableOnSend(t); 585*1157Smax.romanov@nginx.com } 586*1157Smax.romanov@nginx.com } 587*1157Smax.romanov@nginx.com 588*1157Smax.romanov@nginx.com newMessage()589*1157Smax.romanov@nginx.com private void newMessage() { 590*1157Smax.romanov@nginx.com messageBufferBinary.clear(); 591*1157Smax.romanov@nginx.com messageBufferText.clear(); 592*1157Smax.romanov@nginx.com utf8DecoderMessage.reset(); 593*1157Smax.romanov@nginx.com continuationExpected = false; 594*1157Smax.romanov@nginx.com newFrame(); 595*1157Smax.romanov@nginx.com } 596*1157Smax.romanov@nginx.com 597*1157Smax.romanov@nginx.com newFrame()598*1157Smax.romanov@nginx.com private void newFrame() { 599*1157Smax.romanov@nginx.com if (inputBuffer.remaining() == 0) { 600*1157Smax.romanov@nginx.com inputBuffer.position(0).limit(0); 601*1157Smax.romanov@nginx.com } 602*1157Smax.romanov@nginx.com 603*1157Smax.romanov@nginx.com maskIndex = 0; 604*1157Smax.romanov@nginx.com payloadWritten = 0; 605*1157Smax.romanov@nginx.com state = State.NEW_FRAME; 606*1157Smax.romanov@nginx.com 607*1157Smax.romanov@nginx.com // These get reset in processInitialHeader() 608*1157Smax.romanov@nginx.com // fin, rsv, opCode, payloadLength, mask 609*1157Smax.romanov@nginx.com 610*1157Smax.romanov@nginx.com checkRoomHeaders(); 611*1157Smax.romanov@nginx.com } 612*1157Smax.romanov@nginx.com 613*1157Smax.romanov@nginx.com checkRoomHeaders()614*1157Smax.romanov@nginx.com private void checkRoomHeaders() { 615*1157Smax.romanov@nginx.com // Is the start of the current frame too near the end of the input 616*1157Smax.romanov@nginx.com // buffer? 617*1157Smax.romanov@nginx.com if (inputBuffer.capacity() - inputBuffer.position() < 131) { 618*1157Smax.romanov@nginx.com // Limit based on a control frame with a full payload 619*1157Smax.romanov@nginx.com makeRoom(); 620*1157Smax.romanov@nginx.com } 621*1157Smax.romanov@nginx.com } 622*1157Smax.romanov@nginx.com 623*1157Smax.romanov@nginx.com checkRoomPayload()624*1157Smax.romanov@nginx.com private void checkRoomPayload() { 625*1157Smax.romanov@nginx.com if (inputBuffer.capacity() - inputBuffer.position() - payloadLength + payloadWritten < 0) { 626*1157Smax.romanov@nginx.com makeRoom(); 627*1157Smax.romanov@nginx.com } 628*1157Smax.romanov@nginx.com } 629*1157Smax.romanov@nginx.com 630*1157Smax.romanov@nginx.com makeRoom()631*1157Smax.romanov@nginx.com private void makeRoom() { 632*1157Smax.romanov@nginx.com inputBuffer.compact(); 633*1157Smax.romanov@nginx.com inputBuffer.flip(); 634*1157Smax.romanov@nginx.com } 635*1157Smax.romanov@nginx.com 636*1157Smax.romanov@nginx.com usePartial()637*1157Smax.romanov@nginx.com private boolean usePartial() { 638*1157Smax.romanov@nginx.com if (Util.isControl(opCode)) { 639*1157Smax.romanov@nginx.com return false; 640*1157Smax.romanov@nginx.com } else if (textMessage) { 641*1157Smax.romanov@nginx.com return textMsgHandler instanceof MessageHandler.Partial; 642*1157Smax.romanov@nginx.com } else { 643*1157Smax.romanov@nginx.com // Must be binary 644*1157Smax.romanov@nginx.com return binaryMsgHandler instanceof MessageHandler.Partial; 645*1157Smax.romanov@nginx.com } 646*1157Smax.romanov@nginx.com } 647*1157Smax.romanov@nginx.com 648*1157Smax.romanov@nginx.com swallowInput()649*1157Smax.romanov@nginx.com private boolean swallowInput() { 650*1157Smax.romanov@nginx.com long toSkip = Math.min(payloadLength - payloadWritten, inputBuffer.remaining()); 651*1157Smax.romanov@nginx.com inputBuffer.position(inputBuffer.position() + (int) toSkip); 652*1157Smax.romanov@nginx.com payloadWritten += toSkip; 653*1157Smax.romanov@nginx.com if (payloadWritten == payloadLength) { 654*1157Smax.romanov@nginx.com if (continuationExpected) { 655*1157Smax.romanov@nginx.com newFrame(); 656*1157Smax.romanov@nginx.com } else { 657*1157Smax.romanov@nginx.com newMessage(); 658*1157Smax.romanov@nginx.com } 659*1157Smax.romanov@nginx.com return true; 660*1157Smax.romanov@nginx.com } else { 661*1157Smax.romanov@nginx.com return false; 662*1157Smax.romanov@nginx.com } 663*1157Smax.romanov@nginx.com } 664*1157Smax.romanov@nginx.com 665*1157Smax.romanov@nginx.com byteArrayToLong(byte[] b, int start, int len)666*1157Smax.romanov@nginx.com protected static long byteArrayToLong(byte[] b, int start, int len) throws IOException { 667*1157Smax.romanov@nginx.com if (len > 8) { 668*1157Smax.romanov@nginx.com throw new IOException(sm.getString("wsFrame.byteToLongFail", Long.valueOf(len))); 669*1157Smax.romanov@nginx.com } 670*1157Smax.romanov@nginx.com int shift = 0; 671*1157Smax.romanov@nginx.com long result = 0; 672*1157Smax.romanov@nginx.com for (int i = start + len - 1; i >= start; i--) { 673*1157Smax.romanov@nginx.com result = result + ((b[i] & 0xFF) << shift); 674*1157Smax.romanov@nginx.com shift += 8; 675*1157Smax.romanov@nginx.com } 676*1157Smax.romanov@nginx.com return result; 677*1157Smax.romanov@nginx.com } 678*1157Smax.romanov@nginx.com 679*1157Smax.romanov@nginx.com isOpen()680*1157Smax.romanov@nginx.com protected boolean isOpen() { 681*1157Smax.romanov@nginx.com return open; 682*1157Smax.romanov@nginx.com } 683*1157Smax.romanov@nginx.com 684*1157Smax.romanov@nginx.com getTransformation()685*1157Smax.romanov@nginx.com protected Transformation getTransformation() { 686*1157Smax.romanov@nginx.com return transformation; 687*1157Smax.romanov@nginx.com } 688*1157Smax.romanov@nginx.com 689*1157Smax.romanov@nginx.com 690*1157Smax.romanov@nginx.com private enum State { 691*1157Smax.romanov@nginx.com NEW_FRAME, PARTIAL_HEADER, DATA 692*1157Smax.romanov@nginx.com } 693*1157Smax.romanov@nginx.com 694*1157Smax.romanov@nginx.com 695*1157Smax.romanov@nginx.com /** 696*1157Smax.romanov@nginx.com * WAITING - not suspended 697*1157Smax.romanov@nginx.com * Server case: waiting for a notification that data 698*1157Smax.romanov@nginx.com * is ready to be read from the socket, the socket is 699*1157Smax.romanov@nginx.com * registered to the poller 700*1157Smax.romanov@nginx.com * Client case: data has been read from the socket and 701*1157Smax.romanov@nginx.com * is waiting for data to be processed 702*1157Smax.romanov@nginx.com * PROCESSING - not suspended 703*1157Smax.romanov@nginx.com * Server case: reading from the socket and processing 704*1157Smax.romanov@nginx.com * the data 705*1157Smax.romanov@nginx.com * Client case: processing the data if such has 706*1157Smax.romanov@nginx.com * already been read and more data will be read from 707*1157Smax.romanov@nginx.com * the socket 708*1157Smax.romanov@nginx.com * SUSPENDING_WAIT - suspended, a call to suspend() was made while in 709*1157Smax.romanov@nginx.com * WAITING state. A call to resume() will do nothing 710*1157Smax.romanov@nginx.com * and will transition to WAITING state 711*1157Smax.romanov@nginx.com * SUSPENDING_PROCESS - suspended, a call to suspend() was made while in 712*1157Smax.romanov@nginx.com * PROCESSING state. A call to resume() will do 713*1157Smax.romanov@nginx.com * nothing and will transition to PROCESSING state 714*1157Smax.romanov@nginx.com * SUSPENDED - suspended 715*1157Smax.romanov@nginx.com * Server case: processing data finished 716*1157Smax.romanov@nginx.com * (SUSPENDING_PROCESS) / a notification was received 717*1157Smax.romanov@nginx.com * that data is ready to be read from the socket 718*1157Smax.romanov@nginx.com * (SUSPENDING_WAIT), socket is not registered to the 719*1157Smax.romanov@nginx.com * poller 720*1157Smax.romanov@nginx.com * Client case: processing data finished 721*1157Smax.romanov@nginx.com * (SUSPENDING_PROCESS) / data has been read from the 722*1157Smax.romanov@nginx.com * socket and is available for processing 723*1157Smax.romanov@nginx.com * (SUSPENDING_WAIT) 724*1157Smax.romanov@nginx.com * A call to resume() will: 725*1157Smax.romanov@nginx.com * Server case: register the socket to the poller 726*1157Smax.romanov@nginx.com * Client case: resume data processing 727*1157Smax.romanov@nginx.com * CLOSING - not suspended, a close will be send 728*1157Smax.romanov@nginx.com * 729*1157Smax.romanov@nginx.com * <pre> 730*1157Smax.romanov@nginx.com * resume data to be resume 731*1157Smax.romanov@nginx.com * no action processed no action 732*1157Smax.romanov@nginx.com * |---------------| |---------------| |----------| 733*1157Smax.romanov@nginx.com * | v | v v | 734*1157Smax.romanov@nginx.com * | |----------WAITING --------PROCESSING----| | 735*1157Smax.romanov@nginx.com * | | ^ processing | | 736*1157Smax.romanov@nginx.com * | | | finished | | 737*1157Smax.romanov@nginx.com * | | | | | 738*1157Smax.romanov@nginx.com * | suspend | suspend | 739*1157Smax.romanov@nginx.com * | | | | | 740*1157Smax.romanov@nginx.com * | | resume | | 741*1157Smax.romanov@nginx.com * | | register socket to poller (server) | | 742*1157Smax.romanov@nginx.com * | | resume data processing (client) | | 743*1157Smax.romanov@nginx.com * | | | | | 744*1157Smax.romanov@nginx.com * | v | v | 745*1157Smax.romanov@nginx.com * SUSPENDING_WAIT | SUSPENDING_PROCESS 746*1157Smax.romanov@nginx.com * | | | 747*1157Smax.romanov@nginx.com * | data available | processing finished | 748*1157Smax.romanov@nginx.com * |------------- SUSPENDED ----------------------| 749*1157Smax.romanov@nginx.com * </pre> 750*1157Smax.romanov@nginx.com */ 751*1157Smax.romanov@nginx.com protected enum ReadState { 752*1157Smax.romanov@nginx.com WAITING (false), 753*1157Smax.romanov@nginx.com PROCESSING (false), 754*1157Smax.romanov@nginx.com SUSPENDING_WAIT (true), 755*1157Smax.romanov@nginx.com SUSPENDING_PROCESS(true), 756*1157Smax.romanov@nginx.com SUSPENDED (true), 757*1157Smax.romanov@nginx.com CLOSING (false); 758*1157Smax.romanov@nginx.com 759*1157Smax.romanov@nginx.com private final boolean isSuspended; 760*1157Smax.romanov@nginx.com ReadState(boolean isSuspended)761*1157Smax.romanov@nginx.com ReadState(boolean isSuspended) { 762*1157Smax.romanov@nginx.com this.isSuspended = isSuspended; 763*1157Smax.romanov@nginx.com } 764*1157Smax.romanov@nginx.com isSuspended()765*1157Smax.romanov@nginx.com public boolean isSuspended() { 766*1157Smax.romanov@nginx.com return isSuspended; 767*1157Smax.romanov@nginx.com } 768*1157Smax.romanov@nginx.com } 769*1157Smax.romanov@nginx.com suspend()770*1157Smax.romanov@nginx.com public void suspend() { 771*1157Smax.romanov@nginx.com while (true) { 772*1157Smax.romanov@nginx.com switch (readState) { 773*1157Smax.romanov@nginx.com case WAITING: 774*1157Smax.romanov@nginx.com if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.WAITING, 775*1157Smax.romanov@nginx.com ReadState.SUSPENDING_WAIT)) { 776*1157Smax.romanov@nginx.com continue; 777*1157Smax.romanov@nginx.com } 778*1157Smax.romanov@nginx.com return; 779*1157Smax.romanov@nginx.com case PROCESSING: 780*1157Smax.romanov@nginx.com if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.PROCESSING, 781*1157Smax.romanov@nginx.com ReadState.SUSPENDING_PROCESS)) { 782*1157Smax.romanov@nginx.com continue; 783*1157Smax.romanov@nginx.com } 784*1157Smax.romanov@nginx.com return; 785*1157Smax.romanov@nginx.com case SUSPENDING_WAIT: 786*1157Smax.romanov@nginx.com if (readState != ReadState.SUSPENDING_WAIT) { 787*1157Smax.romanov@nginx.com continue; 788*1157Smax.romanov@nginx.com } else { 789*1157Smax.romanov@nginx.com if (getLog().isWarnEnabled()) { 790*1157Smax.romanov@nginx.com getLog().warn(sm.getString("wsFrame.suspendRequested")); 791*1157Smax.romanov@nginx.com } 792*1157Smax.romanov@nginx.com } 793*1157Smax.romanov@nginx.com return; 794*1157Smax.romanov@nginx.com case SUSPENDING_PROCESS: 795*1157Smax.romanov@nginx.com if (readState != ReadState.SUSPENDING_PROCESS) { 796*1157Smax.romanov@nginx.com continue; 797*1157Smax.romanov@nginx.com } else { 798*1157Smax.romanov@nginx.com if (getLog().isWarnEnabled()) { 799*1157Smax.romanov@nginx.com getLog().warn(sm.getString("wsFrame.suspendRequested")); 800*1157Smax.romanov@nginx.com } 801*1157Smax.romanov@nginx.com } 802*1157Smax.romanov@nginx.com return; 803*1157Smax.romanov@nginx.com case SUSPENDED: 804*1157Smax.romanov@nginx.com if (readState != ReadState.SUSPENDED) { 805*1157Smax.romanov@nginx.com continue; 806*1157Smax.romanov@nginx.com } else { 807*1157Smax.romanov@nginx.com if (getLog().isWarnEnabled()) { 808*1157Smax.romanov@nginx.com getLog().warn(sm.getString("wsFrame.alreadySuspended")); 809*1157Smax.romanov@nginx.com } 810*1157Smax.romanov@nginx.com } 811*1157Smax.romanov@nginx.com return; 812*1157Smax.romanov@nginx.com case CLOSING: 813*1157Smax.romanov@nginx.com return; 814*1157Smax.romanov@nginx.com default: 815*1157Smax.romanov@nginx.com throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state)); 816*1157Smax.romanov@nginx.com } 817*1157Smax.romanov@nginx.com } 818*1157Smax.romanov@nginx.com } 819*1157Smax.romanov@nginx.com resume()820*1157Smax.romanov@nginx.com public void resume() { 821*1157Smax.romanov@nginx.com while (true) { 822*1157Smax.romanov@nginx.com switch (readState) { 823*1157Smax.romanov@nginx.com case WAITING: 824*1157Smax.romanov@nginx.com if (readState != ReadState.WAITING) { 825*1157Smax.romanov@nginx.com continue; 826*1157Smax.romanov@nginx.com } else { 827*1157Smax.romanov@nginx.com if (getLog().isWarnEnabled()) { 828*1157Smax.romanov@nginx.com getLog().warn(sm.getString("wsFrame.alreadyResumed")); 829*1157Smax.romanov@nginx.com } 830*1157Smax.romanov@nginx.com } 831*1157Smax.romanov@nginx.com return; 832*1157Smax.romanov@nginx.com case PROCESSING: 833*1157Smax.romanov@nginx.com if (readState != ReadState.PROCESSING) { 834*1157Smax.romanov@nginx.com continue; 835*1157Smax.romanov@nginx.com } else { 836*1157Smax.romanov@nginx.com if (getLog().isWarnEnabled()) { 837*1157Smax.romanov@nginx.com getLog().warn(sm.getString("wsFrame.alreadyResumed")); 838*1157Smax.romanov@nginx.com } 839*1157Smax.romanov@nginx.com } 840*1157Smax.romanov@nginx.com return; 841*1157Smax.romanov@nginx.com case SUSPENDING_WAIT: 842*1157Smax.romanov@nginx.com if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDING_WAIT, 843*1157Smax.romanov@nginx.com ReadState.WAITING)) { 844*1157Smax.romanov@nginx.com continue; 845*1157Smax.romanov@nginx.com } 846*1157Smax.romanov@nginx.com return; 847*1157Smax.romanov@nginx.com case SUSPENDING_PROCESS: 848*1157Smax.romanov@nginx.com if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDING_PROCESS, 849*1157Smax.romanov@nginx.com ReadState.PROCESSING)) { 850*1157Smax.romanov@nginx.com continue; 851*1157Smax.romanov@nginx.com } 852*1157Smax.romanov@nginx.com return; 853*1157Smax.romanov@nginx.com case SUSPENDED: 854*1157Smax.romanov@nginx.com if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDED, 855*1157Smax.romanov@nginx.com ReadState.WAITING)) { 856*1157Smax.romanov@nginx.com continue; 857*1157Smax.romanov@nginx.com } 858*1157Smax.romanov@nginx.com resumeProcessing(); 859*1157Smax.romanov@nginx.com return; 860*1157Smax.romanov@nginx.com case CLOSING: 861*1157Smax.romanov@nginx.com return; 862*1157Smax.romanov@nginx.com default: 863*1157Smax.romanov@nginx.com throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state)); 864*1157Smax.romanov@nginx.com } 865*1157Smax.romanov@nginx.com } 866*1157Smax.romanov@nginx.com } 867*1157Smax.romanov@nginx.com isSuspended()868*1157Smax.romanov@nginx.com protected boolean isSuspended() { 869*1157Smax.romanov@nginx.com return readState.isSuspended(); 870*1157Smax.romanov@nginx.com } 871*1157Smax.romanov@nginx.com getReadState()872*1157Smax.romanov@nginx.com protected ReadState getReadState() { 873*1157Smax.romanov@nginx.com return readState; 874*1157Smax.romanov@nginx.com } 875*1157Smax.romanov@nginx.com changeReadState(ReadState newState)876*1157Smax.romanov@nginx.com protected void changeReadState(ReadState newState) { 877*1157Smax.romanov@nginx.com READ_STATE_UPDATER.set(this, newState); 878*1157Smax.romanov@nginx.com } 879*1157Smax.romanov@nginx.com changeReadState(ReadState oldState, ReadState newState)880*1157Smax.romanov@nginx.com protected boolean changeReadState(ReadState oldState, ReadState newState) { 881*1157Smax.romanov@nginx.com return READ_STATE_UPDATER.compareAndSet(this, oldState, newState); 882*1157Smax.romanov@nginx.com } 883*1157Smax.romanov@nginx.com 884*1157Smax.romanov@nginx.com /** 885*1157Smax.romanov@nginx.com * This method will be invoked when the read operation is resumed. 886*1157Smax.romanov@nginx.com * As the suspend of the read operation can be invoked at any time, when 887*1157Smax.romanov@nginx.com * implementing this method one should consider that there might still be 888*1157Smax.romanov@nginx.com * data remaining into the internal buffers that needs to be processed 889*1157Smax.romanov@nginx.com * before reading again from the socket. 890*1157Smax.romanov@nginx.com */ resumeProcessing()891*1157Smax.romanov@nginx.com protected abstract void resumeProcessing(); 892*1157Smax.romanov@nginx.com 893*1157Smax.romanov@nginx.com 894*1157Smax.romanov@nginx.com private abstract class TerminalTransformation implements Transformation { 895*1157Smax.romanov@nginx.com 896*1157Smax.romanov@nginx.com @Override validateRsvBits(int i)897*1157Smax.romanov@nginx.com public boolean validateRsvBits(int i) { 898*1157Smax.romanov@nginx.com // Terminal transformations don't use RSV bits and there is no next 899*1157Smax.romanov@nginx.com // transformation so always return true. 900*1157Smax.romanov@nginx.com return true; 901*1157Smax.romanov@nginx.com } 902*1157Smax.romanov@nginx.com 903*1157Smax.romanov@nginx.com @Override getExtensionResponse()904*1157Smax.romanov@nginx.com public Extension getExtensionResponse() { 905*1157Smax.romanov@nginx.com // Return null since terminal transformations are not extensions 906*1157Smax.romanov@nginx.com return null; 907*1157Smax.romanov@nginx.com } 908*1157Smax.romanov@nginx.com 909*1157Smax.romanov@nginx.com @Override setNext(Transformation t)910*1157Smax.romanov@nginx.com public void setNext(Transformation t) { 911*1157Smax.romanov@nginx.com // NO-OP since this is the terminal transformation 912*1157Smax.romanov@nginx.com } 913*1157Smax.romanov@nginx.com 914*1157Smax.romanov@nginx.com /** 915*1157Smax.romanov@nginx.com * {@inheritDoc} 916*1157Smax.romanov@nginx.com * <p> 917*1157Smax.romanov@nginx.com * Anything other than a value of zero for rsv is invalid. 918*1157Smax.romanov@nginx.com */ 919*1157Smax.romanov@nginx.com @Override validateRsv(int rsv, byte opCode)920*1157Smax.romanov@nginx.com public boolean validateRsv(int rsv, byte opCode) { 921*1157Smax.romanov@nginx.com return rsv == 0; 922*1157Smax.romanov@nginx.com } 923*1157Smax.romanov@nginx.com 924*1157Smax.romanov@nginx.com @Override close()925*1157Smax.romanov@nginx.com public void close() { 926*1157Smax.romanov@nginx.com // NO-OP for the terminal transformations 927*1157Smax.romanov@nginx.com } 928*1157Smax.romanov@nginx.com } 929*1157Smax.romanov@nginx.com 930*1157Smax.romanov@nginx.com 931*1157Smax.romanov@nginx.com /** 932*1157Smax.romanov@nginx.com * For use by the client implementation that needs to obtain payload data 933*1157Smax.romanov@nginx.com * without the need for unmasking. 934*1157Smax.romanov@nginx.com */ 935*1157Smax.romanov@nginx.com private final class NoopTransformation extends TerminalTransformation { 936*1157Smax.romanov@nginx.com 937*1157Smax.romanov@nginx.com @Override getMoreData(byte opCode, boolean fin, int rsv, ByteBuffer dest)938*1157Smax.romanov@nginx.com public TransformationResult getMoreData(byte opCode, boolean fin, int rsv, 939*1157Smax.romanov@nginx.com ByteBuffer dest) { 940*1157Smax.romanov@nginx.com // opCode is ignored as the transformation is the same for all 941*1157Smax.romanov@nginx.com // opCodes 942*1157Smax.romanov@nginx.com // rsv is ignored as it known to be zero at this point 943*1157Smax.romanov@nginx.com long toWrite = Math.min(payloadLength - payloadWritten, inputBuffer.remaining()); 944*1157Smax.romanov@nginx.com toWrite = Math.min(toWrite, dest.remaining()); 945*1157Smax.romanov@nginx.com 946*1157Smax.romanov@nginx.com int orgLimit = inputBuffer.limit(); 947*1157Smax.romanov@nginx.com inputBuffer.limit(inputBuffer.position() + (int) toWrite); 948*1157Smax.romanov@nginx.com dest.put(inputBuffer); 949*1157Smax.romanov@nginx.com inputBuffer.limit(orgLimit); 950*1157Smax.romanov@nginx.com payloadWritten += toWrite; 951*1157Smax.romanov@nginx.com 952*1157Smax.romanov@nginx.com if (payloadWritten == payloadLength) { 953*1157Smax.romanov@nginx.com return TransformationResult.END_OF_FRAME; 954*1157Smax.romanov@nginx.com } else if (inputBuffer.remaining() == 0) { 955*1157Smax.romanov@nginx.com return TransformationResult.UNDERFLOW; 956*1157Smax.romanov@nginx.com } else { 957*1157Smax.romanov@nginx.com // !dest.hasRemaining() 958*1157Smax.romanov@nginx.com return TransformationResult.OVERFLOW; 959*1157Smax.romanov@nginx.com } 960*1157Smax.romanov@nginx.com } 961*1157Smax.romanov@nginx.com 962*1157Smax.romanov@nginx.com 963*1157Smax.romanov@nginx.com @Override sendMessagePart(List<MessagePart> messageParts)964*1157Smax.romanov@nginx.com public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) { 965*1157Smax.romanov@nginx.com // TODO Masking should move to this method 966*1157Smax.romanov@nginx.com // NO-OP send so simply return the message unchanged. 967*1157Smax.romanov@nginx.com return messageParts; 968*1157Smax.romanov@nginx.com } 969*1157Smax.romanov@nginx.com } 970*1157Smax.romanov@nginx.com 971*1157Smax.romanov@nginx.com 972*1157Smax.romanov@nginx.com /** 973*1157Smax.romanov@nginx.com * For use by the server implementation that needs to obtain payload data 974*1157Smax.romanov@nginx.com * and unmask it before any further processing. 975*1157Smax.romanov@nginx.com */ 976*1157Smax.romanov@nginx.com private final class UnmaskTransformation extends TerminalTransformation { 977*1157Smax.romanov@nginx.com 978*1157Smax.romanov@nginx.com @Override getMoreData(byte opCode, boolean fin, int rsv, ByteBuffer dest)979*1157Smax.romanov@nginx.com public TransformationResult getMoreData(byte opCode, boolean fin, int rsv, 980*1157Smax.romanov@nginx.com ByteBuffer dest) { 981*1157Smax.romanov@nginx.com // opCode is ignored as the transformation is the same for all 982*1157Smax.romanov@nginx.com // opCodes 983*1157Smax.romanov@nginx.com // rsv is ignored as it known to be zero at this point 984*1157Smax.romanov@nginx.com while (payloadWritten < payloadLength && inputBuffer.remaining() > 0 && 985*1157Smax.romanov@nginx.com dest.hasRemaining()) { 986*1157Smax.romanov@nginx.com byte b = (byte) ((inputBuffer.get() ^ mask[maskIndex]) & 0xFF); 987*1157Smax.romanov@nginx.com maskIndex++; 988*1157Smax.romanov@nginx.com if (maskIndex == 4) { 989*1157Smax.romanov@nginx.com maskIndex = 0; 990*1157Smax.romanov@nginx.com } 991*1157Smax.romanov@nginx.com payloadWritten++; 992*1157Smax.romanov@nginx.com dest.put(b); 993*1157Smax.romanov@nginx.com } 994*1157Smax.romanov@nginx.com if (payloadWritten == payloadLength) { 995*1157Smax.romanov@nginx.com return TransformationResult.END_OF_FRAME; 996*1157Smax.romanov@nginx.com } else if (inputBuffer.remaining() == 0) { 997*1157Smax.romanov@nginx.com return TransformationResult.UNDERFLOW; 998*1157Smax.romanov@nginx.com } else { 999*1157Smax.romanov@nginx.com // !dest.hasRemaining() 1000*1157Smax.romanov@nginx.com return TransformationResult.OVERFLOW; 1001*1157Smax.romanov@nginx.com } 1002*1157Smax.romanov@nginx.com } 1003*1157Smax.romanov@nginx.com 1004*1157Smax.romanov@nginx.com @Override sendMessagePart(List<MessagePart> messageParts)1005*1157Smax.romanov@nginx.com public List<MessagePart> sendMessagePart(List<MessagePart> messageParts) { 1006*1157Smax.romanov@nginx.com // NO-OP send so simply return the message unchanged. 1007*1157Smax.romanov@nginx.com return messageParts; 1008*1157Smax.romanov@nginx.com } 1009*1157Smax.romanov@nginx.com } 1010*1157Smax.romanov@nginx.com } 1011