11157Smax.romanov@nginx.com /* 21157Smax.romanov@nginx.com * Licensed to the Apache Software Foundation (ASF) under one or more 31157Smax.romanov@nginx.com * contributor license agreements. See the NOTICE file distributed with 41157Smax.romanov@nginx.com * this work for additional information regarding copyright ownership. 51157Smax.romanov@nginx.com * The ASF licenses this file to You under the Apache License, Version 2.0 61157Smax.romanov@nginx.com * (the "License"); you may not use this file except in compliance with 71157Smax.romanov@nginx.com * the License. You may obtain a copy of the License at 81157Smax.romanov@nginx.com * 91157Smax.romanov@nginx.com * http://www.apache.org/licenses/LICENSE-2.0 101157Smax.romanov@nginx.com * 111157Smax.romanov@nginx.com * Unless required by applicable law or agreed to in writing, software 121157Smax.romanov@nginx.com * distributed under the License is distributed on an "AS IS" BASIS, 131157Smax.romanov@nginx.com * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 141157Smax.romanov@nginx.com * See the License for the specific language governing permissions and 151157Smax.romanov@nginx.com * limitations under the License. 161157Smax.romanov@nginx.com */ 171157Smax.romanov@nginx.com package nginx.unit.websocket; 181157Smax.romanov@nginx.com 191157Smax.romanov@nginx.com import java.io.IOException; 201157Smax.romanov@nginx.com import java.io.OutputStream; 211157Smax.romanov@nginx.com import java.io.Writer; 221157Smax.romanov@nginx.com import java.net.SocketTimeoutException; 231157Smax.romanov@nginx.com import java.nio.ByteBuffer; 241157Smax.romanov@nginx.com import java.nio.CharBuffer; 251157Smax.romanov@nginx.com import java.nio.charset.CharsetEncoder; 261157Smax.romanov@nginx.com import java.nio.charset.CoderResult; 271157Smax.romanov@nginx.com import java.util.ArrayDeque; 281157Smax.romanov@nginx.com import java.util.ArrayList; 291157Smax.romanov@nginx.com import java.util.List; 301157Smax.romanov@nginx.com import java.util.Queue; 311157Smax.romanov@nginx.com import java.util.concurrent.Future; 321157Smax.romanov@nginx.com import java.util.concurrent.Semaphore; 331157Smax.romanov@nginx.com import java.util.concurrent.TimeUnit; 341157Smax.romanov@nginx.com import java.util.concurrent.atomic.AtomicBoolean; 351157Smax.romanov@nginx.com 361157Smax.romanov@nginx.com import javax.websocket.CloseReason; 371157Smax.romanov@nginx.com import javax.websocket.CloseReason.CloseCodes; 381157Smax.romanov@nginx.com import javax.websocket.DeploymentException; 391157Smax.romanov@nginx.com import javax.websocket.EncodeException; 401157Smax.romanov@nginx.com import javax.websocket.Encoder; 411157Smax.romanov@nginx.com import javax.websocket.EndpointConfig; 421157Smax.romanov@nginx.com import javax.websocket.RemoteEndpoint; 431157Smax.romanov@nginx.com import javax.websocket.SendHandler; 441157Smax.romanov@nginx.com import javax.websocket.SendResult; 451157Smax.romanov@nginx.com 461157Smax.romanov@nginx.com import org.apache.juli.logging.Log; 471157Smax.romanov@nginx.com import org.apache.juli.logging.LogFactory; 481157Smax.romanov@nginx.com import org.apache.tomcat.util.buf.Utf8Encoder; 491157Smax.romanov@nginx.com import org.apache.tomcat.util.res.StringManager; 501157Smax.romanov@nginx.com 511157Smax.romanov@nginx.com import nginx.unit.Request; 521157Smax.romanov@nginx.com 531157Smax.romanov@nginx.com public abstract class WsRemoteEndpointImplBase implements RemoteEndpoint { 541157Smax.romanov@nginx.com 551157Smax.romanov@nginx.com private static final StringManager sm = 561157Smax.romanov@nginx.com StringManager.getManager(WsRemoteEndpointImplBase.class); 571157Smax.romanov@nginx.com 581157Smax.romanov@nginx.com protected static final SendResult SENDRESULT_OK = new SendResult(); 591157Smax.romanov@nginx.com 601157Smax.romanov@nginx.com private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class); // must not be static 611157Smax.romanov@nginx.com 621157Smax.romanov@nginx.com private final StateMachine stateMachine = new StateMachine(); 631157Smax.romanov@nginx.com 641157Smax.romanov@nginx.com private final IntermediateMessageHandler intermediateMessageHandler = 651157Smax.romanov@nginx.com new IntermediateMessageHandler(this); 661157Smax.romanov@nginx.com 671157Smax.romanov@nginx.com private Transformation transformation = null; 681157Smax.romanov@nginx.com private final Semaphore messagePartInProgress = new Semaphore(1); 691157Smax.romanov@nginx.com private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>(); 701157Smax.romanov@nginx.com private final Object messagePartLock = new Object(); 711157Smax.romanov@nginx.com 721157Smax.romanov@nginx.com // State 731157Smax.romanov@nginx.com private volatile boolean closed = false; 741157Smax.romanov@nginx.com private boolean fragmented = false; 751157Smax.romanov@nginx.com private boolean nextFragmented = false; 761157Smax.romanov@nginx.com private boolean text = false; 771157Smax.romanov@nginx.com private boolean nextText = false; 781157Smax.romanov@nginx.com 791157Smax.romanov@nginx.com // Max size of WebSocket header is 14 bytes 801157Smax.romanov@nginx.com private final ByteBuffer headerBuffer = ByteBuffer.allocate(14); 811157Smax.romanov@nginx.com private final ByteBuffer outputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); 821157Smax.romanov@nginx.com private final CharsetEncoder encoder = new Utf8Encoder(); 831157Smax.romanov@nginx.com private final ByteBuffer encoderBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); 841157Smax.romanov@nginx.com private final AtomicBoolean batchingAllowed = new AtomicBoolean(false); 851157Smax.romanov@nginx.com private volatile long sendTimeout = -1; 861157Smax.romanov@nginx.com private WsSession wsSession; 871157Smax.romanov@nginx.com private List<EncoderEntry> encoderEntries = new ArrayList<>(); 881157Smax.romanov@nginx.com 891157Smax.romanov@nginx.com private Request request; 901157Smax.romanov@nginx.com 911157Smax.romanov@nginx.com setTransformation(Transformation transformation)921157Smax.romanov@nginx.com protected void setTransformation(Transformation transformation) { 931157Smax.romanov@nginx.com this.transformation = transformation; 941157Smax.romanov@nginx.com } 951157Smax.romanov@nginx.com 961157Smax.romanov@nginx.com getSendTimeout()971157Smax.romanov@nginx.com public long getSendTimeout() { 981157Smax.romanov@nginx.com return sendTimeout; 991157Smax.romanov@nginx.com } 1001157Smax.romanov@nginx.com 1011157Smax.romanov@nginx.com setSendTimeout(long timeout)1021157Smax.romanov@nginx.com public void setSendTimeout(long timeout) { 1031157Smax.romanov@nginx.com this.sendTimeout = timeout; 1041157Smax.romanov@nginx.com } 1051157Smax.romanov@nginx.com 1061157Smax.romanov@nginx.com 1071157Smax.romanov@nginx.com @Override setBatchingAllowed(boolean batchingAllowed)1081157Smax.romanov@nginx.com public void setBatchingAllowed(boolean batchingAllowed) throws IOException { 1091157Smax.romanov@nginx.com boolean oldValue = this.batchingAllowed.getAndSet(batchingAllowed); 1101157Smax.romanov@nginx.com 1111157Smax.romanov@nginx.com if (oldValue && !batchingAllowed) { 1121157Smax.romanov@nginx.com flushBatch(); 1131157Smax.romanov@nginx.com } 1141157Smax.romanov@nginx.com } 1151157Smax.romanov@nginx.com 1161157Smax.romanov@nginx.com 1171157Smax.romanov@nginx.com @Override getBatchingAllowed()1181157Smax.romanov@nginx.com public boolean getBatchingAllowed() { 1191157Smax.romanov@nginx.com return batchingAllowed.get(); 1201157Smax.romanov@nginx.com } 1211157Smax.romanov@nginx.com 1221157Smax.romanov@nginx.com 1231157Smax.romanov@nginx.com @Override flushBatch()1241157Smax.romanov@nginx.com public void flushBatch() throws IOException { 1251157Smax.romanov@nginx.com sendMessageBlock(Constants.INTERNAL_OPCODE_FLUSH, null, true); 1261157Smax.romanov@nginx.com } 1271157Smax.romanov@nginx.com 1281157Smax.romanov@nginx.com sendBytes(ByteBuffer data)1291157Smax.romanov@nginx.com public void sendBytes(ByteBuffer data) throws IOException { 1301157Smax.romanov@nginx.com if (data == null) { 1311157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 1321157Smax.romanov@nginx.com } 1331157Smax.romanov@nginx.com stateMachine.binaryStart(); 1341157Smax.romanov@nginx.com sendMessageBlock(Constants.OPCODE_BINARY, data, true); 1351157Smax.romanov@nginx.com stateMachine.complete(true); 1361157Smax.romanov@nginx.com } 1371157Smax.romanov@nginx.com 1381157Smax.romanov@nginx.com sendBytesByFuture(ByteBuffer data)1391157Smax.romanov@nginx.com public Future<Void> sendBytesByFuture(ByteBuffer data) { 1401157Smax.romanov@nginx.com FutureToSendHandler f2sh = new FutureToSendHandler(wsSession); 1411157Smax.romanov@nginx.com sendBytesByCompletion(data, f2sh); 1421157Smax.romanov@nginx.com return f2sh; 1431157Smax.romanov@nginx.com } 1441157Smax.romanov@nginx.com 1451157Smax.romanov@nginx.com sendBytesByCompletion(ByteBuffer data, SendHandler handler)1461157Smax.romanov@nginx.com public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) { 1471157Smax.romanov@nginx.com if (data == null) { 1481157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 1491157Smax.romanov@nginx.com } 1501157Smax.romanov@nginx.com if (handler == null) { 1511157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler")); 1521157Smax.romanov@nginx.com } 1531157Smax.romanov@nginx.com StateUpdateSendHandler sush = new StateUpdateSendHandler(handler, stateMachine); 1541157Smax.romanov@nginx.com stateMachine.binaryStart(); 1551157Smax.romanov@nginx.com startMessage(Constants.OPCODE_BINARY, data, true, sush); 1561157Smax.romanov@nginx.com } 1571157Smax.romanov@nginx.com 1581157Smax.romanov@nginx.com sendPartialBytes(ByteBuffer partialByte, boolean last)1591157Smax.romanov@nginx.com public void sendPartialBytes(ByteBuffer partialByte, boolean last) 1601157Smax.romanov@nginx.com throws IOException { 1611157Smax.romanov@nginx.com if (partialByte == null) { 1621157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 1631157Smax.romanov@nginx.com } 1641157Smax.romanov@nginx.com stateMachine.binaryPartialStart(); 1651157Smax.romanov@nginx.com sendMessageBlock(Constants.OPCODE_BINARY, partialByte, last); 1661157Smax.romanov@nginx.com stateMachine.complete(last); 1671157Smax.romanov@nginx.com } 1681157Smax.romanov@nginx.com 1691157Smax.romanov@nginx.com 1701157Smax.romanov@nginx.com @Override sendPing(ByteBuffer applicationData)1711157Smax.romanov@nginx.com public void sendPing(ByteBuffer applicationData) throws IOException, 1721157Smax.romanov@nginx.com IllegalArgumentException { 1731157Smax.romanov@nginx.com if (applicationData.remaining() > 125) { 1741157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.tooMuchData")); 1751157Smax.romanov@nginx.com } 1761157Smax.romanov@nginx.com sendMessageBlock(Constants.OPCODE_PING, applicationData, true); 1771157Smax.romanov@nginx.com } 1781157Smax.romanov@nginx.com 1791157Smax.romanov@nginx.com 1801157Smax.romanov@nginx.com @Override sendPong(ByteBuffer applicationData)1811157Smax.romanov@nginx.com public void sendPong(ByteBuffer applicationData) throws IOException, 1821157Smax.romanov@nginx.com IllegalArgumentException { 1831157Smax.romanov@nginx.com if (applicationData.remaining() > 125) { 1841157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.tooMuchData")); 1851157Smax.romanov@nginx.com } 1861157Smax.romanov@nginx.com sendMessageBlock(Constants.OPCODE_PONG, applicationData, true); 1871157Smax.romanov@nginx.com } 1881157Smax.romanov@nginx.com 1891157Smax.romanov@nginx.com sendString(String text)1901157Smax.romanov@nginx.com public void sendString(String text) throws IOException { 1911157Smax.romanov@nginx.com if (text == null) { 1921157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 1931157Smax.romanov@nginx.com } 1941157Smax.romanov@nginx.com stateMachine.textStart(); 1951157Smax.romanov@nginx.com sendMessageBlock(CharBuffer.wrap(text), true); 1961157Smax.romanov@nginx.com } 1971157Smax.romanov@nginx.com 1981157Smax.romanov@nginx.com sendStringByFuture(String text)1991157Smax.romanov@nginx.com public Future<Void> sendStringByFuture(String text) { 2001157Smax.romanov@nginx.com FutureToSendHandler f2sh = new FutureToSendHandler(wsSession); 2011157Smax.romanov@nginx.com sendStringByCompletion(text, f2sh); 2021157Smax.romanov@nginx.com return f2sh; 2031157Smax.romanov@nginx.com } 2041157Smax.romanov@nginx.com 2051157Smax.romanov@nginx.com sendStringByCompletion(String text, SendHandler handler)2061157Smax.romanov@nginx.com public void sendStringByCompletion(String text, SendHandler handler) { 2071157Smax.romanov@nginx.com if (text == null) { 2081157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 2091157Smax.romanov@nginx.com } 2101157Smax.romanov@nginx.com if (handler == null) { 2111157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler")); 2121157Smax.romanov@nginx.com } 2131157Smax.romanov@nginx.com stateMachine.textStart(); 2141157Smax.romanov@nginx.com TextMessageSendHandler tmsh = new TextMessageSendHandler(handler, 2151157Smax.romanov@nginx.com CharBuffer.wrap(text), true, encoder, encoderBuffer, this); 2161157Smax.romanov@nginx.com tmsh.write(); 2171157Smax.romanov@nginx.com // TextMessageSendHandler will update stateMachine when it completes 2181157Smax.romanov@nginx.com } 2191157Smax.romanov@nginx.com 2201157Smax.romanov@nginx.com sendPartialString(String fragment, boolean isLast)2211157Smax.romanov@nginx.com public void sendPartialString(String fragment, boolean isLast) 2221157Smax.romanov@nginx.com throws IOException { 2231157Smax.romanov@nginx.com if (fragment == null) { 2241157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 2251157Smax.romanov@nginx.com } 2261157Smax.romanov@nginx.com stateMachine.textPartialStart(); 2271157Smax.romanov@nginx.com sendMessageBlock(CharBuffer.wrap(fragment), isLast); 2281157Smax.romanov@nginx.com } 2291157Smax.romanov@nginx.com 2301157Smax.romanov@nginx.com getSendStream()2311157Smax.romanov@nginx.com public OutputStream getSendStream() { 2321157Smax.romanov@nginx.com stateMachine.streamStart(); 2331157Smax.romanov@nginx.com return new WsOutputStream(this); 2341157Smax.romanov@nginx.com } 2351157Smax.romanov@nginx.com 2361157Smax.romanov@nginx.com getSendWriter()2371157Smax.romanov@nginx.com public Writer getSendWriter() { 2381157Smax.romanov@nginx.com stateMachine.writeStart(); 2391157Smax.romanov@nginx.com return new WsWriter(this); 2401157Smax.romanov@nginx.com } 2411157Smax.romanov@nginx.com 2421157Smax.romanov@nginx.com sendMessageBlock(CharBuffer part, boolean last)2431157Smax.romanov@nginx.com void sendMessageBlock(CharBuffer part, boolean last) throws IOException { 2441157Smax.romanov@nginx.com long timeoutExpiry = getTimeoutExpiry(); 2451157Smax.romanov@nginx.com boolean isDone = false; 2461157Smax.romanov@nginx.com while (!isDone) { 2471157Smax.romanov@nginx.com encoderBuffer.clear(); 2481157Smax.romanov@nginx.com CoderResult cr = encoder.encode(part, encoderBuffer, true); 2491157Smax.romanov@nginx.com if (cr.isError()) { 2501157Smax.romanov@nginx.com throw new IllegalArgumentException(cr.toString()); 2511157Smax.romanov@nginx.com } 2521157Smax.romanov@nginx.com isDone = !cr.isOverflow(); 2531157Smax.romanov@nginx.com encoderBuffer.flip(); 2541157Smax.romanov@nginx.com sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeoutExpiry); 2551157Smax.romanov@nginx.com } 2561157Smax.romanov@nginx.com stateMachine.complete(last); 2571157Smax.romanov@nginx.com } 2581157Smax.romanov@nginx.com 2591157Smax.romanov@nginx.com sendMessageBlock(byte opCode, ByteBuffer payload, boolean last)2601157Smax.romanov@nginx.com void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last) 2611157Smax.romanov@nginx.com throws IOException { 2621157Smax.romanov@nginx.com sendMessageBlock(opCode, payload, last, getTimeoutExpiry()); 2631157Smax.romanov@nginx.com } 2641157Smax.romanov@nginx.com 2651157Smax.romanov@nginx.com getTimeoutExpiry()2661157Smax.romanov@nginx.com private long getTimeoutExpiry() { 2671157Smax.romanov@nginx.com // Get the timeout before we send the message. The message may 2681157Smax.romanov@nginx.com // trigger a session close and depending on timing the client 2691157Smax.romanov@nginx.com // session may close before we can read the timeout. 2701157Smax.romanov@nginx.com long timeout = getBlockingSendTimeout(); 2711157Smax.romanov@nginx.com if (timeout < 0) { 2721157Smax.romanov@nginx.com return Long.MAX_VALUE; 2731157Smax.romanov@nginx.com } else { 2741157Smax.romanov@nginx.com return System.currentTimeMillis() + timeout; 2751157Smax.romanov@nginx.com } 2761157Smax.romanov@nginx.com } 2771157Smax.romanov@nginx.com 2781157Smax.romanov@nginx.com private byte currentOpCode = Constants.OPCODE_CONTINUATION; 2791157Smax.romanov@nginx.com sendMessageBlock(byte opCode, ByteBuffer payload, boolean last, long timeoutExpiry)2801157Smax.romanov@nginx.com private void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last, 2811157Smax.romanov@nginx.com long timeoutExpiry) throws IOException { 2821157Smax.romanov@nginx.com wsSession.updateLastActive(); 2831157Smax.romanov@nginx.com 2841157Smax.romanov@nginx.com if (opCode == currentOpCode) { 2851157Smax.romanov@nginx.com opCode = Constants.OPCODE_CONTINUATION; 2861157Smax.romanov@nginx.com } 2871157Smax.romanov@nginx.com 2881157Smax.romanov@nginx.com request.sendWsFrame(payload, opCode, last, timeoutExpiry); 2891157Smax.romanov@nginx.com 2901157Smax.romanov@nginx.com if (!last && opCode != Constants.OPCODE_CONTINUATION) { 2911157Smax.romanov@nginx.com currentOpCode = opCode; 2921157Smax.romanov@nginx.com } 2931157Smax.romanov@nginx.com 2941157Smax.romanov@nginx.com if (last && opCode == Constants.OPCODE_CONTINUATION) { 2951157Smax.romanov@nginx.com currentOpCode = Constants.OPCODE_CONTINUATION; 2961157Smax.romanov@nginx.com } 2971157Smax.romanov@nginx.com } 2981157Smax.romanov@nginx.com 2991157Smax.romanov@nginx.com startMessage(byte opCode, ByteBuffer payload, boolean last, SendHandler handler)3001157Smax.romanov@nginx.com void startMessage(byte opCode, ByteBuffer payload, boolean last, 3011157Smax.romanov@nginx.com SendHandler handler) { 3021157Smax.romanov@nginx.com 3031157Smax.romanov@nginx.com wsSession.updateLastActive(); 3041157Smax.romanov@nginx.com 3051157Smax.romanov@nginx.com List<MessagePart> messageParts = new ArrayList<>(); 3061157Smax.romanov@nginx.com messageParts.add(new MessagePart(last, 0, opCode, payload, 3071157Smax.romanov@nginx.com intermediateMessageHandler, 3081157Smax.romanov@nginx.com new EndMessageHandler(this, handler), -1)); 3091157Smax.romanov@nginx.com 3101157Smax.romanov@nginx.com messageParts = transformation.sendMessagePart(messageParts); 3111157Smax.romanov@nginx.com 3121157Smax.romanov@nginx.com // Some extensions/transformations may buffer messages so it is possible 3131157Smax.romanov@nginx.com // that no message parts will be returned. If this is the case the 3141157Smax.romanov@nginx.com // trigger the supplied SendHandler 3151157Smax.romanov@nginx.com if (messageParts.size() == 0) { 3161157Smax.romanov@nginx.com handler.onResult(new SendResult()); 3171157Smax.romanov@nginx.com return; 3181157Smax.romanov@nginx.com } 3191157Smax.romanov@nginx.com 3201157Smax.romanov@nginx.com MessagePart mp = messageParts.remove(0); 3211157Smax.romanov@nginx.com 3221157Smax.romanov@nginx.com boolean doWrite = false; 3231157Smax.romanov@nginx.com synchronized (messagePartLock) { 3241157Smax.romanov@nginx.com if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) { 3251157Smax.romanov@nginx.com // Should not happen. To late to send batched messages now since 3261157Smax.romanov@nginx.com // the session has been closed. Complain loudly. 3271157Smax.romanov@nginx.com log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed")); 3281157Smax.romanov@nginx.com } 3291157Smax.romanov@nginx.com if (messagePartInProgress.tryAcquire()) { 3301157Smax.romanov@nginx.com doWrite = true; 3311157Smax.romanov@nginx.com } else { 3321157Smax.romanov@nginx.com // When a control message is sent while another message is being 3331157Smax.romanov@nginx.com // sent, the control message is queued. Chances are the 3341157Smax.romanov@nginx.com // subsequent data message part will end up queued while the 3351157Smax.romanov@nginx.com // control message is sent. The logic in this class (state 3361157Smax.romanov@nginx.com // machine, EndMessageHandler, TextMessageSendHandler) ensures 3371157Smax.romanov@nginx.com // that there will only ever be one data message part in the 3381157Smax.romanov@nginx.com // queue. There could be multiple control messages in the queue. 3391157Smax.romanov@nginx.com 3401157Smax.romanov@nginx.com // Add it to the queue 3411157Smax.romanov@nginx.com messagePartQueue.add(mp); 3421157Smax.romanov@nginx.com } 3431157Smax.romanov@nginx.com // Add any remaining messages to the queue 3441157Smax.romanov@nginx.com messagePartQueue.addAll(messageParts); 3451157Smax.romanov@nginx.com } 3461157Smax.romanov@nginx.com if (doWrite) { 3471157Smax.romanov@nginx.com // Actual write has to be outside sync block to avoid possible 3481157Smax.romanov@nginx.com // deadlock between messagePartLock and writeLock in 3491157Smax.romanov@nginx.com // o.a.coyote.http11.upgrade.AbstractServletOutputStream 3501157Smax.romanov@nginx.com writeMessagePart(mp); 3511157Smax.romanov@nginx.com } 3521157Smax.romanov@nginx.com } 3531157Smax.romanov@nginx.com 3541157Smax.romanov@nginx.com endMessage(SendHandler handler, SendResult result)3551157Smax.romanov@nginx.com void endMessage(SendHandler handler, SendResult result) { 3561157Smax.romanov@nginx.com boolean doWrite = false; 3571157Smax.romanov@nginx.com MessagePart mpNext = null; 3581157Smax.romanov@nginx.com synchronized (messagePartLock) { 3591157Smax.romanov@nginx.com 3601157Smax.romanov@nginx.com fragmented = nextFragmented; 3611157Smax.romanov@nginx.com text = nextText; 3621157Smax.romanov@nginx.com 3631157Smax.romanov@nginx.com mpNext = messagePartQueue.poll(); 3641157Smax.romanov@nginx.com if (mpNext == null) { 3651157Smax.romanov@nginx.com messagePartInProgress.release(); 3661157Smax.romanov@nginx.com } else if (!closed){ 3671157Smax.romanov@nginx.com // Session may have been closed unexpectedly in the middle of 3681157Smax.romanov@nginx.com // sending a fragmented message closing the endpoint. If this 3691157Smax.romanov@nginx.com // happens, clearly there is no point trying to send the rest of 3701157Smax.romanov@nginx.com // the message. 3711157Smax.romanov@nginx.com doWrite = true; 3721157Smax.romanov@nginx.com } 3731157Smax.romanov@nginx.com } 3741157Smax.romanov@nginx.com if (doWrite) { 3751157Smax.romanov@nginx.com // Actual write has to be outside sync block to avoid possible 3761157Smax.romanov@nginx.com // deadlock between messagePartLock and writeLock in 3771157Smax.romanov@nginx.com // o.a.coyote.http11.upgrade.AbstractServletOutputStream 3781157Smax.romanov@nginx.com writeMessagePart(mpNext); 3791157Smax.romanov@nginx.com } 3801157Smax.romanov@nginx.com 3811157Smax.romanov@nginx.com wsSession.updateLastActive(); 3821157Smax.romanov@nginx.com 3831157Smax.romanov@nginx.com // Some handlers, such as the IntermediateMessageHandler, do not have a 3841157Smax.romanov@nginx.com // nested handler so handler may be null. 3851157Smax.romanov@nginx.com if (handler != null) { 3861157Smax.romanov@nginx.com handler.onResult(result); 3871157Smax.romanov@nginx.com } 3881157Smax.romanov@nginx.com } 3891157Smax.romanov@nginx.com 3901157Smax.romanov@nginx.com writeMessagePart(MessagePart mp)3911157Smax.romanov@nginx.com void writeMessagePart(MessagePart mp) { 3921157Smax.romanov@nginx.com if (closed) { 3931157Smax.romanov@nginx.com throw new IllegalStateException( 3941157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closed")); 3951157Smax.romanov@nginx.com } 3961157Smax.romanov@nginx.com 3971157Smax.romanov@nginx.com if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { 3981157Smax.romanov@nginx.com nextFragmented = fragmented; 3991157Smax.romanov@nginx.com nextText = text; 4001157Smax.romanov@nginx.com outputBuffer.flip(); 4011157Smax.romanov@nginx.com SendHandler flushHandler = new OutputBufferFlushSendHandler( 4021157Smax.romanov@nginx.com outputBuffer, mp.getEndHandler()); 4031157Smax.romanov@nginx.com doWrite(flushHandler, mp.getBlockingWriteTimeoutExpiry(), outputBuffer); 4041157Smax.romanov@nginx.com return; 4051157Smax.romanov@nginx.com } 4061157Smax.romanov@nginx.com 4071157Smax.romanov@nginx.com // Control messages may be sent in the middle of fragmented message 4081157Smax.romanov@nginx.com // so they have no effect on the fragmented or text flags 4091157Smax.romanov@nginx.com boolean first; 4101157Smax.romanov@nginx.com if (Util.isControl(mp.getOpCode())) { 4111157Smax.romanov@nginx.com nextFragmented = fragmented; 4121157Smax.romanov@nginx.com nextText = text; 4131157Smax.romanov@nginx.com if (mp.getOpCode() == Constants.OPCODE_CLOSE) { 4141157Smax.romanov@nginx.com closed = true; 4151157Smax.romanov@nginx.com } 4161157Smax.romanov@nginx.com first = true; 4171157Smax.romanov@nginx.com } else { 4181157Smax.romanov@nginx.com boolean isText = Util.isText(mp.getOpCode()); 4191157Smax.romanov@nginx.com 4201157Smax.romanov@nginx.com if (fragmented) { 4211157Smax.romanov@nginx.com // Currently fragmented 4221157Smax.romanov@nginx.com if (text != isText) { 4231157Smax.romanov@nginx.com throw new IllegalStateException( 4241157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.changeType")); 4251157Smax.romanov@nginx.com } 4261157Smax.romanov@nginx.com nextText = text; 4271157Smax.romanov@nginx.com nextFragmented = !mp.isFin(); 4281157Smax.romanov@nginx.com first = false; 4291157Smax.romanov@nginx.com } else { 4301157Smax.romanov@nginx.com // Wasn't fragmented. Might be now 4311157Smax.romanov@nginx.com if (mp.isFin()) { 4321157Smax.romanov@nginx.com nextFragmented = false; 4331157Smax.romanov@nginx.com } else { 4341157Smax.romanov@nginx.com nextFragmented = true; 4351157Smax.romanov@nginx.com nextText = isText; 4361157Smax.romanov@nginx.com } 4371157Smax.romanov@nginx.com first = true; 4381157Smax.romanov@nginx.com } 4391157Smax.romanov@nginx.com } 4401157Smax.romanov@nginx.com 4411157Smax.romanov@nginx.com byte[] mask; 4421157Smax.romanov@nginx.com 4431157Smax.romanov@nginx.com if (isMasked()) { 4441157Smax.romanov@nginx.com mask = Util.generateMask(); 4451157Smax.romanov@nginx.com } else { 4461157Smax.romanov@nginx.com mask = null; 4471157Smax.romanov@nginx.com } 4481157Smax.romanov@nginx.com 4491157Smax.romanov@nginx.com headerBuffer.clear(); 4501157Smax.romanov@nginx.com writeHeader(headerBuffer, mp.isFin(), mp.getRsv(), mp.getOpCode(), 4511157Smax.romanov@nginx.com isMasked(), mp.getPayload(), mask, first); 4521157Smax.romanov@nginx.com headerBuffer.flip(); 4531157Smax.romanov@nginx.com 4541157Smax.romanov@nginx.com if (getBatchingAllowed() || isMasked()) { 4551157Smax.romanov@nginx.com // Need to write via output buffer 4561157Smax.romanov@nginx.com OutputBufferSendHandler obsh = new OutputBufferSendHandler( 4571157Smax.romanov@nginx.com mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(), 4581157Smax.romanov@nginx.com headerBuffer, mp.getPayload(), mask, 4591157Smax.romanov@nginx.com outputBuffer, !getBatchingAllowed(), this); 4601157Smax.romanov@nginx.com obsh.write(); 4611157Smax.romanov@nginx.com } else { 4621157Smax.romanov@nginx.com // Can write directly 4631157Smax.romanov@nginx.com doWrite(mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(), 4641157Smax.romanov@nginx.com headerBuffer, mp.getPayload()); 4651157Smax.romanov@nginx.com } 4661157Smax.romanov@nginx.com } 4671157Smax.romanov@nginx.com 4681157Smax.romanov@nginx.com getBlockingSendTimeout()4691157Smax.romanov@nginx.com private long getBlockingSendTimeout() { 4701157Smax.romanov@nginx.com Object obj = wsSession.getUserProperties().get(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY); 4711157Smax.romanov@nginx.com Long userTimeout = null; 4721157Smax.romanov@nginx.com if (obj instanceof Long) { 4731157Smax.romanov@nginx.com userTimeout = (Long) obj; 4741157Smax.romanov@nginx.com } 4751157Smax.romanov@nginx.com if (userTimeout == null) { 4761157Smax.romanov@nginx.com return Constants.DEFAULT_BLOCKING_SEND_TIMEOUT; 4771157Smax.romanov@nginx.com } else { 4781157Smax.romanov@nginx.com return userTimeout.longValue(); 4791157Smax.romanov@nginx.com } 4801157Smax.romanov@nginx.com } 4811157Smax.romanov@nginx.com 4821157Smax.romanov@nginx.com 4831157Smax.romanov@nginx.com /** 4841157Smax.romanov@nginx.com * Wraps the user provided handler so that the end point is notified when 4851157Smax.romanov@nginx.com * the message is complete. 4861157Smax.romanov@nginx.com */ 4871157Smax.romanov@nginx.com private static class EndMessageHandler implements SendHandler { 4881157Smax.romanov@nginx.com 4891157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 4901157Smax.romanov@nginx.com private final SendHandler handler; 4911157Smax.romanov@nginx.com EndMessageHandler(WsRemoteEndpointImplBase endpoint, SendHandler handler)4921157Smax.romanov@nginx.com public EndMessageHandler(WsRemoteEndpointImplBase endpoint, 4931157Smax.romanov@nginx.com SendHandler handler) { 4941157Smax.romanov@nginx.com this.endpoint = endpoint; 4951157Smax.romanov@nginx.com this.handler = handler; 4961157Smax.romanov@nginx.com } 4971157Smax.romanov@nginx.com 4981157Smax.romanov@nginx.com 4991157Smax.romanov@nginx.com @Override onResult(SendResult result)5001157Smax.romanov@nginx.com public void onResult(SendResult result) { 5011157Smax.romanov@nginx.com endpoint.endMessage(handler, result); 5021157Smax.romanov@nginx.com } 5031157Smax.romanov@nginx.com } 5041157Smax.romanov@nginx.com 5051157Smax.romanov@nginx.com 5061157Smax.romanov@nginx.com /** 5071157Smax.romanov@nginx.com * If a transformation needs to split a {@link MessagePart} into multiple 5081157Smax.romanov@nginx.com * {@link MessagePart}s, it uses this handler as the end handler for each of 5091157Smax.romanov@nginx.com * the additional {@link MessagePart}s. This handler notifies this this 5101157Smax.romanov@nginx.com * class that the {@link MessagePart} has been processed and that the next 5111157Smax.romanov@nginx.com * {@link MessagePart} in the queue should be started. The final 5121157Smax.romanov@nginx.com * {@link MessagePart} will use the {@link EndMessageHandler} provided with 5131157Smax.romanov@nginx.com * the original {@link MessagePart}. 5141157Smax.romanov@nginx.com */ 5151157Smax.romanov@nginx.com private static class IntermediateMessageHandler implements SendHandler { 5161157Smax.romanov@nginx.com 5171157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 5181157Smax.romanov@nginx.com IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint)5191157Smax.romanov@nginx.com public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) { 5201157Smax.romanov@nginx.com this.endpoint = endpoint; 5211157Smax.romanov@nginx.com } 5221157Smax.romanov@nginx.com 5231157Smax.romanov@nginx.com 5241157Smax.romanov@nginx.com @Override onResult(SendResult result)5251157Smax.romanov@nginx.com public void onResult(SendResult result) { 5261157Smax.romanov@nginx.com endpoint.endMessage(null, result); 5271157Smax.romanov@nginx.com } 5281157Smax.romanov@nginx.com } 5291157Smax.romanov@nginx.com 5301157Smax.romanov@nginx.com 5311157Smax.romanov@nginx.com @SuppressWarnings({"unchecked", "rawtypes"}) sendObject(Object obj)5321157Smax.romanov@nginx.com public void sendObject(Object obj) throws IOException, EncodeException { 5331157Smax.romanov@nginx.com if (obj == null) { 5341157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 5351157Smax.romanov@nginx.com } 5361157Smax.romanov@nginx.com /* 5371157Smax.romanov@nginx.com * Note that the implementation will convert primitives and their object 5381157Smax.romanov@nginx.com * equivalents by default but that users are free to specify their own 5391157Smax.romanov@nginx.com * encoders and decoders for this if they wish. 5401157Smax.romanov@nginx.com */ 5411157Smax.romanov@nginx.com Encoder encoder = findEncoder(obj); 5421157Smax.romanov@nginx.com if (encoder == null && Util.isPrimitive(obj.getClass())) { 5431157Smax.romanov@nginx.com String msg = obj.toString(); 5441157Smax.romanov@nginx.com sendString(msg); 5451157Smax.romanov@nginx.com return; 5461157Smax.romanov@nginx.com } 5471157Smax.romanov@nginx.com if (encoder == null && byte[].class.isAssignableFrom(obj.getClass())) { 5481157Smax.romanov@nginx.com ByteBuffer msg = ByteBuffer.wrap((byte[]) obj); 5491157Smax.romanov@nginx.com sendBytes(msg); 5501157Smax.romanov@nginx.com return; 5511157Smax.romanov@nginx.com } 5521157Smax.romanov@nginx.com 5531157Smax.romanov@nginx.com if (encoder instanceof Encoder.Text) { 5541157Smax.romanov@nginx.com String msg = ((Encoder.Text) encoder).encode(obj); 5551157Smax.romanov@nginx.com sendString(msg); 5561157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.TextStream) { 5571157Smax.romanov@nginx.com try (Writer w = getSendWriter()) { 5581157Smax.romanov@nginx.com ((Encoder.TextStream) encoder).encode(obj, w); 5591157Smax.romanov@nginx.com } 5601157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.Binary) { 5611157Smax.romanov@nginx.com ByteBuffer msg = ((Encoder.Binary) encoder).encode(obj); 5621157Smax.romanov@nginx.com sendBytes(msg); 5631157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.BinaryStream) { 5641157Smax.romanov@nginx.com try (OutputStream os = getSendStream()) { 5651157Smax.romanov@nginx.com ((Encoder.BinaryStream) encoder).encode(obj, os); 5661157Smax.romanov@nginx.com } 5671157Smax.romanov@nginx.com } else { 5681157Smax.romanov@nginx.com throw new EncodeException(obj, sm.getString( 5691157Smax.romanov@nginx.com "wsRemoteEndpoint.noEncoder", obj.getClass())); 5701157Smax.romanov@nginx.com } 5711157Smax.romanov@nginx.com } 5721157Smax.romanov@nginx.com 5731157Smax.romanov@nginx.com sendObjectByFuture(Object obj)5741157Smax.romanov@nginx.com public Future<Void> sendObjectByFuture(Object obj) { 5751157Smax.romanov@nginx.com FutureToSendHandler f2sh = new FutureToSendHandler(wsSession); 5761157Smax.romanov@nginx.com sendObjectByCompletion(obj, f2sh); 5771157Smax.romanov@nginx.com return f2sh; 5781157Smax.romanov@nginx.com } 5791157Smax.romanov@nginx.com 5801157Smax.romanov@nginx.com 5811157Smax.romanov@nginx.com @SuppressWarnings({"unchecked", "rawtypes"}) sendObjectByCompletion(Object obj, SendHandler completion)5821157Smax.romanov@nginx.com public void sendObjectByCompletion(Object obj, SendHandler completion) { 5831157Smax.romanov@nginx.com 5841157Smax.romanov@nginx.com if (obj == null) { 5851157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); 5861157Smax.romanov@nginx.com } 5871157Smax.romanov@nginx.com if (completion == null) { 5881157Smax.romanov@nginx.com throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler")); 5891157Smax.romanov@nginx.com } 5901157Smax.romanov@nginx.com 5911157Smax.romanov@nginx.com /* 5921157Smax.romanov@nginx.com * Note that the implementation will convert primitives and their object 5931157Smax.romanov@nginx.com * equivalents by default but that users are free to specify their own 5941157Smax.romanov@nginx.com * encoders and decoders for this if they wish. 5951157Smax.romanov@nginx.com */ 5961157Smax.romanov@nginx.com Encoder encoder = findEncoder(obj); 5971157Smax.romanov@nginx.com if (encoder == null && Util.isPrimitive(obj.getClass())) { 5981157Smax.romanov@nginx.com String msg = obj.toString(); 5991157Smax.romanov@nginx.com sendStringByCompletion(msg, completion); 6001157Smax.romanov@nginx.com return; 6011157Smax.romanov@nginx.com } 6021157Smax.romanov@nginx.com if (encoder == null && byte[].class.isAssignableFrom(obj.getClass())) { 6031157Smax.romanov@nginx.com ByteBuffer msg = ByteBuffer.wrap((byte[]) obj); 6041157Smax.romanov@nginx.com sendBytesByCompletion(msg, completion); 6051157Smax.romanov@nginx.com return; 6061157Smax.romanov@nginx.com } 6071157Smax.romanov@nginx.com 6081157Smax.romanov@nginx.com try { 6091157Smax.romanov@nginx.com if (encoder instanceof Encoder.Text) { 6101157Smax.romanov@nginx.com String msg = ((Encoder.Text) encoder).encode(obj); 6111157Smax.romanov@nginx.com sendStringByCompletion(msg, completion); 6121157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.TextStream) { 6131157Smax.romanov@nginx.com try (Writer w = getSendWriter()) { 6141157Smax.romanov@nginx.com ((Encoder.TextStream) encoder).encode(obj, w); 6151157Smax.romanov@nginx.com } 6161157Smax.romanov@nginx.com completion.onResult(new SendResult()); 6171157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.Binary) { 6181157Smax.romanov@nginx.com ByteBuffer msg = ((Encoder.Binary) encoder).encode(obj); 6191157Smax.romanov@nginx.com sendBytesByCompletion(msg, completion); 6201157Smax.romanov@nginx.com } else if (encoder instanceof Encoder.BinaryStream) { 6211157Smax.romanov@nginx.com try (OutputStream os = getSendStream()) { 6221157Smax.romanov@nginx.com ((Encoder.BinaryStream) encoder).encode(obj, os); 6231157Smax.romanov@nginx.com } 6241157Smax.romanov@nginx.com completion.onResult(new SendResult()); 6251157Smax.romanov@nginx.com } else { 6261157Smax.romanov@nginx.com throw new EncodeException(obj, sm.getString( 6271157Smax.romanov@nginx.com "wsRemoteEndpoint.noEncoder", obj.getClass())); 6281157Smax.romanov@nginx.com } 6291157Smax.romanov@nginx.com } catch (Exception e) { 6301157Smax.romanov@nginx.com SendResult sr = new SendResult(e); 6311157Smax.romanov@nginx.com completion.onResult(sr); 6321157Smax.romanov@nginx.com } 6331157Smax.romanov@nginx.com } 6341157Smax.romanov@nginx.com 6351157Smax.romanov@nginx.com setSession(WsSession wsSession)6361157Smax.romanov@nginx.com protected void setSession(WsSession wsSession) { 6371157Smax.romanov@nginx.com this.wsSession = wsSession; 6381157Smax.romanov@nginx.com } 6391157Smax.romanov@nginx.com 6401157Smax.romanov@nginx.com setRequest(Request request)6411157Smax.romanov@nginx.com protected void setRequest(Request request) { 6421157Smax.romanov@nginx.com this.request = request; 6431157Smax.romanov@nginx.com } 6441157Smax.romanov@nginx.com setEncoders(EndpointConfig endpointConfig)6451157Smax.romanov@nginx.com protected void setEncoders(EndpointConfig endpointConfig) 6461157Smax.romanov@nginx.com throws DeploymentException { 6471157Smax.romanov@nginx.com encoderEntries.clear(); 6481157Smax.romanov@nginx.com for (Class<? extends Encoder> encoderClazz : 6491157Smax.romanov@nginx.com endpointConfig.getEncoders()) { 6501157Smax.romanov@nginx.com Encoder instance; 6511157Smax.romanov@nginx.com try { 6521157Smax.romanov@nginx.com instance = encoderClazz.getConstructor().newInstance(); 6531157Smax.romanov@nginx.com instance.init(endpointConfig); 6541157Smax.romanov@nginx.com } catch (ReflectiveOperationException e) { 6551157Smax.romanov@nginx.com throw new DeploymentException( 6561157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.invalidEncoder", 6571157Smax.romanov@nginx.com encoderClazz.getName()), e); 6581157Smax.romanov@nginx.com } 6591157Smax.romanov@nginx.com EncoderEntry entry = new EncoderEntry( 6601157Smax.romanov@nginx.com Util.getEncoderType(encoderClazz), instance); 6611157Smax.romanov@nginx.com encoderEntries.add(entry); 6621157Smax.romanov@nginx.com } 6631157Smax.romanov@nginx.com } 6641157Smax.romanov@nginx.com 6651157Smax.romanov@nginx.com findEncoder(Object obj)6661157Smax.romanov@nginx.com private Encoder findEncoder(Object obj) { 6671157Smax.romanov@nginx.com for (EncoderEntry entry : encoderEntries) { 6681157Smax.romanov@nginx.com if (entry.getClazz().isAssignableFrom(obj.getClass())) { 6691157Smax.romanov@nginx.com return entry.getEncoder(); 6701157Smax.romanov@nginx.com } 6711157Smax.romanov@nginx.com } 6721157Smax.romanov@nginx.com return null; 6731157Smax.romanov@nginx.com } 6741157Smax.romanov@nginx.com 6751157Smax.romanov@nginx.com close()6761157Smax.romanov@nginx.com public final void close() { 6771157Smax.romanov@nginx.com for (EncoderEntry entry : encoderEntries) { 6781157Smax.romanov@nginx.com entry.getEncoder().destroy(); 6791157Smax.romanov@nginx.com } 6801157Smax.romanov@nginx.com 6811157Smax.romanov@nginx.com request.closeWs(); 6821157Smax.romanov@nginx.com } 6831157Smax.romanov@nginx.com 6841157Smax.romanov@nginx.com doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... data)6851157Smax.romanov@nginx.com protected abstract void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, 6861157Smax.romanov@nginx.com ByteBuffer... data); isMasked()6871157Smax.romanov@nginx.com protected abstract boolean isMasked(); doClose()6881157Smax.romanov@nginx.com protected abstract void doClose(); 6891157Smax.romanov@nginx.com writeHeader(ByteBuffer headerBuffer, boolean fin, int rsv, byte opCode, boolean masked, ByteBuffer payload, byte[] mask, boolean first)6901157Smax.romanov@nginx.com private static void writeHeader(ByteBuffer headerBuffer, boolean fin, 6911157Smax.romanov@nginx.com int rsv, byte opCode, boolean masked, ByteBuffer payload, 6921157Smax.romanov@nginx.com byte[] mask, boolean first) { 6931157Smax.romanov@nginx.com 6941157Smax.romanov@nginx.com byte b = 0; 6951157Smax.romanov@nginx.com 6961157Smax.romanov@nginx.com if (fin) { 6971157Smax.romanov@nginx.com // Set the fin bit 6981157Smax.romanov@nginx.com b -= 128; 6991157Smax.romanov@nginx.com } 7001157Smax.romanov@nginx.com 7011157Smax.romanov@nginx.com b += (rsv << 4); 7021157Smax.romanov@nginx.com 7031157Smax.romanov@nginx.com if (first) { 7041157Smax.romanov@nginx.com // This is the first fragment of this message 7051157Smax.romanov@nginx.com b += opCode; 7061157Smax.romanov@nginx.com } 7071157Smax.romanov@nginx.com // If not the first fragment, it is a continuation with opCode of zero 7081157Smax.romanov@nginx.com 7091157Smax.romanov@nginx.com headerBuffer.put(b); 7101157Smax.romanov@nginx.com 7111157Smax.romanov@nginx.com if (masked) { 7121157Smax.romanov@nginx.com b = (byte) 0x80; 7131157Smax.romanov@nginx.com } else { 7141157Smax.romanov@nginx.com b = 0; 7151157Smax.romanov@nginx.com } 7161157Smax.romanov@nginx.com 7171157Smax.romanov@nginx.com // Next write the mask && length length 7181157Smax.romanov@nginx.com if (payload.limit() < 126) { 7191157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() | b)); 7201157Smax.romanov@nginx.com } else if (payload.limit() < 65536) { 7211157Smax.romanov@nginx.com headerBuffer.put((byte) (126 | b)); 7221157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() >>> 8)); 7231157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() & 0xFF)); 7241157Smax.romanov@nginx.com } else { 7251157Smax.romanov@nginx.com // Will never be more than 2^31-1 7261157Smax.romanov@nginx.com headerBuffer.put((byte) (127 | b)); 7271157Smax.romanov@nginx.com headerBuffer.put((byte) 0); 7281157Smax.romanov@nginx.com headerBuffer.put((byte) 0); 7291157Smax.romanov@nginx.com headerBuffer.put((byte) 0); 7301157Smax.romanov@nginx.com headerBuffer.put((byte) 0); 7311157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() >>> 24)); 7321157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() >>> 16)); 7331157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() >>> 8)); 7341157Smax.romanov@nginx.com headerBuffer.put((byte) (payload.limit() & 0xFF)); 7351157Smax.romanov@nginx.com } 7361157Smax.romanov@nginx.com if (masked) { 7371157Smax.romanov@nginx.com headerBuffer.put(mask[0]); 7381157Smax.romanov@nginx.com headerBuffer.put(mask[1]); 7391157Smax.romanov@nginx.com headerBuffer.put(mask[2]); 7401157Smax.romanov@nginx.com headerBuffer.put(mask[3]); 7411157Smax.romanov@nginx.com } 7421157Smax.romanov@nginx.com } 7431157Smax.romanov@nginx.com 7441157Smax.romanov@nginx.com 7451157Smax.romanov@nginx.com private class TextMessageSendHandler implements SendHandler { 7461157Smax.romanov@nginx.com 7471157Smax.romanov@nginx.com private final SendHandler handler; 7481157Smax.romanov@nginx.com private final CharBuffer message; 7491157Smax.romanov@nginx.com private final boolean isLast; 7501157Smax.romanov@nginx.com private final CharsetEncoder encoder; 7511157Smax.romanov@nginx.com private final ByteBuffer buffer; 7521157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 7531157Smax.romanov@nginx.com private volatile boolean isDone = false; 7541157Smax.romanov@nginx.com TextMessageSendHandler(SendHandler handler, CharBuffer message, boolean isLast, CharsetEncoder encoder, ByteBuffer encoderBuffer, WsRemoteEndpointImplBase endpoint)7551157Smax.romanov@nginx.com public TextMessageSendHandler(SendHandler handler, CharBuffer message, 7561157Smax.romanov@nginx.com boolean isLast, CharsetEncoder encoder, 7571157Smax.romanov@nginx.com ByteBuffer encoderBuffer, WsRemoteEndpointImplBase endpoint) { 7581157Smax.romanov@nginx.com this.handler = handler; 7591157Smax.romanov@nginx.com this.message = message; 7601157Smax.romanov@nginx.com this.isLast = isLast; 7611157Smax.romanov@nginx.com this.encoder = encoder.reset(); 7621157Smax.romanov@nginx.com this.buffer = encoderBuffer; 7631157Smax.romanov@nginx.com this.endpoint = endpoint; 7641157Smax.romanov@nginx.com } 7651157Smax.romanov@nginx.com write()7661157Smax.romanov@nginx.com public void write() { 7671157Smax.romanov@nginx.com buffer.clear(); 7681157Smax.romanov@nginx.com CoderResult cr = encoder.encode(message, buffer, true); 7691157Smax.romanov@nginx.com if (cr.isError()) { 7701157Smax.romanov@nginx.com throw new IllegalArgumentException(cr.toString()); 7711157Smax.romanov@nginx.com } 7721157Smax.romanov@nginx.com isDone = !cr.isOverflow(); 7731157Smax.romanov@nginx.com buffer.flip(); 7741157Smax.romanov@nginx.com endpoint.startMessage(Constants.OPCODE_TEXT, buffer, 7751157Smax.romanov@nginx.com isDone && isLast, this); 7761157Smax.romanov@nginx.com } 7771157Smax.romanov@nginx.com 7781157Smax.romanov@nginx.com @Override onResult(SendResult result)7791157Smax.romanov@nginx.com public void onResult(SendResult result) { 7801157Smax.romanov@nginx.com if (isDone) { 7811157Smax.romanov@nginx.com endpoint.stateMachine.complete(isLast); 7821157Smax.romanov@nginx.com handler.onResult(result); 7831157Smax.romanov@nginx.com } else if(!result.isOK()) { 7841157Smax.romanov@nginx.com handler.onResult(result); 7851157Smax.romanov@nginx.com } else if (closed){ 7861157Smax.romanov@nginx.com SendResult sr = new SendResult(new IOException( 7871157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedDuringMessage"))); 7881157Smax.romanov@nginx.com handler.onResult(sr); 7891157Smax.romanov@nginx.com } else { 7901157Smax.romanov@nginx.com write(); 7911157Smax.romanov@nginx.com } 7921157Smax.romanov@nginx.com } 7931157Smax.romanov@nginx.com } 7941157Smax.romanov@nginx.com 7951157Smax.romanov@nginx.com 7961157Smax.romanov@nginx.com /** 7971157Smax.romanov@nginx.com * Used to write data to the output buffer, flushing the buffer if it fills 7981157Smax.romanov@nginx.com * up. 7991157Smax.romanov@nginx.com */ 8001157Smax.romanov@nginx.com private static class OutputBufferSendHandler implements SendHandler { 8011157Smax.romanov@nginx.com 8021157Smax.romanov@nginx.com private final SendHandler handler; 8031157Smax.romanov@nginx.com private final long blockingWriteTimeoutExpiry; 8041157Smax.romanov@nginx.com private final ByteBuffer headerBuffer; 8051157Smax.romanov@nginx.com private final ByteBuffer payload; 8061157Smax.romanov@nginx.com private final byte[] mask; 8071157Smax.romanov@nginx.com private final ByteBuffer outputBuffer; 8081157Smax.romanov@nginx.com private final boolean flushRequired; 8091157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 8101157Smax.romanov@nginx.com private int maskIndex = 0; 8111157Smax.romanov@nginx.com OutputBufferSendHandler(SendHandler completion, long blockingWriteTimeoutExpiry, ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask, ByteBuffer outputBuffer, boolean flushRequired, WsRemoteEndpointImplBase endpoint)8121157Smax.romanov@nginx.com public OutputBufferSendHandler(SendHandler completion, 8131157Smax.romanov@nginx.com long blockingWriteTimeoutExpiry, 8141157Smax.romanov@nginx.com ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask, 8151157Smax.romanov@nginx.com ByteBuffer outputBuffer, boolean flushRequired, 8161157Smax.romanov@nginx.com WsRemoteEndpointImplBase endpoint) { 8171157Smax.romanov@nginx.com this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry; 8181157Smax.romanov@nginx.com this.handler = completion; 8191157Smax.romanov@nginx.com this.headerBuffer = headerBuffer; 8201157Smax.romanov@nginx.com this.payload = payload; 8211157Smax.romanov@nginx.com this.mask = mask; 8221157Smax.romanov@nginx.com this.outputBuffer = outputBuffer; 8231157Smax.romanov@nginx.com this.flushRequired = flushRequired; 8241157Smax.romanov@nginx.com this.endpoint = endpoint; 8251157Smax.romanov@nginx.com } 8261157Smax.romanov@nginx.com write()8271157Smax.romanov@nginx.com public void write() { 8281157Smax.romanov@nginx.com // Write the header 8291157Smax.romanov@nginx.com while (headerBuffer.hasRemaining() && outputBuffer.hasRemaining()) { 8301157Smax.romanov@nginx.com outputBuffer.put(headerBuffer.get()); 8311157Smax.romanov@nginx.com } 8321157Smax.romanov@nginx.com if (headerBuffer.hasRemaining()) { 8331157Smax.romanov@nginx.com // Still more headers to write, need to flush 8341157Smax.romanov@nginx.com outputBuffer.flip(); 8351157Smax.romanov@nginx.com endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); 8361157Smax.romanov@nginx.com return; 8371157Smax.romanov@nginx.com } 8381157Smax.romanov@nginx.com 8391157Smax.romanov@nginx.com // Write the payload 8401157Smax.romanov@nginx.com int payloadLeft = payload.remaining(); 8411157Smax.romanov@nginx.com int payloadLimit = payload.limit(); 8421157Smax.romanov@nginx.com int outputSpace = outputBuffer.remaining(); 8431157Smax.romanov@nginx.com int toWrite = payloadLeft; 8441157Smax.romanov@nginx.com 8451157Smax.romanov@nginx.com if (payloadLeft > outputSpace) { 8461157Smax.romanov@nginx.com toWrite = outputSpace; 8471157Smax.romanov@nginx.com // Temporarily reduce the limit 8481157Smax.romanov@nginx.com payload.limit(payload.position() + toWrite); 8491157Smax.romanov@nginx.com } 8501157Smax.romanov@nginx.com 8511157Smax.romanov@nginx.com if (mask == null) { 8521157Smax.romanov@nginx.com // Use a bulk copy 8531157Smax.romanov@nginx.com outputBuffer.put(payload); 8541157Smax.romanov@nginx.com } else { 8551157Smax.romanov@nginx.com for (int i = 0; i < toWrite; i++) { 8561157Smax.romanov@nginx.com outputBuffer.put( 8571157Smax.romanov@nginx.com (byte) (payload.get() ^ (mask[maskIndex++] & 0xFF))); 8581157Smax.romanov@nginx.com if (maskIndex > 3) { 8591157Smax.romanov@nginx.com maskIndex = 0; 8601157Smax.romanov@nginx.com } 8611157Smax.romanov@nginx.com } 8621157Smax.romanov@nginx.com } 8631157Smax.romanov@nginx.com 8641157Smax.romanov@nginx.com if (payloadLeft > outputSpace) { 8651157Smax.romanov@nginx.com // Restore the original limit 8661157Smax.romanov@nginx.com payload.limit(payloadLimit); 8671157Smax.romanov@nginx.com // Still more data to write, need to flush 8681157Smax.romanov@nginx.com outputBuffer.flip(); 8691157Smax.romanov@nginx.com endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); 8701157Smax.romanov@nginx.com return; 8711157Smax.romanov@nginx.com } 8721157Smax.romanov@nginx.com 8731157Smax.romanov@nginx.com if (flushRequired) { 8741157Smax.romanov@nginx.com outputBuffer.flip(); 8751157Smax.romanov@nginx.com if (outputBuffer.remaining() == 0) { 8761157Smax.romanov@nginx.com handler.onResult(SENDRESULT_OK); 8771157Smax.romanov@nginx.com } else { 8781157Smax.romanov@nginx.com endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); 8791157Smax.romanov@nginx.com } 8801157Smax.romanov@nginx.com } else { 8811157Smax.romanov@nginx.com handler.onResult(SENDRESULT_OK); 8821157Smax.romanov@nginx.com } 8831157Smax.romanov@nginx.com } 8841157Smax.romanov@nginx.com 8851157Smax.romanov@nginx.com // ------------------------------------------------- SendHandler methods 8861157Smax.romanov@nginx.com @Override onResult(SendResult result)8871157Smax.romanov@nginx.com public void onResult(SendResult result) { 8881157Smax.romanov@nginx.com if (result.isOK()) { 8891157Smax.romanov@nginx.com if (outputBuffer.hasRemaining()) { 8901157Smax.romanov@nginx.com endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); 8911157Smax.romanov@nginx.com } else { 8921157Smax.romanov@nginx.com outputBuffer.clear(); 8931157Smax.romanov@nginx.com write(); 8941157Smax.romanov@nginx.com } 8951157Smax.romanov@nginx.com } else { 8961157Smax.romanov@nginx.com handler.onResult(result); 8971157Smax.romanov@nginx.com } 8981157Smax.romanov@nginx.com } 8991157Smax.romanov@nginx.com } 9001157Smax.romanov@nginx.com 9011157Smax.romanov@nginx.com 9021157Smax.romanov@nginx.com /** 9031157Smax.romanov@nginx.com * Ensures that the output buffer is cleared after it has been flushed. 9041157Smax.romanov@nginx.com */ 9051157Smax.romanov@nginx.com private static class OutputBufferFlushSendHandler implements SendHandler { 9061157Smax.romanov@nginx.com 9071157Smax.romanov@nginx.com private final ByteBuffer outputBuffer; 9081157Smax.romanov@nginx.com private final SendHandler handler; 9091157Smax.romanov@nginx.com OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler)9101157Smax.romanov@nginx.com public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) { 9111157Smax.romanov@nginx.com this.outputBuffer = outputBuffer; 9121157Smax.romanov@nginx.com this.handler = handler; 9131157Smax.romanov@nginx.com } 9141157Smax.romanov@nginx.com 9151157Smax.romanov@nginx.com @Override onResult(SendResult result)9161157Smax.romanov@nginx.com public void onResult(SendResult result) { 9171157Smax.romanov@nginx.com if (result.isOK()) { 9181157Smax.romanov@nginx.com outputBuffer.clear(); 9191157Smax.romanov@nginx.com } 9201157Smax.romanov@nginx.com handler.onResult(result); 9211157Smax.romanov@nginx.com } 9221157Smax.romanov@nginx.com } 9231157Smax.romanov@nginx.com 9241157Smax.romanov@nginx.com 9251157Smax.romanov@nginx.com private static class WsOutputStream extends OutputStream { 9261157Smax.romanov@nginx.com 9271157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 9281157Smax.romanov@nginx.com private final ByteBuffer buffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); 9291157Smax.romanov@nginx.com private final Object closeLock = new Object(); 9301157Smax.romanov@nginx.com private volatile boolean closed = false; 9311157Smax.romanov@nginx.com private volatile boolean used = false; 9321157Smax.romanov@nginx.com WsOutputStream(WsRemoteEndpointImplBase endpoint)9331157Smax.romanov@nginx.com public WsOutputStream(WsRemoteEndpointImplBase endpoint) { 9341157Smax.romanov@nginx.com this.endpoint = endpoint; 9351157Smax.romanov@nginx.com } 9361157Smax.romanov@nginx.com 9371157Smax.romanov@nginx.com @Override write(int b)9381157Smax.romanov@nginx.com public void write(int b) throws IOException { 9391157Smax.romanov@nginx.com if (closed) { 9401157Smax.romanov@nginx.com throw new IllegalStateException( 9411157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedOutputStream")); 9421157Smax.romanov@nginx.com } 9431157Smax.romanov@nginx.com 9441157Smax.romanov@nginx.com used = true; 9451157Smax.romanov@nginx.com if (buffer.remaining() == 0) { 9461157Smax.romanov@nginx.com flush(); 9471157Smax.romanov@nginx.com } 9481157Smax.romanov@nginx.com buffer.put((byte) b); 9491157Smax.romanov@nginx.com } 9501157Smax.romanov@nginx.com 9511157Smax.romanov@nginx.com @Override write(byte[] b, int off, int len)9521157Smax.romanov@nginx.com public void write(byte[] b, int off, int len) throws IOException { 9531157Smax.romanov@nginx.com if (closed) { 9541157Smax.romanov@nginx.com throw new IllegalStateException( 9551157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedOutputStream")); 9561157Smax.romanov@nginx.com } 9571157Smax.romanov@nginx.com if (len == 0) { 9581157Smax.romanov@nginx.com return; 9591157Smax.romanov@nginx.com } 9601157Smax.romanov@nginx.com if ((off < 0) || (off > b.length) || (len < 0) || 9611157Smax.romanov@nginx.com ((off + len) > b.length) || ((off + len) < 0)) { 9621157Smax.romanov@nginx.com throw new IndexOutOfBoundsException(); 9631157Smax.romanov@nginx.com } 9641157Smax.romanov@nginx.com 9651157Smax.romanov@nginx.com used = true; 9661157Smax.romanov@nginx.com if (buffer.remaining() == 0) { 9671157Smax.romanov@nginx.com flush(); 9681157Smax.romanov@nginx.com } 9691157Smax.romanov@nginx.com int remaining = buffer.remaining(); 9701157Smax.romanov@nginx.com int written = 0; 9711157Smax.romanov@nginx.com 9721157Smax.romanov@nginx.com while (remaining < len - written) { 9731157Smax.romanov@nginx.com buffer.put(b, off + written, remaining); 9741157Smax.romanov@nginx.com written += remaining; 9751157Smax.romanov@nginx.com flush(); 9761157Smax.romanov@nginx.com remaining = buffer.remaining(); 9771157Smax.romanov@nginx.com } 9781157Smax.romanov@nginx.com buffer.put(b, off + written, len - written); 9791157Smax.romanov@nginx.com } 9801157Smax.romanov@nginx.com 9811157Smax.romanov@nginx.com @Override flush()9821157Smax.romanov@nginx.com public void flush() throws IOException { 9831157Smax.romanov@nginx.com if (closed) { 9841157Smax.romanov@nginx.com throw new IllegalStateException( 9851157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedOutputStream")); 9861157Smax.romanov@nginx.com } 9871157Smax.romanov@nginx.com 9881157Smax.romanov@nginx.com // Optimisation. If there is no data to flush then do not send an 9891157Smax.romanov@nginx.com // empty message. 9901157Smax.romanov@nginx.com if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || buffer.position() > 0) { 9911157Smax.romanov@nginx.com doWrite(false); 9921157Smax.romanov@nginx.com } 9931157Smax.romanov@nginx.com } 9941157Smax.romanov@nginx.com 9951157Smax.romanov@nginx.com @Override close()9961157Smax.romanov@nginx.com public void close() throws IOException { 9971157Smax.romanov@nginx.com synchronized (closeLock) { 9981157Smax.romanov@nginx.com if (closed) { 9991157Smax.romanov@nginx.com return; 10001157Smax.romanov@nginx.com } 10011157Smax.romanov@nginx.com closed = true; 10021157Smax.romanov@nginx.com } 10031157Smax.romanov@nginx.com 10041157Smax.romanov@nginx.com doWrite(true); 10051157Smax.romanov@nginx.com } 10061157Smax.romanov@nginx.com doWrite(boolean last)10071157Smax.romanov@nginx.com private void doWrite(boolean last) throws IOException { 10081157Smax.romanov@nginx.com if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || used) { 10091157Smax.romanov@nginx.com buffer.flip(); 10101157Smax.romanov@nginx.com endpoint.sendMessageBlock(Constants.OPCODE_BINARY, buffer, last); 10111157Smax.romanov@nginx.com } 10121157Smax.romanov@nginx.com endpoint.stateMachine.complete(last); 10131157Smax.romanov@nginx.com buffer.clear(); 10141157Smax.romanov@nginx.com } 10151157Smax.romanov@nginx.com } 10161157Smax.romanov@nginx.com 10171157Smax.romanov@nginx.com 10181157Smax.romanov@nginx.com private static class WsWriter extends Writer { 10191157Smax.romanov@nginx.com 10201157Smax.romanov@nginx.com private final WsRemoteEndpointImplBase endpoint; 10211157Smax.romanov@nginx.com private final CharBuffer buffer = CharBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); 10221157Smax.romanov@nginx.com private final Object closeLock = new Object(); 10231157Smax.romanov@nginx.com private volatile boolean closed = false; 10241157Smax.romanov@nginx.com private volatile boolean used = false; 10251157Smax.romanov@nginx.com WsWriter(WsRemoteEndpointImplBase endpoint)10261157Smax.romanov@nginx.com public WsWriter(WsRemoteEndpointImplBase endpoint) { 10271157Smax.romanov@nginx.com this.endpoint = endpoint; 10281157Smax.romanov@nginx.com } 10291157Smax.romanov@nginx.com 10301157Smax.romanov@nginx.com @Override write(char[] cbuf, int off, int len)10311157Smax.romanov@nginx.com public void write(char[] cbuf, int off, int len) throws IOException { 10321157Smax.romanov@nginx.com if (closed) { 10331157Smax.romanov@nginx.com throw new IllegalStateException( 10341157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedWriter")); 10351157Smax.romanov@nginx.com } 10361157Smax.romanov@nginx.com if (len == 0) { 10371157Smax.romanov@nginx.com return; 10381157Smax.romanov@nginx.com } 10391157Smax.romanov@nginx.com if ((off < 0) || (off > cbuf.length) || (len < 0) || 10401157Smax.romanov@nginx.com ((off + len) > cbuf.length) || ((off + len) < 0)) { 10411157Smax.romanov@nginx.com throw new IndexOutOfBoundsException(); 10421157Smax.romanov@nginx.com } 10431157Smax.romanov@nginx.com 10441157Smax.romanov@nginx.com used = true; 10451157Smax.romanov@nginx.com if (buffer.remaining() == 0) { 10461157Smax.romanov@nginx.com flush(); 10471157Smax.romanov@nginx.com } 10481157Smax.romanov@nginx.com int remaining = buffer.remaining(); 10491157Smax.romanov@nginx.com int written = 0; 10501157Smax.romanov@nginx.com 10511157Smax.romanov@nginx.com while (remaining < len - written) { 10521157Smax.romanov@nginx.com buffer.put(cbuf, off + written, remaining); 10531157Smax.romanov@nginx.com written += remaining; 10541157Smax.romanov@nginx.com flush(); 10551157Smax.romanov@nginx.com remaining = buffer.remaining(); 10561157Smax.romanov@nginx.com } 10571157Smax.romanov@nginx.com buffer.put(cbuf, off + written, len - written); 10581157Smax.romanov@nginx.com } 10591157Smax.romanov@nginx.com 10601157Smax.romanov@nginx.com @Override flush()10611157Smax.romanov@nginx.com public void flush() throws IOException { 10621157Smax.romanov@nginx.com if (closed) { 10631157Smax.romanov@nginx.com throw new IllegalStateException( 10641157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.closedWriter")); 10651157Smax.romanov@nginx.com } 10661157Smax.romanov@nginx.com 10671157Smax.romanov@nginx.com if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || buffer.position() > 0) { 10681157Smax.romanov@nginx.com doWrite(false); 10691157Smax.romanov@nginx.com } 10701157Smax.romanov@nginx.com } 10711157Smax.romanov@nginx.com 10721157Smax.romanov@nginx.com @Override close()10731157Smax.romanov@nginx.com public void close() throws IOException { 10741157Smax.romanov@nginx.com synchronized (closeLock) { 10751157Smax.romanov@nginx.com if (closed) { 10761157Smax.romanov@nginx.com return; 10771157Smax.romanov@nginx.com } 10781157Smax.romanov@nginx.com closed = true; 10791157Smax.romanov@nginx.com } 10801157Smax.romanov@nginx.com 10811157Smax.romanov@nginx.com doWrite(true); 10821157Smax.romanov@nginx.com } 10831157Smax.romanov@nginx.com doWrite(boolean last)10841157Smax.romanov@nginx.com private void doWrite(boolean last) throws IOException { 10851157Smax.romanov@nginx.com if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || used) { 10861157Smax.romanov@nginx.com buffer.flip(); 10871157Smax.romanov@nginx.com endpoint.sendMessageBlock(buffer, last); 10881157Smax.romanov@nginx.com buffer.clear(); 10891157Smax.romanov@nginx.com } else { 10901157Smax.romanov@nginx.com endpoint.stateMachine.complete(last); 10911157Smax.romanov@nginx.com } 10921157Smax.romanov@nginx.com } 10931157Smax.romanov@nginx.com } 10941157Smax.romanov@nginx.com 10951157Smax.romanov@nginx.com 10961157Smax.romanov@nginx.com private static class EncoderEntry { 10971157Smax.romanov@nginx.com 10981157Smax.romanov@nginx.com private final Class<?> clazz; 10991157Smax.romanov@nginx.com private final Encoder encoder; 11001157Smax.romanov@nginx.com EncoderEntry(Class<?> clazz, Encoder encoder)11011157Smax.romanov@nginx.com public EncoderEntry(Class<?> clazz, Encoder encoder) { 11021157Smax.romanov@nginx.com this.clazz = clazz; 11031157Smax.romanov@nginx.com this.encoder = encoder; 11041157Smax.romanov@nginx.com } 11051157Smax.romanov@nginx.com getClazz()11061157Smax.romanov@nginx.com public Class<?> getClazz() { 11071157Smax.romanov@nginx.com return clazz; 11081157Smax.romanov@nginx.com } 11091157Smax.romanov@nginx.com getEncoder()11101157Smax.romanov@nginx.com public Encoder getEncoder() { 11111157Smax.romanov@nginx.com return encoder; 11121157Smax.romanov@nginx.com } 11131157Smax.romanov@nginx.com } 11141157Smax.romanov@nginx.com 11151157Smax.romanov@nginx.com 11161157Smax.romanov@nginx.com private enum State { 11171157Smax.romanov@nginx.com OPEN, 11181157Smax.romanov@nginx.com STREAM_WRITING, 11191157Smax.romanov@nginx.com WRITER_WRITING, 11201157Smax.romanov@nginx.com BINARY_PARTIAL_WRITING, 11211157Smax.romanov@nginx.com BINARY_PARTIAL_READY, 11221157Smax.romanov@nginx.com BINARY_FULL_WRITING, 11231157Smax.romanov@nginx.com TEXT_PARTIAL_WRITING, 11241157Smax.romanov@nginx.com TEXT_PARTIAL_READY, 11251157Smax.romanov@nginx.com TEXT_FULL_WRITING 11261157Smax.romanov@nginx.com } 11271157Smax.romanov@nginx.com 11281157Smax.romanov@nginx.com 11291157Smax.romanov@nginx.com private static class StateMachine { 11301157Smax.romanov@nginx.com private State state = State.OPEN; 11311157Smax.romanov@nginx.com streamStart()11321157Smax.romanov@nginx.com public synchronized void streamStart() { 11331157Smax.romanov@nginx.com checkState(State.OPEN); 11341157Smax.romanov@nginx.com state = State.STREAM_WRITING; 11351157Smax.romanov@nginx.com } 11361157Smax.romanov@nginx.com writeStart()11371157Smax.romanov@nginx.com public synchronized void writeStart() { 11381157Smax.romanov@nginx.com checkState(State.OPEN); 11391157Smax.romanov@nginx.com state = State.WRITER_WRITING; 11401157Smax.romanov@nginx.com } 11411157Smax.romanov@nginx.com binaryPartialStart()11421157Smax.romanov@nginx.com public synchronized void binaryPartialStart() { 11431157Smax.romanov@nginx.com checkState(State.OPEN, State.BINARY_PARTIAL_READY); 11441157Smax.romanov@nginx.com state = State.BINARY_PARTIAL_WRITING; 11451157Smax.romanov@nginx.com } 11461157Smax.romanov@nginx.com binaryStart()11471157Smax.romanov@nginx.com public synchronized void binaryStart() { 11481157Smax.romanov@nginx.com checkState(State.OPEN); 11491157Smax.romanov@nginx.com state = State.BINARY_FULL_WRITING; 11501157Smax.romanov@nginx.com } 11511157Smax.romanov@nginx.com textPartialStart()11521157Smax.romanov@nginx.com public synchronized void textPartialStart() { 11531157Smax.romanov@nginx.com checkState(State.OPEN, State.TEXT_PARTIAL_READY); 11541157Smax.romanov@nginx.com state = State.TEXT_PARTIAL_WRITING; 11551157Smax.romanov@nginx.com } 11561157Smax.romanov@nginx.com textStart()11571157Smax.romanov@nginx.com public synchronized void textStart() { 11581157Smax.romanov@nginx.com checkState(State.OPEN); 11591157Smax.romanov@nginx.com state = State.TEXT_FULL_WRITING; 11601157Smax.romanov@nginx.com } 11611157Smax.romanov@nginx.com complete(boolean last)11621157Smax.romanov@nginx.com public synchronized void complete(boolean last) { 11631157Smax.romanov@nginx.com if (last) { 11641157Smax.romanov@nginx.com checkState(State.TEXT_PARTIAL_WRITING, State.TEXT_FULL_WRITING, 11651157Smax.romanov@nginx.com State.BINARY_PARTIAL_WRITING, State.BINARY_FULL_WRITING, 11661157Smax.romanov@nginx.com State.STREAM_WRITING, State.WRITER_WRITING); 11671157Smax.romanov@nginx.com state = State.OPEN; 11681157Smax.romanov@nginx.com } else { 11691157Smax.romanov@nginx.com checkState(State.TEXT_PARTIAL_WRITING, State.BINARY_PARTIAL_WRITING, 11701157Smax.romanov@nginx.com State.STREAM_WRITING, State.WRITER_WRITING); 11711157Smax.romanov@nginx.com if (state == State.TEXT_PARTIAL_WRITING) { 11721157Smax.romanov@nginx.com state = State.TEXT_PARTIAL_READY; 11731157Smax.romanov@nginx.com } else if (state == State.BINARY_PARTIAL_WRITING){ 11741157Smax.romanov@nginx.com state = State.BINARY_PARTIAL_READY; 11751157Smax.romanov@nginx.com } else if (state == State.WRITER_WRITING) { 11761157Smax.romanov@nginx.com // NO-OP. Leave state as is. 11771157Smax.romanov@nginx.com } else if (state == State.STREAM_WRITING) { 1178*2078Salx.manpages@gmail.com // NO-OP. Leave state as is. 11791157Smax.romanov@nginx.com } else { 11801157Smax.romanov@nginx.com // Should never happen 11811157Smax.romanov@nginx.com // The if ... else ... blocks above should cover all states 11821157Smax.romanov@nginx.com // permitted by the preceding checkState() call 11831157Smax.romanov@nginx.com throw new IllegalStateException( 11841157Smax.romanov@nginx.com "BUG: This code should never be called"); 11851157Smax.romanov@nginx.com } 11861157Smax.romanov@nginx.com } 11871157Smax.romanov@nginx.com } 11881157Smax.romanov@nginx.com checkState(State... required)11891157Smax.romanov@nginx.com private void checkState(State... required) { 11901157Smax.romanov@nginx.com for (State state : required) { 11911157Smax.romanov@nginx.com if (this.state == state) { 11921157Smax.romanov@nginx.com return; 11931157Smax.romanov@nginx.com } 11941157Smax.romanov@nginx.com } 11951157Smax.romanov@nginx.com throw new IllegalStateException( 11961157Smax.romanov@nginx.com sm.getString("wsRemoteEndpoint.wrongState", this.state)); 11971157Smax.romanov@nginx.com } 11981157Smax.romanov@nginx.com } 11991157Smax.romanov@nginx.com 12001157Smax.romanov@nginx.com 12011157Smax.romanov@nginx.com private static class StateUpdateSendHandler implements SendHandler { 12021157Smax.romanov@nginx.com 12031157Smax.romanov@nginx.com private final SendHandler handler; 12041157Smax.romanov@nginx.com private final StateMachine stateMachine; 12051157Smax.romanov@nginx.com StateUpdateSendHandler(SendHandler handler, StateMachine stateMachine)12061157Smax.romanov@nginx.com public StateUpdateSendHandler(SendHandler handler, StateMachine stateMachine) { 12071157Smax.romanov@nginx.com this.handler = handler; 12081157Smax.romanov@nginx.com this.stateMachine = stateMachine; 12091157Smax.romanov@nginx.com } 12101157Smax.romanov@nginx.com 12111157Smax.romanov@nginx.com @Override onResult(SendResult result)12121157Smax.romanov@nginx.com public void onResult(SendResult result) { 12131157Smax.romanov@nginx.com if (result.isOK()) { 12141157Smax.romanov@nginx.com stateMachine.complete(true); 12151157Smax.romanov@nginx.com } 12161157Smax.romanov@nginx.com handler.onResult(result); 12171157Smax.romanov@nginx.com } 12181157Smax.romanov@nginx.com } 12191157Smax.romanov@nginx.com 12201157Smax.romanov@nginx.com 12211157Smax.romanov@nginx.com private static class BlockingSendHandler implements SendHandler { 12221157Smax.romanov@nginx.com 12231157Smax.romanov@nginx.com private SendResult sendResult = null; 12241157Smax.romanov@nginx.com 12251157Smax.romanov@nginx.com @Override onResult(SendResult result)12261157Smax.romanov@nginx.com public void onResult(SendResult result) { 12271157Smax.romanov@nginx.com sendResult = result; 12281157Smax.romanov@nginx.com } 12291157Smax.romanov@nginx.com getSendResult()12301157Smax.romanov@nginx.com public SendResult getSendResult() { 12311157Smax.romanov@nginx.com return sendResult; 12321157Smax.romanov@nginx.com } 12331157Smax.romanov@nginx.com } 12341157Smax.romanov@nginx.com } 1235