11157Smax.romanov@nginx.com /*
21157Smax.romanov@nginx.com  *  Licensed to the Apache Software Foundation (ASF) under one or more
31157Smax.romanov@nginx.com  *  contributor license agreements.  See the NOTICE file distributed with
41157Smax.romanov@nginx.com  *  this work for additional information regarding copyright ownership.
51157Smax.romanov@nginx.com  *  The ASF licenses this file to You under the Apache License, Version 2.0
61157Smax.romanov@nginx.com  *  (the "License"); you may not use this file except in compliance with
71157Smax.romanov@nginx.com  *  the License.  You may obtain a copy of the License at
81157Smax.romanov@nginx.com  *
91157Smax.romanov@nginx.com  *      http://www.apache.org/licenses/LICENSE-2.0
101157Smax.romanov@nginx.com  *
111157Smax.romanov@nginx.com  *  Unless required by applicable law or agreed to in writing, software
121157Smax.romanov@nginx.com  *  distributed under the License is distributed on an "AS IS" BASIS,
131157Smax.romanov@nginx.com  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
141157Smax.romanov@nginx.com  *  See the License for the specific language governing permissions and
151157Smax.romanov@nginx.com  *  limitations under the License.
161157Smax.romanov@nginx.com  */
171157Smax.romanov@nginx.com package nginx.unit.websocket;
181157Smax.romanov@nginx.com 
191157Smax.romanov@nginx.com import java.io.IOException;
201157Smax.romanov@nginx.com import java.io.OutputStream;
211157Smax.romanov@nginx.com import java.io.Writer;
221157Smax.romanov@nginx.com import java.net.SocketTimeoutException;
231157Smax.romanov@nginx.com import java.nio.ByteBuffer;
241157Smax.romanov@nginx.com import java.nio.CharBuffer;
251157Smax.romanov@nginx.com import java.nio.charset.CharsetEncoder;
261157Smax.romanov@nginx.com import java.nio.charset.CoderResult;
271157Smax.romanov@nginx.com import java.util.ArrayDeque;
281157Smax.romanov@nginx.com import java.util.ArrayList;
291157Smax.romanov@nginx.com import java.util.List;
301157Smax.romanov@nginx.com import java.util.Queue;
311157Smax.romanov@nginx.com import java.util.concurrent.Future;
321157Smax.romanov@nginx.com import java.util.concurrent.Semaphore;
331157Smax.romanov@nginx.com import java.util.concurrent.TimeUnit;
341157Smax.romanov@nginx.com import java.util.concurrent.atomic.AtomicBoolean;
351157Smax.romanov@nginx.com 
361157Smax.romanov@nginx.com import javax.websocket.CloseReason;
371157Smax.romanov@nginx.com import javax.websocket.CloseReason.CloseCodes;
381157Smax.romanov@nginx.com import javax.websocket.DeploymentException;
391157Smax.romanov@nginx.com import javax.websocket.EncodeException;
401157Smax.romanov@nginx.com import javax.websocket.Encoder;
411157Smax.romanov@nginx.com import javax.websocket.EndpointConfig;
421157Smax.romanov@nginx.com import javax.websocket.RemoteEndpoint;
431157Smax.romanov@nginx.com import javax.websocket.SendHandler;
441157Smax.romanov@nginx.com import javax.websocket.SendResult;
451157Smax.romanov@nginx.com 
461157Smax.romanov@nginx.com import org.apache.juli.logging.Log;
471157Smax.romanov@nginx.com import org.apache.juli.logging.LogFactory;
481157Smax.romanov@nginx.com import org.apache.tomcat.util.buf.Utf8Encoder;
491157Smax.romanov@nginx.com import org.apache.tomcat.util.res.StringManager;
501157Smax.romanov@nginx.com 
511157Smax.romanov@nginx.com import nginx.unit.Request;
521157Smax.romanov@nginx.com 
531157Smax.romanov@nginx.com public abstract class WsRemoteEndpointImplBase implements RemoteEndpoint {
541157Smax.romanov@nginx.com 
551157Smax.romanov@nginx.com     private static final StringManager sm =
561157Smax.romanov@nginx.com             StringManager.getManager(WsRemoteEndpointImplBase.class);
571157Smax.romanov@nginx.com 
581157Smax.romanov@nginx.com     protected static final SendResult SENDRESULT_OK = new SendResult();
591157Smax.romanov@nginx.com 
601157Smax.romanov@nginx.com     private final Log log = LogFactory.getLog(WsRemoteEndpointImplBase.class); // must not be static
611157Smax.romanov@nginx.com 
621157Smax.romanov@nginx.com     private final StateMachine stateMachine = new StateMachine();
631157Smax.romanov@nginx.com 
641157Smax.romanov@nginx.com     private final IntermediateMessageHandler intermediateMessageHandler =
651157Smax.romanov@nginx.com             new IntermediateMessageHandler(this);
661157Smax.romanov@nginx.com 
671157Smax.romanov@nginx.com     private Transformation transformation = null;
681157Smax.romanov@nginx.com     private final Semaphore messagePartInProgress = new Semaphore(1);
691157Smax.romanov@nginx.com     private final Queue<MessagePart> messagePartQueue = new ArrayDeque<>();
701157Smax.romanov@nginx.com     private final Object messagePartLock = new Object();
711157Smax.romanov@nginx.com 
721157Smax.romanov@nginx.com     // State
731157Smax.romanov@nginx.com     private volatile boolean closed = false;
741157Smax.romanov@nginx.com     private boolean fragmented = false;
751157Smax.romanov@nginx.com     private boolean nextFragmented = false;
761157Smax.romanov@nginx.com     private boolean text = false;
771157Smax.romanov@nginx.com     private boolean nextText = false;
781157Smax.romanov@nginx.com 
791157Smax.romanov@nginx.com     // Max size of WebSocket header is 14 bytes
801157Smax.romanov@nginx.com     private final ByteBuffer headerBuffer = ByteBuffer.allocate(14);
811157Smax.romanov@nginx.com     private final ByteBuffer outputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
821157Smax.romanov@nginx.com     private final CharsetEncoder encoder = new Utf8Encoder();
831157Smax.romanov@nginx.com     private final ByteBuffer encoderBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
841157Smax.romanov@nginx.com     private final AtomicBoolean batchingAllowed = new AtomicBoolean(false);
851157Smax.romanov@nginx.com     private volatile long sendTimeout = -1;
861157Smax.romanov@nginx.com     private WsSession wsSession;
871157Smax.romanov@nginx.com     private List<EncoderEntry> encoderEntries = new ArrayList<>();
881157Smax.romanov@nginx.com 
891157Smax.romanov@nginx.com     private Request request;
901157Smax.romanov@nginx.com 
911157Smax.romanov@nginx.com 
setTransformation(Transformation transformation)921157Smax.romanov@nginx.com     protected void setTransformation(Transformation transformation) {
931157Smax.romanov@nginx.com         this.transformation = transformation;
941157Smax.romanov@nginx.com     }
951157Smax.romanov@nginx.com 
961157Smax.romanov@nginx.com 
getSendTimeout()971157Smax.romanov@nginx.com     public long getSendTimeout() {
981157Smax.romanov@nginx.com         return sendTimeout;
991157Smax.romanov@nginx.com     }
1001157Smax.romanov@nginx.com 
1011157Smax.romanov@nginx.com 
setSendTimeout(long timeout)1021157Smax.romanov@nginx.com     public void setSendTimeout(long timeout) {
1031157Smax.romanov@nginx.com         this.sendTimeout = timeout;
1041157Smax.romanov@nginx.com     }
1051157Smax.romanov@nginx.com 
1061157Smax.romanov@nginx.com 
1071157Smax.romanov@nginx.com     @Override
setBatchingAllowed(boolean batchingAllowed)1081157Smax.romanov@nginx.com     public void setBatchingAllowed(boolean batchingAllowed) throws IOException {
1091157Smax.romanov@nginx.com         boolean oldValue = this.batchingAllowed.getAndSet(batchingAllowed);
1101157Smax.romanov@nginx.com 
1111157Smax.romanov@nginx.com         if (oldValue && !batchingAllowed) {
1121157Smax.romanov@nginx.com             flushBatch();
1131157Smax.romanov@nginx.com         }
1141157Smax.romanov@nginx.com     }
1151157Smax.romanov@nginx.com 
1161157Smax.romanov@nginx.com 
1171157Smax.romanov@nginx.com     @Override
getBatchingAllowed()1181157Smax.romanov@nginx.com     public boolean getBatchingAllowed() {
1191157Smax.romanov@nginx.com         return batchingAllowed.get();
1201157Smax.romanov@nginx.com     }
1211157Smax.romanov@nginx.com 
1221157Smax.romanov@nginx.com 
1231157Smax.romanov@nginx.com     @Override
flushBatch()1241157Smax.romanov@nginx.com     public void flushBatch() throws IOException {
1251157Smax.romanov@nginx.com         sendMessageBlock(Constants.INTERNAL_OPCODE_FLUSH, null, true);
1261157Smax.romanov@nginx.com     }
1271157Smax.romanov@nginx.com 
1281157Smax.romanov@nginx.com 
sendBytes(ByteBuffer data)1291157Smax.romanov@nginx.com     public void sendBytes(ByteBuffer data) throws IOException {
1301157Smax.romanov@nginx.com         if (data == null) {
1311157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
1321157Smax.romanov@nginx.com         }
1331157Smax.romanov@nginx.com         stateMachine.binaryStart();
1341157Smax.romanov@nginx.com         sendMessageBlock(Constants.OPCODE_BINARY, data, true);
1351157Smax.romanov@nginx.com         stateMachine.complete(true);
1361157Smax.romanov@nginx.com     }
1371157Smax.romanov@nginx.com 
1381157Smax.romanov@nginx.com 
sendBytesByFuture(ByteBuffer data)1391157Smax.romanov@nginx.com     public Future<Void> sendBytesByFuture(ByteBuffer data) {
1401157Smax.romanov@nginx.com         FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
1411157Smax.romanov@nginx.com         sendBytesByCompletion(data, f2sh);
1421157Smax.romanov@nginx.com         return f2sh;
1431157Smax.romanov@nginx.com     }
1441157Smax.romanov@nginx.com 
1451157Smax.romanov@nginx.com 
sendBytesByCompletion(ByteBuffer data, SendHandler handler)1461157Smax.romanov@nginx.com     public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
1471157Smax.romanov@nginx.com         if (data == null) {
1481157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
1491157Smax.romanov@nginx.com         }
1501157Smax.romanov@nginx.com         if (handler == null) {
1511157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler"));
1521157Smax.romanov@nginx.com         }
1531157Smax.romanov@nginx.com         StateUpdateSendHandler sush = new StateUpdateSendHandler(handler, stateMachine);
1541157Smax.romanov@nginx.com         stateMachine.binaryStart();
1551157Smax.romanov@nginx.com         startMessage(Constants.OPCODE_BINARY, data, true, sush);
1561157Smax.romanov@nginx.com     }
1571157Smax.romanov@nginx.com 
1581157Smax.romanov@nginx.com 
sendPartialBytes(ByteBuffer partialByte, boolean last)1591157Smax.romanov@nginx.com     public void sendPartialBytes(ByteBuffer partialByte, boolean last)
1601157Smax.romanov@nginx.com             throws IOException {
1611157Smax.romanov@nginx.com         if (partialByte == null) {
1621157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
1631157Smax.romanov@nginx.com         }
1641157Smax.romanov@nginx.com         stateMachine.binaryPartialStart();
1651157Smax.romanov@nginx.com         sendMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
1661157Smax.romanov@nginx.com         stateMachine.complete(last);
1671157Smax.romanov@nginx.com     }
1681157Smax.romanov@nginx.com 
1691157Smax.romanov@nginx.com 
1701157Smax.romanov@nginx.com     @Override
sendPing(ByteBuffer applicationData)1711157Smax.romanov@nginx.com     public void sendPing(ByteBuffer applicationData) throws IOException,
1721157Smax.romanov@nginx.com             IllegalArgumentException {
1731157Smax.romanov@nginx.com         if (applicationData.remaining() > 125) {
1741157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.tooMuchData"));
1751157Smax.romanov@nginx.com         }
1761157Smax.romanov@nginx.com         sendMessageBlock(Constants.OPCODE_PING, applicationData, true);
1771157Smax.romanov@nginx.com     }
1781157Smax.romanov@nginx.com 
1791157Smax.romanov@nginx.com 
1801157Smax.romanov@nginx.com     @Override
sendPong(ByteBuffer applicationData)1811157Smax.romanov@nginx.com     public void sendPong(ByteBuffer applicationData) throws IOException,
1821157Smax.romanov@nginx.com             IllegalArgumentException {
1831157Smax.romanov@nginx.com         if (applicationData.remaining() > 125) {
1841157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.tooMuchData"));
1851157Smax.romanov@nginx.com         }
1861157Smax.romanov@nginx.com         sendMessageBlock(Constants.OPCODE_PONG, applicationData, true);
1871157Smax.romanov@nginx.com     }
1881157Smax.romanov@nginx.com 
1891157Smax.romanov@nginx.com 
sendString(String text)1901157Smax.romanov@nginx.com     public void sendString(String text) throws IOException {
1911157Smax.romanov@nginx.com         if (text == null) {
1921157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
1931157Smax.romanov@nginx.com         }
1941157Smax.romanov@nginx.com         stateMachine.textStart();
1951157Smax.romanov@nginx.com         sendMessageBlock(CharBuffer.wrap(text), true);
1961157Smax.romanov@nginx.com     }
1971157Smax.romanov@nginx.com 
1981157Smax.romanov@nginx.com 
sendStringByFuture(String text)1991157Smax.romanov@nginx.com     public Future<Void> sendStringByFuture(String text) {
2001157Smax.romanov@nginx.com         FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
2011157Smax.romanov@nginx.com         sendStringByCompletion(text, f2sh);
2021157Smax.romanov@nginx.com         return f2sh;
2031157Smax.romanov@nginx.com     }
2041157Smax.romanov@nginx.com 
2051157Smax.romanov@nginx.com 
sendStringByCompletion(String text, SendHandler handler)2061157Smax.romanov@nginx.com     public void sendStringByCompletion(String text, SendHandler handler) {
2071157Smax.romanov@nginx.com         if (text == null) {
2081157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
2091157Smax.romanov@nginx.com         }
2101157Smax.romanov@nginx.com         if (handler == null) {
2111157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler"));
2121157Smax.romanov@nginx.com         }
2131157Smax.romanov@nginx.com         stateMachine.textStart();
2141157Smax.romanov@nginx.com         TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
2151157Smax.romanov@nginx.com                 CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
2161157Smax.romanov@nginx.com         tmsh.write();
2171157Smax.romanov@nginx.com         // TextMessageSendHandler will update stateMachine when it completes
2181157Smax.romanov@nginx.com     }
2191157Smax.romanov@nginx.com 
2201157Smax.romanov@nginx.com 
sendPartialString(String fragment, boolean isLast)2211157Smax.romanov@nginx.com     public void sendPartialString(String fragment, boolean isLast)
2221157Smax.romanov@nginx.com             throws IOException {
2231157Smax.romanov@nginx.com         if (fragment == null) {
2241157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
2251157Smax.romanov@nginx.com         }
2261157Smax.romanov@nginx.com         stateMachine.textPartialStart();
2271157Smax.romanov@nginx.com         sendMessageBlock(CharBuffer.wrap(fragment), isLast);
2281157Smax.romanov@nginx.com     }
2291157Smax.romanov@nginx.com 
2301157Smax.romanov@nginx.com 
getSendStream()2311157Smax.romanov@nginx.com     public OutputStream getSendStream() {
2321157Smax.romanov@nginx.com         stateMachine.streamStart();
2331157Smax.romanov@nginx.com         return new WsOutputStream(this);
2341157Smax.romanov@nginx.com     }
2351157Smax.romanov@nginx.com 
2361157Smax.romanov@nginx.com 
getSendWriter()2371157Smax.romanov@nginx.com     public Writer getSendWriter() {
2381157Smax.romanov@nginx.com         stateMachine.writeStart();
2391157Smax.romanov@nginx.com         return new WsWriter(this);
2401157Smax.romanov@nginx.com     }
2411157Smax.romanov@nginx.com 
2421157Smax.romanov@nginx.com 
sendMessageBlock(CharBuffer part, boolean last)2431157Smax.romanov@nginx.com     void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
2441157Smax.romanov@nginx.com         long timeoutExpiry = getTimeoutExpiry();
2451157Smax.romanov@nginx.com         boolean isDone = false;
2461157Smax.romanov@nginx.com         while (!isDone) {
2471157Smax.romanov@nginx.com             encoderBuffer.clear();
2481157Smax.romanov@nginx.com             CoderResult cr = encoder.encode(part, encoderBuffer, true);
2491157Smax.romanov@nginx.com             if (cr.isError()) {
2501157Smax.romanov@nginx.com                 throw new IllegalArgumentException(cr.toString());
2511157Smax.romanov@nginx.com             }
2521157Smax.romanov@nginx.com             isDone = !cr.isOverflow();
2531157Smax.romanov@nginx.com             encoderBuffer.flip();
2541157Smax.romanov@nginx.com             sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeoutExpiry);
2551157Smax.romanov@nginx.com         }
2561157Smax.romanov@nginx.com         stateMachine.complete(last);
2571157Smax.romanov@nginx.com     }
2581157Smax.romanov@nginx.com 
2591157Smax.romanov@nginx.com 
sendMessageBlock(byte opCode, ByteBuffer payload, boolean last)2601157Smax.romanov@nginx.com     void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last)
2611157Smax.romanov@nginx.com             throws IOException {
2621157Smax.romanov@nginx.com         sendMessageBlock(opCode, payload, last, getTimeoutExpiry());
2631157Smax.romanov@nginx.com     }
2641157Smax.romanov@nginx.com 
2651157Smax.romanov@nginx.com 
getTimeoutExpiry()2661157Smax.romanov@nginx.com     private long getTimeoutExpiry() {
2671157Smax.romanov@nginx.com         // Get the timeout before we send the message. The message may
2681157Smax.romanov@nginx.com         // trigger a session close and depending on timing the client
2691157Smax.romanov@nginx.com         // session may close before we can read the timeout.
2701157Smax.romanov@nginx.com         long timeout = getBlockingSendTimeout();
2711157Smax.romanov@nginx.com         if (timeout < 0) {
2721157Smax.romanov@nginx.com             return Long.MAX_VALUE;
2731157Smax.romanov@nginx.com         } else {
2741157Smax.romanov@nginx.com             return System.currentTimeMillis() + timeout;
2751157Smax.romanov@nginx.com         }
2761157Smax.romanov@nginx.com     }
2771157Smax.romanov@nginx.com 
2781157Smax.romanov@nginx.com     private byte currentOpCode = Constants.OPCODE_CONTINUATION;
2791157Smax.romanov@nginx.com 
sendMessageBlock(byte opCode, ByteBuffer payload, boolean last, long timeoutExpiry)2801157Smax.romanov@nginx.com     private void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last,
2811157Smax.romanov@nginx.com             long timeoutExpiry) throws IOException {
2821157Smax.romanov@nginx.com         wsSession.updateLastActive();
2831157Smax.romanov@nginx.com 
2841157Smax.romanov@nginx.com         if (opCode == currentOpCode) {
2851157Smax.romanov@nginx.com             opCode = Constants.OPCODE_CONTINUATION;
2861157Smax.romanov@nginx.com         }
2871157Smax.romanov@nginx.com 
2881157Smax.romanov@nginx.com         request.sendWsFrame(payload, opCode, last, timeoutExpiry);
2891157Smax.romanov@nginx.com 
2901157Smax.romanov@nginx.com         if (!last && opCode != Constants.OPCODE_CONTINUATION) {
2911157Smax.romanov@nginx.com             currentOpCode = opCode;
2921157Smax.romanov@nginx.com         }
2931157Smax.romanov@nginx.com 
2941157Smax.romanov@nginx.com         if (last && opCode == Constants.OPCODE_CONTINUATION) {
2951157Smax.romanov@nginx.com             currentOpCode = Constants.OPCODE_CONTINUATION;
2961157Smax.romanov@nginx.com         }
2971157Smax.romanov@nginx.com     }
2981157Smax.romanov@nginx.com 
2991157Smax.romanov@nginx.com 
startMessage(byte opCode, ByteBuffer payload, boolean last, SendHandler handler)3001157Smax.romanov@nginx.com     void startMessage(byte opCode, ByteBuffer payload, boolean last,
3011157Smax.romanov@nginx.com             SendHandler handler) {
3021157Smax.romanov@nginx.com 
3031157Smax.romanov@nginx.com         wsSession.updateLastActive();
3041157Smax.romanov@nginx.com 
3051157Smax.romanov@nginx.com         List<MessagePart> messageParts = new ArrayList<>();
3061157Smax.romanov@nginx.com         messageParts.add(new MessagePart(last, 0, opCode, payload,
3071157Smax.romanov@nginx.com                 intermediateMessageHandler,
3081157Smax.romanov@nginx.com                 new EndMessageHandler(this, handler), -1));
3091157Smax.romanov@nginx.com 
3101157Smax.romanov@nginx.com         messageParts = transformation.sendMessagePart(messageParts);
3111157Smax.romanov@nginx.com 
3121157Smax.romanov@nginx.com         // Some extensions/transformations may buffer messages so it is possible
3131157Smax.romanov@nginx.com         // that no message parts will be returned. If this is the case the
3141157Smax.romanov@nginx.com         // trigger the supplied SendHandler
3151157Smax.romanov@nginx.com         if (messageParts.size() == 0) {
3161157Smax.romanov@nginx.com             handler.onResult(new SendResult());
3171157Smax.romanov@nginx.com             return;
3181157Smax.romanov@nginx.com         }
3191157Smax.romanov@nginx.com 
3201157Smax.romanov@nginx.com         MessagePart mp = messageParts.remove(0);
3211157Smax.romanov@nginx.com 
3221157Smax.romanov@nginx.com         boolean doWrite = false;
3231157Smax.romanov@nginx.com         synchronized (messagePartLock) {
3241157Smax.romanov@nginx.com             if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) {
3251157Smax.romanov@nginx.com                 // Should not happen. To late to send batched messages now since
3261157Smax.romanov@nginx.com                 // the session has been closed. Complain loudly.
3271157Smax.romanov@nginx.com                 log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
3281157Smax.romanov@nginx.com             }
3291157Smax.romanov@nginx.com             if (messagePartInProgress.tryAcquire()) {
3301157Smax.romanov@nginx.com                 doWrite = true;
3311157Smax.romanov@nginx.com             } else {
3321157Smax.romanov@nginx.com                 // When a control message is sent while another message is being
3331157Smax.romanov@nginx.com                 // sent, the control message is queued. Chances are the
3341157Smax.romanov@nginx.com                 // subsequent data message part will end up queued while the
3351157Smax.romanov@nginx.com                 // control message is sent. The logic in this class (state
3361157Smax.romanov@nginx.com                 // machine, EndMessageHandler, TextMessageSendHandler) ensures
3371157Smax.romanov@nginx.com                 // that there will only ever be one data message part in the
3381157Smax.romanov@nginx.com                 // queue. There could be multiple control messages in the queue.
3391157Smax.romanov@nginx.com 
3401157Smax.romanov@nginx.com                 // Add it to the queue
3411157Smax.romanov@nginx.com                 messagePartQueue.add(mp);
3421157Smax.romanov@nginx.com             }
3431157Smax.romanov@nginx.com             // Add any remaining messages to the queue
3441157Smax.romanov@nginx.com             messagePartQueue.addAll(messageParts);
3451157Smax.romanov@nginx.com         }
3461157Smax.romanov@nginx.com         if (doWrite) {
3471157Smax.romanov@nginx.com             // Actual write has to be outside sync block to avoid possible
3481157Smax.romanov@nginx.com             // deadlock between messagePartLock and writeLock in
3491157Smax.romanov@nginx.com             // o.a.coyote.http11.upgrade.AbstractServletOutputStream
3501157Smax.romanov@nginx.com             writeMessagePart(mp);
3511157Smax.romanov@nginx.com         }
3521157Smax.romanov@nginx.com     }
3531157Smax.romanov@nginx.com 
3541157Smax.romanov@nginx.com 
endMessage(SendHandler handler, SendResult result)3551157Smax.romanov@nginx.com     void endMessage(SendHandler handler, SendResult result) {
3561157Smax.romanov@nginx.com         boolean doWrite = false;
3571157Smax.romanov@nginx.com         MessagePart mpNext = null;
3581157Smax.romanov@nginx.com         synchronized (messagePartLock) {
3591157Smax.romanov@nginx.com 
3601157Smax.romanov@nginx.com             fragmented = nextFragmented;
3611157Smax.romanov@nginx.com             text = nextText;
3621157Smax.romanov@nginx.com 
3631157Smax.romanov@nginx.com             mpNext = messagePartQueue.poll();
3641157Smax.romanov@nginx.com             if (mpNext == null) {
3651157Smax.romanov@nginx.com                 messagePartInProgress.release();
3661157Smax.romanov@nginx.com             } else if (!closed){
3671157Smax.romanov@nginx.com                 // Session may have been closed unexpectedly in the middle of
3681157Smax.romanov@nginx.com                 // sending a fragmented message closing the endpoint. If this
3691157Smax.romanov@nginx.com                 // happens, clearly there is no point trying to send the rest of
3701157Smax.romanov@nginx.com                 // the message.
3711157Smax.romanov@nginx.com                 doWrite = true;
3721157Smax.romanov@nginx.com             }
3731157Smax.romanov@nginx.com         }
3741157Smax.romanov@nginx.com         if (doWrite) {
3751157Smax.romanov@nginx.com             // Actual write has to be outside sync block to avoid possible
3761157Smax.romanov@nginx.com             // deadlock between messagePartLock and writeLock in
3771157Smax.romanov@nginx.com             // o.a.coyote.http11.upgrade.AbstractServletOutputStream
3781157Smax.romanov@nginx.com             writeMessagePart(mpNext);
3791157Smax.romanov@nginx.com         }
3801157Smax.romanov@nginx.com 
3811157Smax.romanov@nginx.com         wsSession.updateLastActive();
3821157Smax.romanov@nginx.com 
3831157Smax.romanov@nginx.com         // Some handlers, such as the IntermediateMessageHandler, do not have a
3841157Smax.romanov@nginx.com         // nested handler so handler may be null.
3851157Smax.romanov@nginx.com         if (handler != null) {
3861157Smax.romanov@nginx.com             handler.onResult(result);
3871157Smax.romanov@nginx.com         }
3881157Smax.romanov@nginx.com     }
3891157Smax.romanov@nginx.com 
3901157Smax.romanov@nginx.com 
writeMessagePart(MessagePart mp)3911157Smax.romanov@nginx.com     void writeMessagePart(MessagePart mp) {
3921157Smax.romanov@nginx.com         if (closed) {
3931157Smax.romanov@nginx.com             throw new IllegalStateException(
3941157Smax.romanov@nginx.com                     sm.getString("wsRemoteEndpoint.closed"));
3951157Smax.romanov@nginx.com         }
3961157Smax.romanov@nginx.com 
3971157Smax.romanov@nginx.com         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
3981157Smax.romanov@nginx.com             nextFragmented = fragmented;
3991157Smax.romanov@nginx.com             nextText = text;
4001157Smax.romanov@nginx.com             outputBuffer.flip();
4011157Smax.romanov@nginx.com             SendHandler flushHandler = new OutputBufferFlushSendHandler(
4021157Smax.romanov@nginx.com                     outputBuffer, mp.getEndHandler());
4031157Smax.romanov@nginx.com             doWrite(flushHandler, mp.getBlockingWriteTimeoutExpiry(), outputBuffer);
4041157Smax.romanov@nginx.com             return;
4051157Smax.romanov@nginx.com         }
4061157Smax.romanov@nginx.com 
4071157Smax.romanov@nginx.com         // Control messages may be sent in the middle of fragmented message
4081157Smax.romanov@nginx.com         // so they have no effect on the fragmented or text flags
4091157Smax.romanov@nginx.com         boolean first;
4101157Smax.romanov@nginx.com         if (Util.isControl(mp.getOpCode())) {
4111157Smax.romanov@nginx.com             nextFragmented = fragmented;
4121157Smax.romanov@nginx.com             nextText = text;
4131157Smax.romanov@nginx.com             if (mp.getOpCode() == Constants.OPCODE_CLOSE) {
4141157Smax.romanov@nginx.com                 closed = true;
4151157Smax.romanov@nginx.com             }
4161157Smax.romanov@nginx.com             first = true;
4171157Smax.romanov@nginx.com         } else {
4181157Smax.romanov@nginx.com             boolean isText = Util.isText(mp.getOpCode());
4191157Smax.romanov@nginx.com 
4201157Smax.romanov@nginx.com             if (fragmented) {
4211157Smax.romanov@nginx.com                 // Currently fragmented
4221157Smax.romanov@nginx.com                 if (text != isText) {
4231157Smax.romanov@nginx.com                     throw new IllegalStateException(
4241157Smax.romanov@nginx.com                             sm.getString("wsRemoteEndpoint.changeType"));
4251157Smax.romanov@nginx.com                 }
4261157Smax.romanov@nginx.com                 nextText = text;
4271157Smax.romanov@nginx.com                 nextFragmented = !mp.isFin();
4281157Smax.romanov@nginx.com                 first = false;
4291157Smax.romanov@nginx.com             } else {
4301157Smax.romanov@nginx.com                 // Wasn't fragmented. Might be now
4311157Smax.romanov@nginx.com                 if (mp.isFin()) {
4321157Smax.romanov@nginx.com                     nextFragmented = false;
4331157Smax.romanov@nginx.com                 } else {
4341157Smax.romanov@nginx.com                     nextFragmented = true;
4351157Smax.romanov@nginx.com                     nextText = isText;
4361157Smax.romanov@nginx.com                 }
4371157Smax.romanov@nginx.com                 first = true;
4381157Smax.romanov@nginx.com             }
4391157Smax.romanov@nginx.com         }
4401157Smax.romanov@nginx.com 
4411157Smax.romanov@nginx.com         byte[] mask;
4421157Smax.romanov@nginx.com 
4431157Smax.romanov@nginx.com         if (isMasked()) {
4441157Smax.romanov@nginx.com             mask = Util.generateMask();
4451157Smax.romanov@nginx.com         } else {
4461157Smax.romanov@nginx.com             mask = null;
4471157Smax.romanov@nginx.com         }
4481157Smax.romanov@nginx.com 
4491157Smax.romanov@nginx.com         headerBuffer.clear();
4501157Smax.romanov@nginx.com         writeHeader(headerBuffer, mp.isFin(), mp.getRsv(), mp.getOpCode(),
4511157Smax.romanov@nginx.com                 isMasked(), mp.getPayload(), mask, first);
4521157Smax.romanov@nginx.com         headerBuffer.flip();
4531157Smax.romanov@nginx.com 
4541157Smax.romanov@nginx.com         if (getBatchingAllowed() || isMasked()) {
4551157Smax.romanov@nginx.com             // Need to write via output buffer
4561157Smax.romanov@nginx.com             OutputBufferSendHandler obsh = new OutputBufferSendHandler(
4571157Smax.romanov@nginx.com                     mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
4581157Smax.romanov@nginx.com                     headerBuffer, mp.getPayload(), mask,
4591157Smax.romanov@nginx.com                     outputBuffer, !getBatchingAllowed(), this);
4601157Smax.romanov@nginx.com             obsh.write();
4611157Smax.romanov@nginx.com         } else {
4621157Smax.romanov@nginx.com             // Can write directly
4631157Smax.romanov@nginx.com             doWrite(mp.getEndHandler(), mp.getBlockingWriteTimeoutExpiry(),
4641157Smax.romanov@nginx.com                     headerBuffer, mp.getPayload());
4651157Smax.romanov@nginx.com         }
4661157Smax.romanov@nginx.com     }
4671157Smax.romanov@nginx.com 
4681157Smax.romanov@nginx.com 
getBlockingSendTimeout()4691157Smax.romanov@nginx.com     private long getBlockingSendTimeout() {
4701157Smax.romanov@nginx.com         Object obj = wsSession.getUserProperties().get(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY);
4711157Smax.romanov@nginx.com         Long userTimeout = null;
4721157Smax.romanov@nginx.com         if (obj instanceof Long) {
4731157Smax.romanov@nginx.com             userTimeout = (Long) obj;
4741157Smax.romanov@nginx.com         }
4751157Smax.romanov@nginx.com         if (userTimeout == null) {
4761157Smax.romanov@nginx.com             return Constants.DEFAULT_BLOCKING_SEND_TIMEOUT;
4771157Smax.romanov@nginx.com         } else {
4781157Smax.romanov@nginx.com             return userTimeout.longValue();
4791157Smax.romanov@nginx.com         }
4801157Smax.romanov@nginx.com     }
4811157Smax.romanov@nginx.com 
4821157Smax.romanov@nginx.com 
4831157Smax.romanov@nginx.com     /**
4841157Smax.romanov@nginx.com      * Wraps the user provided handler so that the end point is notified when
4851157Smax.romanov@nginx.com      * the message is complete.
4861157Smax.romanov@nginx.com      */
4871157Smax.romanov@nginx.com     private static class EndMessageHandler implements SendHandler {
4881157Smax.romanov@nginx.com 
4891157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
4901157Smax.romanov@nginx.com         private final SendHandler handler;
4911157Smax.romanov@nginx.com 
EndMessageHandler(WsRemoteEndpointImplBase endpoint, SendHandler handler)4921157Smax.romanov@nginx.com         public EndMessageHandler(WsRemoteEndpointImplBase endpoint,
4931157Smax.romanov@nginx.com                 SendHandler handler) {
4941157Smax.romanov@nginx.com             this.endpoint = endpoint;
4951157Smax.romanov@nginx.com             this.handler = handler;
4961157Smax.romanov@nginx.com         }
4971157Smax.romanov@nginx.com 
4981157Smax.romanov@nginx.com 
4991157Smax.romanov@nginx.com         @Override
onResult(SendResult result)5001157Smax.romanov@nginx.com         public void onResult(SendResult result) {
5011157Smax.romanov@nginx.com             endpoint.endMessage(handler, result);
5021157Smax.romanov@nginx.com         }
5031157Smax.romanov@nginx.com     }
5041157Smax.romanov@nginx.com 
5051157Smax.romanov@nginx.com 
5061157Smax.romanov@nginx.com     /**
5071157Smax.romanov@nginx.com      * If a transformation needs to split a {@link MessagePart} into multiple
5081157Smax.romanov@nginx.com      * {@link MessagePart}s, it uses this handler as the end handler for each of
5091157Smax.romanov@nginx.com      * the additional {@link MessagePart}s. This handler notifies this this
5101157Smax.romanov@nginx.com      * class that the {@link MessagePart} has been processed and that the next
5111157Smax.romanov@nginx.com      * {@link MessagePart} in the queue should be started. The final
5121157Smax.romanov@nginx.com      * {@link MessagePart} will use the {@link EndMessageHandler} provided with
5131157Smax.romanov@nginx.com      * the original {@link MessagePart}.
5141157Smax.romanov@nginx.com      */
5151157Smax.romanov@nginx.com     private static class IntermediateMessageHandler implements SendHandler {
5161157Smax.romanov@nginx.com 
5171157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
5181157Smax.romanov@nginx.com 
IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint)5191157Smax.romanov@nginx.com         public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) {
5201157Smax.romanov@nginx.com             this.endpoint = endpoint;
5211157Smax.romanov@nginx.com         }
5221157Smax.romanov@nginx.com 
5231157Smax.romanov@nginx.com 
5241157Smax.romanov@nginx.com         @Override
onResult(SendResult result)5251157Smax.romanov@nginx.com         public void onResult(SendResult result) {
5261157Smax.romanov@nginx.com             endpoint.endMessage(null, result);
5271157Smax.romanov@nginx.com         }
5281157Smax.romanov@nginx.com     }
5291157Smax.romanov@nginx.com 
5301157Smax.romanov@nginx.com 
5311157Smax.romanov@nginx.com     @SuppressWarnings({"unchecked", "rawtypes"})
sendObject(Object obj)5321157Smax.romanov@nginx.com     public void sendObject(Object obj) throws IOException, EncodeException {
5331157Smax.romanov@nginx.com         if (obj == null) {
5341157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
5351157Smax.romanov@nginx.com         }
5361157Smax.romanov@nginx.com         /*
5371157Smax.romanov@nginx.com          * Note that the implementation will convert primitives and their object
5381157Smax.romanov@nginx.com          * equivalents by default but that users are free to specify their own
5391157Smax.romanov@nginx.com          * encoders and decoders for this if they wish.
5401157Smax.romanov@nginx.com          */
5411157Smax.romanov@nginx.com         Encoder encoder = findEncoder(obj);
5421157Smax.romanov@nginx.com         if (encoder == null && Util.isPrimitive(obj.getClass())) {
5431157Smax.romanov@nginx.com             String msg = obj.toString();
5441157Smax.romanov@nginx.com             sendString(msg);
5451157Smax.romanov@nginx.com             return;
5461157Smax.romanov@nginx.com         }
5471157Smax.romanov@nginx.com         if (encoder == null && byte[].class.isAssignableFrom(obj.getClass())) {
5481157Smax.romanov@nginx.com             ByteBuffer msg = ByteBuffer.wrap((byte[]) obj);
5491157Smax.romanov@nginx.com             sendBytes(msg);
5501157Smax.romanov@nginx.com             return;
5511157Smax.romanov@nginx.com         }
5521157Smax.romanov@nginx.com 
5531157Smax.romanov@nginx.com         if (encoder instanceof Encoder.Text) {
5541157Smax.romanov@nginx.com             String msg = ((Encoder.Text) encoder).encode(obj);
5551157Smax.romanov@nginx.com             sendString(msg);
5561157Smax.romanov@nginx.com         } else if (encoder instanceof Encoder.TextStream) {
5571157Smax.romanov@nginx.com             try (Writer w = getSendWriter()) {
5581157Smax.romanov@nginx.com                 ((Encoder.TextStream) encoder).encode(obj, w);
5591157Smax.romanov@nginx.com             }
5601157Smax.romanov@nginx.com         } else if (encoder instanceof Encoder.Binary) {
5611157Smax.romanov@nginx.com             ByteBuffer msg = ((Encoder.Binary) encoder).encode(obj);
5621157Smax.romanov@nginx.com             sendBytes(msg);
5631157Smax.romanov@nginx.com         } else if (encoder instanceof Encoder.BinaryStream) {
5641157Smax.romanov@nginx.com             try (OutputStream os = getSendStream()) {
5651157Smax.romanov@nginx.com                 ((Encoder.BinaryStream) encoder).encode(obj, os);
5661157Smax.romanov@nginx.com             }
5671157Smax.romanov@nginx.com         } else {
5681157Smax.romanov@nginx.com             throw new EncodeException(obj, sm.getString(
5691157Smax.romanov@nginx.com                     "wsRemoteEndpoint.noEncoder", obj.getClass()));
5701157Smax.romanov@nginx.com         }
5711157Smax.romanov@nginx.com     }
5721157Smax.romanov@nginx.com 
5731157Smax.romanov@nginx.com 
sendObjectByFuture(Object obj)5741157Smax.romanov@nginx.com     public Future<Void> sendObjectByFuture(Object obj) {
5751157Smax.romanov@nginx.com         FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
5761157Smax.romanov@nginx.com         sendObjectByCompletion(obj, f2sh);
5771157Smax.romanov@nginx.com         return f2sh;
5781157Smax.romanov@nginx.com     }
5791157Smax.romanov@nginx.com 
5801157Smax.romanov@nginx.com 
5811157Smax.romanov@nginx.com     @SuppressWarnings({"unchecked", "rawtypes"})
sendObjectByCompletion(Object obj, SendHandler completion)5821157Smax.romanov@nginx.com     public void sendObjectByCompletion(Object obj, SendHandler completion) {
5831157Smax.romanov@nginx.com 
5841157Smax.romanov@nginx.com         if (obj == null) {
5851157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData"));
5861157Smax.romanov@nginx.com         }
5871157Smax.romanov@nginx.com         if (completion == null) {
5881157Smax.romanov@nginx.com             throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullHandler"));
5891157Smax.romanov@nginx.com         }
5901157Smax.romanov@nginx.com 
5911157Smax.romanov@nginx.com         /*
5921157Smax.romanov@nginx.com          * Note that the implementation will convert primitives and their object
5931157Smax.romanov@nginx.com          * equivalents by default but that users are free to specify their own
5941157Smax.romanov@nginx.com          * encoders and decoders for this if they wish.
5951157Smax.romanov@nginx.com          */
5961157Smax.romanov@nginx.com         Encoder encoder = findEncoder(obj);
5971157Smax.romanov@nginx.com         if (encoder == null && Util.isPrimitive(obj.getClass())) {
5981157Smax.romanov@nginx.com             String msg = obj.toString();
5991157Smax.romanov@nginx.com             sendStringByCompletion(msg, completion);
6001157Smax.romanov@nginx.com             return;
6011157Smax.romanov@nginx.com         }
6021157Smax.romanov@nginx.com         if (encoder == null && byte[].class.isAssignableFrom(obj.getClass())) {
6031157Smax.romanov@nginx.com             ByteBuffer msg = ByteBuffer.wrap((byte[]) obj);
6041157Smax.romanov@nginx.com             sendBytesByCompletion(msg, completion);
6051157Smax.romanov@nginx.com             return;
6061157Smax.romanov@nginx.com         }
6071157Smax.romanov@nginx.com 
6081157Smax.romanov@nginx.com         try {
6091157Smax.romanov@nginx.com             if (encoder instanceof Encoder.Text) {
6101157Smax.romanov@nginx.com                 String msg = ((Encoder.Text) encoder).encode(obj);
6111157Smax.romanov@nginx.com                 sendStringByCompletion(msg, completion);
6121157Smax.romanov@nginx.com             } else if (encoder instanceof Encoder.TextStream) {
6131157Smax.romanov@nginx.com                 try (Writer w = getSendWriter()) {
6141157Smax.romanov@nginx.com                     ((Encoder.TextStream) encoder).encode(obj, w);
6151157Smax.romanov@nginx.com                 }
6161157Smax.romanov@nginx.com                 completion.onResult(new SendResult());
6171157Smax.romanov@nginx.com             } else if (encoder instanceof Encoder.Binary) {
6181157Smax.romanov@nginx.com                 ByteBuffer msg = ((Encoder.Binary) encoder).encode(obj);
6191157Smax.romanov@nginx.com                 sendBytesByCompletion(msg, completion);
6201157Smax.romanov@nginx.com             } else if (encoder instanceof Encoder.BinaryStream) {
6211157Smax.romanov@nginx.com                 try (OutputStream os = getSendStream()) {
6221157Smax.romanov@nginx.com                     ((Encoder.BinaryStream) encoder).encode(obj, os);
6231157Smax.romanov@nginx.com                 }
6241157Smax.romanov@nginx.com                 completion.onResult(new SendResult());
6251157Smax.romanov@nginx.com             } else {
6261157Smax.romanov@nginx.com                 throw new EncodeException(obj, sm.getString(
6271157Smax.romanov@nginx.com                         "wsRemoteEndpoint.noEncoder", obj.getClass()));
6281157Smax.romanov@nginx.com             }
6291157Smax.romanov@nginx.com         } catch (Exception e) {
6301157Smax.romanov@nginx.com             SendResult sr = new SendResult(e);
6311157Smax.romanov@nginx.com             completion.onResult(sr);
6321157Smax.romanov@nginx.com         }
6331157Smax.romanov@nginx.com     }
6341157Smax.romanov@nginx.com 
6351157Smax.romanov@nginx.com 
setSession(WsSession wsSession)6361157Smax.romanov@nginx.com     protected void setSession(WsSession wsSession) {
6371157Smax.romanov@nginx.com         this.wsSession = wsSession;
6381157Smax.romanov@nginx.com     }
6391157Smax.romanov@nginx.com 
6401157Smax.romanov@nginx.com 
setRequest(Request request)6411157Smax.romanov@nginx.com     protected void setRequest(Request request) {
6421157Smax.romanov@nginx.com         this.request = request;
6431157Smax.romanov@nginx.com     }
6441157Smax.romanov@nginx.com 
setEncoders(EndpointConfig endpointConfig)6451157Smax.romanov@nginx.com     protected void setEncoders(EndpointConfig endpointConfig)
6461157Smax.romanov@nginx.com             throws DeploymentException {
6471157Smax.romanov@nginx.com         encoderEntries.clear();
6481157Smax.romanov@nginx.com         for (Class<? extends Encoder> encoderClazz :
6491157Smax.romanov@nginx.com                 endpointConfig.getEncoders()) {
6501157Smax.romanov@nginx.com             Encoder instance;
6511157Smax.romanov@nginx.com             try {
6521157Smax.romanov@nginx.com                 instance = encoderClazz.getConstructor().newInstance();
6531157Smax.romanov@nginx.com                 instance.init(endpointConfig);
6541157Smax.romanov@nginx.com             } catch (ReflectiveOperationException e) {
6551157Smax.romanov@nginx.com                 throw new DeploymentException(
6561157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.invalidEncoder",
6571157Smax.romanov@nginx.com                                 encoderClazz.getName()), e);
6581157Smax.romanov@nginx.com             }
6591157Smax.romanov@nginx.com             EncoderEntry entry = new EncoderEntry(
6601157Smax.romanov@nginx.com                     Util.getEncoderType(encoderClazz), instance);
6611157Smax.romanov@nginx.com             encoderEntries.add(entry);
6621157Smax.romanov@nginx.com         }
6631157Smax.romanov@nginx.com     }
6641157Smax.romanov@nginx.com 
6651157Smax.romanov@nginx.com 
findEncoder(Object obj)6661157Smax.romanov@nginx.com     private Encoder findEncoder(Object obj) {
6671157Smax.romanov@nginx.com         for (EncoderEntry entry : encoderEntries) {
6681157Smax.romanov@nginx.com             if (entry.getClazz().isAssignableFrom(obj.getClass())) {
6691157Smax.romanov@nginx.com                 return entry.getEncoder();
6701157Smax.romanov@nginx.com             }
6711157Smax.romanov@nginx.com         }
6721157Smax.romanov@nginx.com         return null;
6731157Smax.romanov@nginx.com     }
6741157Smax.romanov@nginx.com 
6751157Smax.romanov@nginx.com 
close()6761157Smax.romanov@nginx.com     public final void close() {
6771157Smax.romanov@nginx.com         for (EncoderEntry entry : encoderEntries) {
6781157Smax.romanov@nginx.com             entry.getEncoder().destroy();
6791157Smax.romanov@nginx.com         }
6801157Smax.romanov@nginx.com 
6811157Smax.romanov@nginx.com         request.closeWs();
6821157Smax.romanov@nginx.com     }
6831157Smax.romanov@nginx.com 
6841157Smax.romanov@nginx.com 
doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... data)6851157Smax.romanov@nginx.com     protected abstract void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
6861157Smax.romanov@nginx.com             ByteBuffer... data);
isMasked()6871157Smax.romanov@nginx.com     protected abstract boolean isMasked();
doClose()6881157Smax.romanov@nginx.com     protected abstract void doClose();
6891157Smax.romanov@nginx.com 
writeHeader(ByteBuffer headerBuffer, boolean fin, int rsv, byte opCode, boolean masked, ByteBuffer payload, byte[] mask, boolean first)6901157Smax.romanov@nginx.com     private static void writeHeader(ByteBuffer headerBuffer, boolean fin,
6911157Smax.romanov@nginx.com             int rsv, byte opCode, boolean masked, ByteBuffer payload,
6921157Smax.romanov@nginx.com             byte[] mask, boolean first) {
6931157Smax.romanov@nginx.com 
6941157Smax.romanov@nginx.com         byte b = 0;
6951157Smax.romanov@nginx.com 
6961157Smax.romanov@nginx.com         if (fin) {
6971157Smax.romanov@nginx.com             // Set the fin bit
6981157Smax.romanov@nginx.com             b -= 128;
6991157Smax.romanov@nginx.com         }
7001157Smax.romanov@nginx.com 
7011157Smax.romanov@nginx.com         b += (rsv << 4);
7021157Smax.romanov@nginx.com 
7031157Smax.romanov@nginx.com         if (first) {
7041157Smax.romanov@nginx.com             // This is the first fragment of this message
7051157Smax.romanov@nginx.com             b += opCode;
7061157Smax.romanov@nginx.com         }
7071157Smax.romanov@nginx.com         // If not the first fragment, it is a continuation with opCode of zero
7081157Smax.romanov@nginx.com 
7091157Smax.romanov@nginx.com         headerBuffer.put(b);
7101157Smax.romanov@nginx.com 
7111157Smax.romanov@nginx.com         if (masked) {
7121157Smax.romanov@nginx.com             b = (byte) 0x80;
7131157Smax.romanov@nginx.com         } else {
7141157Smax.romanov@nginx.com             b = 0;
7151157Smax.romanov@nginx.com         }
7161157Smax.romanov@nginx.com 
7171157Smax.romanov@nginx.com         // Next write the mask && length length
7181157Smax.romanov@nginx.com         if (payload.limit() < 126) {
7191157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() | b));
7201157Smax.romanov@nginx.com         } else if (payload.limit() < 65536) {
7211157Smax.romanov@nginx.com             headerBuffer.put((byte) (126 | b));
7221157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() >>> 8));
7231157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() & 0xFF));
7241157Smax.romanov@nginx.com         } else {
7251157Smax.romanov@nginx.com             // Will never be more than 2^31-1
7261157Smax.romanov@nginx.com             headerBuffer.put((byte) (127 | b));
7271157Smax.romanov@nginx.com             headerBuffer.put((byte) 0);
7281157Smax.romanov@nginx.com             headerBuffer.put((byte) 0);
7291157Smax.romanov@nginx.com             headerBuffer.put((byte) 0);
7301157Smax.romanov@nginx.com             headerBuffer.put((byte) 0);
7311157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() >>> 24));
7321157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() >>> 16));
7331157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() >>> 8));
7341157Smax.romanov@nginx.com             headerBuffer.put((byte) (payload.limit() & 0xFF));
7351157Smax.romanov@nginx.com         }
7361157Smax.romanov@nginx.com         if (masked) {
7371157Smax.romanov@nginx.com             headerBuffer.put(mask[0]);
7381157Smax.romanov@nginx.com             headerBuffer.put(mask[1]);
7391157Smax.romanov@nginx.com             headerBuffer.put(mask[2]);
7401157Smax.romanov@nginx.com             headerBuffer.put(mask[3]);
7411157Smax.romanov@nginx.com         }
7421157Smax.romanov@nginx.com     }
7431157Smax.romanov@nginx.com 
7441157Smax.romanov@nginx.com 
7451157Smax.romanov@nginx.com     private class TextMessageSendHandler implements SendHandler {
7461157Smax.romanov@nginx.com 
7471157Smax.romanov@nginx.com         private final SendHandler handler;
7481157Smax.romanov@nginx.com         private final CharBuffer message;
7491157Smax.romanov@nginx.com         private final boolean isLast;
7501157Smax.romanov@nginx.com         private final CharsetEncoder encoder;
7511157Smax.romanov@nginx.com         private final ByteBuffer buffer;
7521157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
7531157Smax.romanov@nginx.com         private volatile boolean isDone = false;
7541157Smax.romanov@nginx.com 
TextMessageSendHandler(SendHandler handler, CharBuffer message, boolean isLast, CharsetEncoder encoder, ByteBuffer encoderBuffer, WsRemoteEndpointImplBase endpoint)7551157Smax.romanov@nginx.com         public TextMessageSendHandler(SendHandler handler, CharBuffer message,
7561157Smax.romanov@nginx.com                 boolean isLast, CharsetEncoder encoder,
7571157Smax.romanov@nginx.com                 ByteBuffer encoderBuffer, WsRemoteEndpointImplBase endpoint) {
7581157Smax.romanov@nginx.com             this.handler = handler;
7591157Smax.romanov@nginx.com             this.message = message;
7601157Smax.romanov@nginx.com             this.isLast = isLast;
7611157Smax.romanov@nginx.com             this.encoder = encoder.reset();
7621157Smax.romanov@nginx.com             this.buffer = encoderBuffer;
7631157Smax.romanov@nginx.com             this.endpoint = endpoint;
7641157Smax.romanov@nginx.com         }
7651157Smax.romanov@nginx.com 
write()7661157Smax.romanov@nginx.com         public void write() {
7671157Smax.romanov@nginx.com             buffer.clear();
7681157Smax.romanov@nginx.com             CoderResult cr = encoder.encode(message, buffer, true);
7691157Smax.romanov@nginx.com             if (cr.isError()) {
7701157Smax.romanov@nginx.com                 throw new IllegalArgumentException(cr.toString());
7711157Smax.romanov@nginx.com             }
7721157Smax.romanov@nginx.com             isDone = !cr.isOverflow();
7731157Smax.romanov@nginx.com             buffer.flip();
7741157Smax.romanov@nginx.com             endpoint.startMessage(Constants.OPCODE_TEXT, buffer,
7751157Smax.romanov@nginx.com                     isDone && isLast, this);
7761157Smax.romanov@nginx.com         }
7771157Smax.romanov@nginx.com 
7781157Smax.romanov@nginx.com         @Override
onResult(SendResult result)7791157Smax.romanov@nginx.com         public void onResult(SendResult result) {
7801157Smax.romanov@nginx.com             if (isDone) {
7811157Smax.romanov@nginx.com                 endpoint.stateMachine.complete(isLast);
7821157Smax.romanov@nginx.com                 handler.onResult(result);
7831157Smax.romanov@nginx.com             } else if(!result.isOK()) {
7841157Smax.romanov@nginx.com                 handler.onResult(result);
7851157Smax.romanov@nginx.com             } else if (closed){
7861157Smax.romanov@nginx.com                 SendResult sr = new SendResult(new IOException(
7871157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedDuringMessage")));
7881157Smax.romanov@nginx.com                 handler.onResult(sr);
7891157Smax.romanov@nginx.com             } else {
7901157Smax.romanov@nginx.com                 write();
7911157Smax.romanov@nginx.com             }
7921157Smax.romanov@nginx.com         }
7931157Smax.romanov@nginx.com     }
7941157Smax.romanov@nginx.com 
7951157Smax.romanov@nginx.com 
7961157Smax.romanov@nginx.com     /**
7971157Smax.romanov@nginx.com      * Used to write data to the output buffer, flushing the buffer if it fills
7981157Smax.romanov@nginx.com      * up.
7991157Smax.romanov@nginx.com      */
8001157Smax.romanov@nginx.com     private static class OutputBufferSendHandler implements SendHandler {
8011157Smax.romanov@nginx.com 
8021157Smax.romanov@nginx.com         private final SendHandler handler;
8031157Smax.romanov@nginx.com         private final long blockingWriteTimeoutExpiry;
8041157Smax.romanov@nginx.com         private final ByteBuffer headerBuffer;
8051157Smax.romanov@nginx.com         private final ByteBuffer payload;
8061157Smax.romanov@nginx.com         private final byte[] mask;
8071157Smax.romanov@nginx.com         private final ByteBuffer outputBuffer;
8081157Smax.romanov@nginx.com         private final boolean flushRequired;
8091157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
8101157Smax.romanov@nginx.com         private int maskIndex = 0;
8111157Smax.romanov@nginx.com 
OutputBufferSendHandler(SendHandler completion, long blockingWriteTimeoutExpiry, ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask, ByteBuffer outputBuffer, boolean flushRequired, WsRemoteEndpointImplBase endpoint)8121157Smax.romanov@nginx.com         public OutputBufferSendHandler(SendHandler completion,
8131157Smax.romanov@nginx.com                 long blockingWriteTimeoutExpiry,
8141157Smax.romanov@nginx.com                 ByteBuffer headerBuffer, ByteBuffer payload, byte[] mask,
8151157Smax.romanov@nginx.com                 ByteBuffer outputBuffer, boolean flushRequired,
8161157Smax.romanov@nginx.com                 WsRemoteEndpointImplBase endpoint) {
8171157Smax.romanov@nginx.com             this.blockingWriteTimeoutExpiry = blockingWriteTimeoutExpiry;
8181157Smax.romanov@nginx.com             this.handler = completion;
8191157Smax.romanov@nginx.com             this.headerBuffer = headerBuffer;
8201157Smax.romanov@nginx.com             this.payload = payload;
8211157Smax.romanov@nginx.com             this.mask = mask;
8221157Smax.romanov@nginx.com             this.outputBuffer = outputBuffer;
8231157Smax.romanov@nginx.com             this.flushRequired = flushRequired;
8241157Smax.romanov@nginx.com             this.endpoint = endpoint;
8251157Smax.romanov@nginx.com         }
8261157Smax.romanov@nginx.com 
write()8271157Smax.romanov@nginx.com         public void write() {
8281157Smax.romanov@nginx.com             // Write the header
8291157Smax.romanov@nginx.com             while (headerBuffer.hasRemaining() && outputBuffer.hasRemaining()) {
8301157Smax.romanov@nginx.com                 outputBuffer.put(headerBuffer.get());
8311157Smax.romanov@nginx.com             }
8321157Smax.romanov@nginx.com             if (headerBuffer.hasRemaining()) {
8331157Smax.romanov@nginx.com                 // Still more headers to write, need to flush
8341157Smax.romanov@nginx.com                 outputBuffer.flip();
8351157Smax.romanov@nginx.com                 endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
8361157Smax.romanov@nginx.com                 return;
8371157Smax.romanov@nginx.com             }
8381157Smax.romanov@nginx.com 
8391157Smax.romanov@nginx.com             // Write the payload
8401157Smax.romanov@nginx.com             int payloadLeft = payload.remaining();
8411157Smax.romanov@nginx.com             int payloadLimit = payload.limit();
8421157Smax.romanov@nginx.com             int outputSpace = outputBuffer.remaining();
8431157Smax.romanov@nginx.com             int toWrite = payloadLeft;
8441157Smax.romanov@nginx.com 
8451157Smax.romanov@nginx.com             if (payloadLeft > outputSpace) {
8461157Smax.romanov@nginx.com                 toWrite = outputSpace;
8471157Smax.romanov@nginx.com                 // Temporarily reduce the limit
8481157Smax.romanov@nginx.com                 payload.limit(payload.position() + toWrite);
8491157Smax.romanov@nginx.com             }
8501157Smax.romanov@nginx.com 
8511157Smax.romanov@nginx.com             if (mask == null) {
8521157Smax.romanov@nginx.com                 // Use a bulk copy
8531157Smax.romanov@nginx.com                 outputBuffer.put(payload);
8541157Smax.romanov@nginx.com             } else {
8551157Smax.romanov@nginx.com                 for (int i = 0; i < toWrite; i++) {
8561157Smax.romanov@nginx.com                     outputBuffer.put(
8571157Smax.romanov@nginx.com                             (byte) (payload.get() ^ (mask[maskIndex++] & 0xFF)));
8581157Smax.romanov@nginx.com                     if (maskIndex > 3) {
8591157Smax.romanov@nginx.com                         maskIndex = 0;
8601157Smax.romanov@nginx.com                     }
8611157Smax.romanov@nginx.com                 }
8621157Smax.romanov@nginx.com             }
8631157Smax.romanov@nginx.com 
8641157Smax.romanov@nginx.com             if (payloadLeft > outputSpace) {
8651157Smax.romanov@nginx.com                 // Restore the original limit
8661157Smax.romanov@nginx.com                 payload.limit(payloadLimit);
8671157Smax.romanov@nginx.com                 // Still more data to write, need to flush
8681157Smax.romanov@nginx.com                 outputBuffer.flip();
8691157Smax.romanov@nginx.com                 endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
8701157Smax.romanov@nginx.com                 return;
8711157Smax.romanov@nginx.com             }
8721157Smax.romanov@nginx.com 
8731157Smax.romanov@nginx.com             if (flushRequired) {
8741157Smax.romanov@nginx.com                 outputBuffer.flip();
8751157Smax.romanov@nginx.com                 if (outputBuffer.remaining() == 0) {
8761157Smax.romanov@nginx.com                     handler.onResult(SENDRESULT_OK);
8771157Smax.romanov@nginx.com                 } else {
8781157Smax.romanov@nginx.com                     endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
8791157Smax.romanov@nginx.com                 }
8801157Smax.romanov@nginx.com             } else {
8811157Smax.romanov@nginx.com                 handler.onResult(SENDRESULT_OK);
8821157Smax.romanov@nginx.com             }
8831157Smax.romanov@nginx.com         }
8841157Smax.romanov@nginx.com 
8851157Smax.romanov@nginx.com         // ------------------------------------------------- SendHandler methods
8861157Smax.romanov@nginx.com         @Override
onResult(SendResult result)8871157Smax.romanov@nginx.com         public void onResult(SendResult result) {
8881157Smax.romanov@nginx.com             if (result.isOK()) {
8891157Smax.romanov@nginx.com                 if (outputBuffer.hasRemaining()) {
8901157Smax.romanov@nginx.com                     endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer);
8911157Smax.romanov@nginx.com                 } else {
8921157Smax.romanov@nginx.com                     outputBuffer.clear();
8931157Smax.romanov@nginx.com                     write();
8941157Smax.romanov@nginx.com                 }
8951157Smax.romanov@nginx.com             } else {
8961157Smax.romanov@nginx.com                 handler.onResult(result);
8971157Smax.romanov@nginx.com             }
8981157Smax.romanov@nginx.com         }
8991157Smax.romanov@nginx.com     }
9001157Smax.romanov@nginx.com 
9011157Smax.romanov@nginx.com 
9021157Smax.romanov@nginx.com     /**
9031157Smax.romanov@nginx.com      * Ensures that the output buffer is cleared after it has been flushed.
9041157Smax.romanov@nginx.com      */
9051157Smax.romanov@nginx.com     private static class OutputBufferFlushSendHandler implements SendHandler {
9061157Smax.romanov@nginx.com 
9071157Smax.romanov@nginx.com         private final ByteBuffer outputBuffer;
9081157Smax.romanov@nginx.com         private final SendHandler handler;
9091157Smax.romanov@nginx.com 
OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler)9101157Smax.romanov@nginx.com         public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) {
9111157Smax.romanov@nginx.com             this.outputBuffer = outputBuffer;
9121157Smax.romanov@nginx.com             this.handler = handler;
9131157Smax.romanov@nginx.com         }
9141157Smax.romanov@nginx.com 
9151157Smax.romanov@nginx.com         @Override
onResult(SendResult result)9161157Smax.romanov@nginx.com         public void onResult(SendResult result) {
9171157Smax.romanov@nginx.com             if (result.isOK()) {
9181157Smax.romanov@nginx.com                 outputBuffer.clear();
9191157Smax.romanov@nginx.com             }
9201157Smax.romanov@nginx.com             handler.onResult(result);
9211157Smax.romanov@nginx.com         }
9221157Smax.romanov@nginx.com     }
9231157Smax.romanov@nginx.com 
9241157Smax.romanov@nginx.com 
9251157Smax.romanov@nginx.com     private static class WsOutputStream extends OutputStream {
9261157Smax.romanov@nginx.com 
9271157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
9281157Smax.romanov@nginx.com         private final ByteBuffer buffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
9291157Smax.romanov@nginx.com         private final Object closeLock = new Object();
9301157Smax.romanov@nginx.com         private volatile boolean closed = false;
9311157Smax.romanov@nginx.com         private volatile boolean used = false;
9321157Smax.romanov@nginx.com 
WsOutputStream(WsRemoteEndpointImplBase endpoint)9331157Smax.romanov@nginx.com         public WsOutputStream(WsRemoteEndpointImplBase endpoint) {
9341157Smax.romanov@nginx.com             this.endpoint = endpoint;
9351157Smax.romanov@nginx.com         }
9361157Smax.romanov@nginx.com 
9371157Smax.romanov@nginx.com         @Override
write(int b)9381157Smax.romanov@nginx.com         public void write(int b) throws IOException {
9391157Smax.romanov@nginx.com             if (closed) {
9401157Smax.romanov@nginx.com                 throw new IllegalStateException(
9411157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedOutputStream"));
9421157Smax.romanov@nginx.com             }
9431157Smax.romanov@nginx.com 
9441157Smax.romanov@nginx.com             used = true;
9451157Smax.romanov@nginx.com             if (buffer.remaining() == 0) {
9461157Smax.romanov@nginx.com                 flush();
9471157Smax.romanov@nginx.com             }
9481157Smax.romanov@nginx.com             buffer.put((byte) b);
9491157Smax.romanov@nginx.com         }
9501157Smax.romanov@nginx.com 
9511157Smax.romanov@nginx.com         @Override
write(byte[] b, int off, int len)9521157Smax.romanov@nginx.com         public void write(byte[] b, int off, int len) throws IOException {
9531157Smax.romanov@nginx.com             if (closed) {
9541157Smax.romanov@nginx.com                 throw new IllegalStateException(
9551157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedOutputStream"));
9561157Smax.romanov@nginx.com             }
9571157Smax.romanov@nginx.com             if (len == 0) {
9581157Smax.romanov@nginx.com                 return;
9591157Smax.romanov@nginx.com             }
9601157Smax.romanov@nginx.com             if ((off < 0) || (off > b.length) || (len < 0) ||
9611157Smax.romanov@nginx.com                 ((off + len) > b.length) || ((off + len) < 0)) {
9621157Smax.romanov@nginx.com                 throw new IndexOutOfBoundsException();
9631157Smax.romanov@nginx.com             }
9641157Smax.romanov@nginx.com 
9651157Smax.romanov@nginx.com             used = true;
9661157Smax.romanov@nginx.com             if (buffer.remaining() == 0) {
9671157Smax.romanov@nginx.com                 flush();
9681157Smax.romanov@nginx.com             }
9691157Smax.romanov@nginx.com             int remaining = buffer.remaining();
9701157Smax.romanov@nginx.com             int written = 0;
9711157Smax.romanov@nginx.com 
9721157Smax.romanov@nginx.com             while (remaining < len - written) {
9731157Smax.romanov@nginx.com                 buffer.put(b, off + written, remaining);
9741157Smax.romanov@nginx.com                 written += remaining;
9751157Smax.romanov@nginx.com                 flush();
9761157Smax.romanov@nginx.com                 remaining = buffer.remaining();
9771157Smax.romanov@nginx.com             }
9781157Smax.romanov@nginx.com             buffer.put(b, off + written, len - written);
9791157Smax.romanov@nginx.com         }
9801157Smax.romanov@nginx.com 
9811157Smax.romanov@nginx.com         @Override
flush()9821157Smax.romanov@nginx.com         public void flush() throws IOException {
9831157Smax.romanov@nginx.com             if (closed) {
9841157Smax.romanov@nginx.com                 throw new IllegalStateException(
9851157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedOutputStream"));
9861157Smax.romanov@nginx.com             }
9871157Smax.romanov@nginx.com 
9881157Smax.romanov@nginx.com             // Optimisation. If there is no data to flush then do not send an
9891157Smax.romanov@nginx.com             // empty message.
9901157Smax.romanov@nginx.com             if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || buffer.position() > 0) {
9911157Smax.romanov@nginx.com                 doWrite(false);
9921157Smax.romanov@nginx.com             }
9931157Smax.romanov@nginx.com         }
9941157Smax.romanov@nginx.com 
9951157Smax.romanov@nginx.com         @Override
close()9961157Smax.romanov@nginx.com         public void close() throws IOException {
9971157Smax.romanov@nginx.com             synchronized (closeLock) {
9981157Smax.romanov@nginx.com                 if (closed) {
9991157Smax.romanov@nginx.com                     return;
10001157Smax.romanov@nginx.com                 }
10011157Smax.romanov@nginx.com                 closed = true;
10021157Smax.romanov@nginx.com             }
10031157Smax.romanov@nginx.com 
10041157Smax.romanov@nginx.com             doWrite(true);
10051157Smax.romanov@nginx.com         }
10061157Smax.romanov@nginx.com 
doWrite(boolean last)10071157Smax.romanov@nginx.com         private void doWrite(boolean last) throws IOException {
10081157Smax.romanov@nginx.com             if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || used) {
10091157Smax.romanov@nginx.com                 buffer.flip();
10101157Smax.romanov@nginx.com                 endpoint.sendMessageBlock(Constants.OPCODE_BINARY, buffer, last);
10111157Smax.romanov@nginx.com             }
10121157Smax.romanov@nginx.com             endpoint.stateMachine.complete(last);
10131157Smax.romanov@nginx.com             buffer.clear();
10141157Smax.romanov@nginx.com         }
10151157Smax.romanov@nginx.com     }
10161157Smax.romanov@nginx.com 
10171157Smax.romanov@nginx.com 
10181157Smax.romanov@nginx.com     private static class WsWriter extends Writer {
10191157Smax.romanov@nginx.com 
10201157Smax.romanov@nginx.com         private final WsRemoteEndpointImplBase endpoint;
10211157Smax.romanov@nginx.com         private final CharBuffer buffer = CharBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
10221157Smax.romanov@nginx.com         private final Object closeLock = new Object();
10231157Smax.romanov@nginx.com         private volatile boolean closed = false;
10241157Smax.romanov@nginx.com         private volatile boolean used = false;
10251157Smax.romanov@nginx.com 
WsWriter(WsRemoteEndpointImplBase endpoint)10261157Smax.romanov@nginx.com         public WsWriter(WsRemoteEndpointImplBase endpoint) {
10271157Smax.romanov@nginx.com             this.endpoint = endpoint;
10281157Smax.romanov@nginx.com         }
10291157Smax.romanov@nginx.com 
10301157Smax.romanov@nginx.com         @Override
write(char[] cbuf, int off, int len)10311157Smax.romanov@nginx.com         public void write(char[] cbuf, int off, int len) throws IOException {
10321157Smax.romanov@nginx.com             if (closed) {
10331157Smax.romanov@nginx.com                 throw new IllegalStateException(
10341157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedWriter"));
10351157Smax.romanov@nginx.com             }
10361157Smax.romanov@nginx.com             if (len == 0) {
10371157Smax.romanov@nginx.com                 return;
10381157Smax.romanov@nginx.com             }
10391157Smax.romanov@nginx.com             if ((off < 0) || (off > cbuf.length) || (len < 0) ||
10401157Smax.romanov@nginx.com                     ((off + len) > cbuf.length) || ((off + len) < 0)) {
10411157Smax.romanov@nginx.com                 throw new IndexOutOfBoundsException();
10421157Smax.romanov@nginx.com             }
10431157Smax.romanov@nginx.com 
10441157Smax.romanov@nginx.com             used = true;
10451157Smax.romanov@nginx.com             if (buffer.remaining() == 0) {
10461157Smax.romanov@nginx.com                 flush();
10471157Smax.romanov@nginx.com             }
10481157Smax.romanov@nginx.com             int remaining = buffer.remaining();
10491157Smax.romanov@nginx.com             int written = 0;
10501157Smax.romanov@nginx.com 
10511157Smax.romanov@nginx.com             while (remaining < len - written) {
10521157Smax.romanov@nginx.com                 buffer.put(cbuf, off + written, remaining);
10531157Smax.romanov@nginx.com                 written += remaining;
10541157Smax.romanov@nginx.com                 flush();
10551157Smax.romanov@nginx.com                 remaining = buffer.remaining();
10561157Smax.romanov@nginx.com             }
10571157Smax.romanov@nginx.com             buffer.put(cbuf, off + written, len - written);
10581157Smax.romanov@nginx.com         }
10591157Smax.romanov@nginx.com 
10601157Smax.romanov@nginx.com         @Override
flush()10611157Smax.romanov@nginx.com         public void flush() throws IOException {
10621157Smax.romanov@nginx.com             if (closed) {
10631157Smax.romanov@nginx.com                 throw new IllegalStateException(
10641157Smax.romanov@nginx.com                         sm.getString("wsRemoteEndpoint.closedWriter"));
10651157Smax.romanov@nginx.com             }
10661157Smax.romanov@nginx.com 
10671157Smax.romanov@nginx.com             if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || buffer.position() > 0) {
10681157Smax.romanov@nginx.com                 doWrite(false);
10691157Smax.romanov@nginx.com             }
10701157Smax.romanov@nginx.com         }
10711157Smax.romanov@nginx.com 
10721157Smax.romanov@nginx.com         @Override
close()10731157Smax.romanov@nginx.com         public void close() throws IOException {
10741157Smax.romanov@nginx.com             synchronized (closeLock) {
10751157Smax.romanov@nginx.com                 if (closed) {
10761157Smax.romanov@nginx.com                     return;
10771157Smax.romanov@nginx.com                 }
10781157Smax.romanov@nginx.com                 closed = true;
10791157Smax.romanov@nginx.com             }
10801157Smax.romanov@nginx.com 
10811157Smax.romanov@nginx.com             doWrite(true);
10821157Smax.romanov@nginx.com         }
10831157Smax.romanov@nginx.com 
doWrite(boolean last)10841157Smax.romanov@nginx.com         private void doWrite(boolean last) throws IOException {
10851157Smax.romanov@nginx.com             if (!Constants.STREAMS_DROP_EMPTY_MESSAGES || used) {
10861157Smax.romanov@nginx.com                 buffer.flip();
10871157Smax.romanov@nginx.com                 endpoint.sendMessageBlock(buffer, last);
10881157Smax.romanov@nginx.com                 buffer.clear();
10891157Smax.romanov@nginx.com             } else {
10901157Smax.romanov@nginx.com                 endpoint.stateMachine.complete(last);
10911157Smax.romanov@nginx.com             }
10921157Smax.romanov@nginx.com         }
10931157Smax.romanov@nginx.com     }
10941157Smax.romanov@nginx.com 
10951157Smax.romanov@nginx.com 
10961157Smax.romanov@nginx.com     private static class EncoderEntry {
10971157Smax.romanov@nginx.com 
10981157Smax.romanov@nginx.com         private final Class<?> clazz;
10991157Smax.romanov@nginx.com         private final Encoder encoder;
11001157Smax.romanov@nginx.com 
EncoderEntry(Class<?> clazz, Encoder encoder)11011157Smax.romanov@nginx.com         public EncoderEntry(Class<?> clazz, Encoder encoder) {
11021157Smax.romanov@nginx.com             this.clazz = clazz;
11031157Smax.romanov@nginx.com             this.encoder = encoder;
11041157Smax.romanov@nginx.com         }
11051157Smax.romanov@nginx.com 
getClazz()11061157Smax.romanov@nginx.com         public Class<?> getClazz() {
11071157Smax.romanov@nginx.com             return clazz;
11081157Smax.romanov@nginx.com         }
11091157Smax.romanov@nginx.com 
getEncoder()11101157Smax.romanov@nginx.com         public Encoder getEncoder() {
11111157Smax.romanov@nginx.com             return encoder;
11121157Smax.romanov@nginx.com         }
11131157Smax.romanov@nginx.com     }
11141157Smax.romanov@nginx.com 
11151157Smax.romanov@nginx.com 
11161157Smax.romanov@nginx.com     private enum State {
11171157Smax.romanov@nginx.com         OPEN,
11181157Smax.romanov@nginx.com         STREAM_WRITING,
11191157Smax.romanov@nginx.com         WRITER_WRITING,
11201157Smax.romanov@nginx.com         BINARY_PARTIAL_WRITING,
11211157Smax.romanov@nginx.com         BINARY_PARTIAL_READY,
11221157Smax.romanov@nginx.com         BINARY_FULL_WRITING,
11231157Smax.romanov@nginx.com         TEXT_PARTIAL_WRITING,
11241157Smax.romanov@nginx.com         TEXT_PARTIAL_READY,
11251157Smax.romanov@nginx.com         TEXT_FULL_WRITING
11261157Smax.romanov@nginx.com     }
11271157Smax.romanov@nginx.com 
11281157Smax.romanov@nginx.com 
11291157Smax.romanov@nginx.com     private static class StateMachine {
11301157Smax.romanov@nginx.com         private State state = State.OPEN;
11311157Smax.romanov@nginx.com 
streamStart()11321157Smax.romanov@nginx.com         public synchronized void streamStart() {
11331157Smax.romanov@nginx.com             checkState(State.OPEN);
11341157Smax.romanov@nginx.com             state = State.STREAM_WRITING;
11351157Smax.romanov@nginx.com         }
11361157Smax.romanov@nginx.com 
writeStart()11371157Smax.romanov@nginx.com         public synchronized void writeStart() {
11381157Smax.romanov@nginx.com             checkState(State.OPEN);
11391157Smax.romanov@nginx.com             state = State.WRITER_WRITING;
11401157Smax.romanov@nginx.com         }
11411157Smax.romanov@nginx.com 
binaryPartialStart()11421157Smax.romanov@nginx.com         public synchronized void binaryPartialStart() {
11431157Smax.romanov@nginx.com             checkState(State.OPEN, State.BINARY_PARTIAL_READY);
11441157Smax.romanov@nginx.com             state = State.BINARY_PARTIAL_WRITING;
11451157Smax.romanov@nginx.com         }
11461157Smax.romanov@nginx.com 
binaryStart()11471157Smax.romanov@nginx.com         public synchronized void binaryStart() {
11481157Smax.romanov@nginx.com             checkState(State.OPEN);
11491157Smax.romanov@nginx.com             state = State.BINARY_FULL_WRITING;
11501157Smax.romanov@nginx.com         }
11511157Smax.romanov@nginx.com 
textPartialStart()11521157Smax.romanov@nginx.com         public synchronized void textPartialStart() {
11531157Smax.romanov@nginx.com             checkState(State.OPEN, State.TEXT_PARTIAL_READY);
11541157Smax.romanov@nginx.com             state = State.TEXT_PARTIAL_WRITING;
11551157Smax.romanov@nginx.com         }
11561157Smax.romanov@nginx.com 
textStart()11571157Smax.romanov@nginx.com         public synchronized void textStart() {
11581157Smax.romanov@nginx.com             checkState(State.OPEN);
11591157Smax.romanov@nginx.com             state = State.TEXT_FULL_WRITING;
11601157Smax.romanov@nginx.com         }
11611157Smax.romanov@nginx.com 
complete(boolean last)11621157Smax.romanov@nginx.com         public synchronized void complete(boolean last) {
11631157Smax.romanov@nginx.com             if (last) {
11641157Smax.romanov@nginx.com                 checkState(State.TEXT_PARTIAL_WRITING, State.TEXT_FULL_WRITING,
11651157Smax.romanov@nginx.com                         State.BINARY_PARTIAL_WRITING, State.BINARY_FULL_WRITING,
11661157Smax.romanov@nginx.com                         State.STREAM_WRITING, State.WRITER_WRITING);
11671157Smax.romanov@nginx.com                 state = State.OPEN;
11681157Smax.romanov@nginx.com             } else {
11691157Smax.romanov@nginx.com                 checkState(State.TEXT_PARTIAL_WRITING, State.BINARY_PARTIAL_WRITING,
11701157Smax.romanov@nginx.com                         State.STREAM_WRITING, State.WRITER_WRITING);
11711157Smax.romanov@nginx.com                 if (state == State.TEXT_PARTIAL_WRITING) {
11721157Smax.romanov@nginx.com                     state = State.TEXT_PARTIAL_READY;
11731157Smax.romanov@nginx.com                 } else if (state == State.BINARY_PARTIAL_WRITING){
11741157Smax.romanov@nginx.com                     state = State.BINARY_PARTIAL_READY;
11751157Smax.romanov@nginx.com                 } else if (state == State.WRITER_WRITING) {
11761157Smax.romanov@nginx.com                     // NO-OP. Leave state as is.
11771157Smax.romanov@nginx.com                 } else if (state == State.STREAM_WRITING) {
1178*2078Salx.manpages@gmail.com                     // NO-OP. Leave state as is.
11791157Smax.romanov@nginx.com                 } else {
11801157Smax.romanov@nginx.com                     // Should never happen
11811157Smax.romanov@nginx.com                     // The if ... else ... blocks above should cover all states
11821157Smax.romanov@nginx.com                     // permitted by the preceding checkState() call
11831157Smax.romanov@nginx.com                     throw new IllegalStateException(
11841157Smax.romanov@nginx.com                             "BUG: This code should never be called");
11851157Smax.romanov@nginx.com                 }
11861157Smax.romanov@nginx.com             }
11871157Smax.romanov@nginx.com         }
11881157Smax.romanov@nginx.com 
checkState(State... required)11891157Smax.romanov@nginx.com         private void checkState(State... required) {
11901157Smax.romanov@nginx.com             for (State state : required) {
11911157Smax.romanov@nginx.com                 if (this.state == state) {
11921157Smax.romanov@nginx.com                     return;
11931157Smax.romanov@nginx.com                 }
11941157Smax.romanov@nginx.com             }
11951157Smax.romanov@nginx.com             throw new IllegalStateException(
11961157Smax.romanov@nginx.com                     sm.getString("wsRemoteEndpoint.wrongState", this.state));
11971157Smax.romanov@nginx.com         }
11981157Smax.romanov@nginx.com     }
11991157Smax.romanov@nginx.com 
12001157Smax.romanov@nginx.com 
12011157Smax.romanov@nginx.com     private static class StateUpdateSendHandler implements SendHandler {
12021157Smax.romanov@nginx.com 
12031157Smax.romanov@nginx.com         private final SendHandler handler;
12041157Smax.romanov@nginx.com         private final StateMachine stateMachine;
12051157Smax.romanov@nginx.com 
StateUpdateSendHandler(SendHandler handler, StateMachine stateMachine)12061157Smax.romanov@nginx.com         public StateUpdateSendHandler(SendHandler handler, StateMachine stateMachine) {
12071157Smax.romanov@nginx.com             this.handler = handler;
12081157Smax.romanov@nginx.com             this.stateMachine = stateMachine;
12091157Smax.romanov@nginx.com         }
12101157Smax.romanov@nginx.com 
12111157Smax.romanov@nginx.com         @Override
onResult(SendResult result)12121157Smax.romanov@nginx.com         public void onResult(SendResult result) {
12131157Smax.romanov@nginx.com             if (result.isOK()) {
12141157Smax.romanov@nginx.com                 stateMachine.complete(true);
12151157Smax.romanov@nginx.com             }
12161157Smax.romanov@nginx.com             handler.onResult(result);
12171157Smax.romanov@nginx.com         }
12181157Smax.romanov@nginx.com     }
12191157Smax.romanov@nginx.com 
12201157Smax.romanov@nginx.com 
12211157Smax.romanov@nginx.com     private static class BlockingSendHandler implements SendHandler {
12221157Smax.romanov@nginx.com 
12231157Smax.romanov@nginx.com         private SendResult sendResult = null;
12241157Smax.romanov@nginx.com 
12251157Smax.romanov@nginx.com         @Override
onResult(SendResult result)12261157Smax.romanov@nginx.com         public void onResult(SendResult result) {
12271157Smax.romanov@nginx.com             sendResult = result;
12281157Smax.romanov@nginx.com         }
12291157Smax.romanov@nginx.com 
getSendResult()12301157Smax.romanov@nginx.com         public SendResult getSendResult() {
12311157Smax.romanov@nginx.com             return sendResult;
12321157Smax.romanov@nginx.com         }
12331157Smax.romanov@nginx.com     }
12341157Smax.romanov@nginx.com }
1235