xref: /unit/src/java/nginx/unit/websocket/WsSession.java (revision 1157:7ae152bda303)
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package nginx.unit.websocket;
18 
19 import java.io.IOException;
20 import java.net.URI;
21 import java.nio.ByteBuffer;
22 import java.nio.ByteOrder;
23 import java.nio.CharBuffer;
24 import java.nio.channels.WritePendingException;
25 import java.nio.charset.CharsetDecoder;
26 import java.nio.charset.CoderResult;
27 import java.nio.charset.CodingErrorAction;
28 import java.nio.charset.StandardCharsets;
29 import java.security.Principal;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicLong;
37 
38 import javax.websocket.CloseReason;
39 import javax.websocket.CloseReason.CloseCode;
40 import javax.websocket.CloseReason.CloseCodes;
41 import javax.websocket.DeploymentException;
42 import javax.websocket.Endpoint;
43 import javax.websocket.EndpointConfig;
44 import javax.websocket.Extension;
45 import javax.websocket.MessageHandler;
46 import javax.websocket.MessageHandler.Partial;
47 import javax.websocket.MessageHandler.Whole;
48 import javax.websocket.PongMessage;
49 import javax.websocket.RemoteEndpoint;
50 import javax.websocket.SendResult;
51 import javax.websocket.Session;
52 import javax.websocket.WebSocketContainer;
53 
54 import org.apache.juli.logging.Log;
55 import org.apache.juli.logging.LogFactory;
56 import org.apache.tomcat.InstanceManager;
57 import org.apache.tomcat.InstanceManagerBindings;
58 import org.apache.tomcat.util.ExceptionUtils;
59 import org.apache.tomcat.util.buf.Utf8Decoder;
60 import org.apache.tomcat.util.res.StringManager;
61 
62 import nginx.unit.Request;
63 
64 public class WsSession implements Session {
65 
66     // An ellipsis is a single character that looks like three periods in a row
67     // and is used to indicate a continuation.
68     private static final byte[] ELLIPSIS_BYTES = "\u2026".getBytes(StandardCharsets.UTF_8);
69     // An ellipsis is three bytes in UTF-8
70     private static final int ELLIPSIS_BYTES_LEN = ELLIPSIS_BYTES.length;
71 
72     private static final StringManager sm = StringManager.getManager(WsSession.class);
73     private static AtomicLong ids = new AtomicLong(0);
74 
75     private final Log log = LogFactory.getLog(WsSession.class); // must not be static
76 
77     private final CharsetDecoder utf8DecoderMessage = new Utf8Decoder().
78             onMalformedInput(CodingErrorAction.REPORT).
79             onUnmappableCharacter(CodingErrorAction.REPORT);
80 
81     private final Endpoint localEndpoint;
82     private final WsRemoteEndpointImplBase wsRemoteEndpoint;
83     private final RemoteEndpoint.Async remoteEndpointAsync;
84     private final RemoteEndpoint.Basic remoteEndpointBasic;
85     private final ClassLoader applicationClassLoader;
86     private final WsWebSocketContainer webSocketContainer;
87     private final URI requestUri;
88     private final Map<String, List<String>> requestParameterMap;
89     private final String queryString;
90     private final Principal userPrincipal;
91     private final EndpointConfig endpointConfig;
92 
93     private final List<Extension> negotiatedExtensions;
94     private final String subProtocol;
95     private final Map<String, String> pathParameters;
96     private final boolean secure;
97     private final String httpSessionId;
98     private final String id;
99 
100     // Expected to handle message types of <String> only
101     private volatile MessageHandler textMessageHandler = null;
102     // Expected to handle message types of <ByteBuffer> only
103     private volatile MessageHandler binaryMessageHandler = null;
104     private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = null;
105     private volatile State state = State.OPEN;
106     private final Object stateLock = new Object();
107     private final Map<String, Object> userProperties = new ConcurrentHashMap<>();
108     private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
109     private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
110     private volatile long maxIdleTimeout = 0;
111     private volatile long lastActive = System.currentTimeMillis();
112     private Map<FutureToSendHandler, FutureToSendHandler> futures = new ConcurrentHashMap<>();
113 
114     private CharBuffer messageBufferText;
115     private ByteBuffer binaryBuffer;
116     private byte startOpCode = Constants.OPCODE_CONTINUATION;
117 
118     /**
119      * Creates a new WebSocket session for communication between the two
120      * provided end points. The result of {@link Thread#getContextClassLoader()}
121      * at the time this constructor is called will be used when calling
122      * {@link Endpoint#onClose(Session, CloseReason)}.
123      *
124      * @param localEndpoint        The end point managed by this code
125      * @param wsRemoteEndpoint     The other / remote endpoint
126      * @param wsWebSocketContainer The container that created this session
127      * @param requestUri           The URI used to connect to this endpoint or
128      *                             <code>null</code> is this is a client session
129      * @param requestParameterMap  The parameters associated with the request
130      *                             that initiated this session or
131      *                             <code>null</code> if this is a client session
132      * @param queryString          The query string associated with the request
133      *                             that initiated this session or
134      *                             <code>null</code> if this is a client session
135      * @param userPrincipal        The principal associated with the request
136      *                             that initiated this session or
137      *                             <code>null</code> if this is a client session
138      * @param httpSessionId        The HTTP session ID associated with the
139      *                             request that initiated this session or
140      *                             <code>null</code> if this is a client session
141      * @param negotiatedExtensions The agreed extensions to use for this session
142      * @param subProtocol          The agreed subprotocol to use for this
143      *                             session
144      * @param pathParameters       The path parameters associated with the
145      *                             request that initiated this session or
146      *                             <code>null</code> if this is a client session
147      * @param secure               Was this session initiated over a secure
148      *                             connection?
149      * @param endpointConfig       The configuration information for the
150      *                             endpoint
151      * @throws DeploymentException if an invalid encode is specified
152      */
WsSession(Endpoint localEndpoint, WsRemoteEndpointImplBase wsRemoteEndpoint, WsWebSocketContainer wsWebSocketContainer, URI requestUri, Map<String, List<String>> requestParameterMap, String queryString, Principal userPrincipal, String httpSessionId, List<Extension> negotiatedExtensions, String subProtocol, Map<String, String> pathParameters, boolean secure, EndpointConfig endpointConfig, Request request)153     public WsSession(Endpoint localEndpoint,
154             WsRemoteEndpointImplBase wsRemoteEndpoint,
155             WsWebSocketContainer wsWebSocketContainer,
156             URI requestUri, Map<String, List<String>> requestParameterMap,
157             String queryString, Principal userPrincipal, String httpSessionId,
158             List<Extension> negotiatedExtensions, String subProtocol, Map<String, String> pathParameters,
159             boolean secure, EndpointConfig endpointConfig,
160             Request request) throws DeploymentException {
161         this.localEndpoint = localEndpoint;
162         this.wsRemoteEndpoint = wsRemoteEndpoint;
163         this.wsRemoteEndpoint.setSession(this);
164         this.wsRemoteEndpoint.setRequest(request);
165 
166         request.setWsSession(this);
167 
168         this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);
169         this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
170         this.webSocketContainer = wsWebSocketContainer;
171         applicationClassLoader = Thread.currentThread().getContextClassLoader();
172         wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout());
173         this.maxBinaryMessageBufferSize = webSocketContainer.getDefaultMaxBinaryMessageBufferSize();
174         this.maxTextMessageBufferSize = webSocketContainer.getDefaultMaxTextMessageBufferSize();
175         this.maxIdleTimeout = webSocketContainer.getDefaultMaxSessionIdleTimeout();
176         this.requestUri = requestUri;
177         if (requestParameterMap == null) {
178             this.requestParameterMap = Collections.emptyMap();
179         } else {
180             this.requestParameterMap = requestParameterMap;
181         }
182         this.queryString = queryString;
183         this.userPrincipal = userPrincipal;
184         this.httpSessionId = httpSessionId;
185         this.negotiatedExtensions = negotiatedExtensions;
186         if (subProtocol == null) {
187             this.subProtocol = "";
188         } else {
189             this.subProtocol = subProtocol;
190         }
191         this.pathParameters = pathParameters;
192         this.secure = secure;
193         this.wsRemoteEndpoint.setEncoders(endpointConfig);
194         this.endpointConfig = endpointConfig;
195 
196         this.userProperties.putAll(endpointConfig.getUserProperties());
197         this.id = Long.toHexString(ids.getAndIncrement());
198 
199         InstanceManager instanceManager = webSocketContainer.getInstanceManager();
200         if (instanceManager == null) {
201             instanceManager = InstanceManagerBindings.get(applicationClassLoader);
202         }
203         if (instanceManager != null) {
204             try {
205                 instanceManager.newInstance(localEndpoint);
206             } catch (Exception e) {
207                 throw new DeploymentException(sm.getString("wsSession.instanceNew"), e);
208             }
209         }
210 
211         if (log.isDebugEnabled()) {
212             log.debug(sm.getString("wsSession.created", id));
213         }
214 
215         messageBufferText = CharBuffer.allocate(maxTextMessageBufferSize);
216     }
217 
wsSession_test()218     public static String wsSession_test() {
219         return sm.getString("wsSession.instanceNew");
220     }
221 
222 
223     @Override
getContainer()224     public WebSocketContainer getContainer() {
225         checkState();
226         return webSocketContainer;
227     }
228 
229 
230     @Override
addMessageHandler(MessageHandler listener)231     public void addMessageHandler(MessageHandler listener) {
232         Class<?> target = Util.getMessageType(listener);
233         doAddMessageHandler(target, listener);
234     }
235 
236 
237     @Override
addMessageHandler(Class<T> clazz, Partial<T> handler)238     public <T> void addMessageHandler(Class<T> clazz, Partial<T> handler)
239             throws IllegalStateException {
240         doAddMessageHandler(clazz, handler);
241     }
242 
243 
244     @Override
addMessageHandler(Class<T> clazz, Whole<T> handler)245     public <T> void addMessageHandler(Class<T> clazz, Whole<T> handler)
246             throws IllegalStateException {
247         doAddMessageHandler(clazz, handler);
248     }
249 
250 
251     @SuppressWarnings("unchecked")
doAddMessageHandler(Class<?> target, MessageHandler listener)252     private void doAddMessageHandler(Class<?> target, MessageHandler listener) {
253         checkState();
254 
255         // Message handlers that require decoders may map to text messages,
256         // binary messages, both or neither.
257 
258         // The frame processing code expects binary message handlers to
259         // accept ByteBuffer
260 
261         // Use the POJO message handler wrappers as they are designed to wrap
262         // arbitrary objects with MessageHandlers and can wrap MessageHandlers
263         // just as easily.
264 
265         Set<MessageHandlerResult> mhResults = Util.getMessageHandlers(target, listener,
266                 endpointConfig, this);
267 
268         for (MessageHandlerResult mhResult : mhResults) {
269             switch (mhResult.getType()) {
270             case TEXT: {
271                 if (textMessageHandler != null) {
272                     throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerText"));
273                 }
274                 textMessageHandler = mhResult.getHandler();
275                 break;
276             }
277             case BINARY: {
278                 if (binaryMessageHandler != null) {
279                     throw new IllegalStateException(
280                             sm.getString("wsSession.duplicateHandlerBinary"));
281                 }
282                 binaryMessageHandler = mhResult.getHandler();
283                 break;
284             }
285             case PONG: {
286                 if (pongMessageHandler != null) {
287                     throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerPong"));
288                 }
289                 MessageHandler handler = mhResult.getHandler();
290                 if (handler instanceof MessageHandler.Whole<?>) {
291                     pongMessageHandler = (MessageHandler.Whole<PongMessage>) handler;
292                 } else {
293                     throw new IllegalStateException(
294                             sm.getString("wsSession.invalidHandlerTypePong"));
295                 }
296 
297                 break;
298             }
299             default: {
300                 throw new IllegalArgumentException(
301                         sm.getString("wsSession.unknownHandlerType", listener, mhResult.getType()));
302             }
303             }
304         }
305     }
306 
307 
308     @Override
getMessageHandlers()309     public Set<MessageHandler> getMessageHandlers() {
310         checkState();
311         Set<MessageHandler> result = new HashSet<>();
312         if (binaryMessageHandler != null) {
313             result.add(binaryMessageHandler);
314         }
315         if (textMessageHandler != null) {
316             result.add(textMessageHandler);
317         }
318         if (pongMessageHandler != null) {
319             result.add(pongMessageHandler);
320         }
321         return result;
322     }
323 
324 
325     @Override
removeMessageHandler(MessageHandler listener)326     public void removeMessageHandler(MessageHandler listener) {
327         checkState();
328         if (listener == null) {
329             return;
330         }
331 
332         MessageHandler wrapped = null;
333 
334         if (listener instanceof WrappedMessageHandler) {
335             wrapped = ((WrappedMessageHandler) listener).getWrappedHandler();
336         }
337 
338         if (wrapped == null) {
339             wrapped = listener;
340         }
341 
342         boolean removed = false;
343         if (wrapped.equals(textMessageHandler) || listener.equals(textMessageHandler)) {
344             textMessageHandler = null;
345             removed = true;
346         }
347 
348         if (wrapped.equals(binaryMessageHandler) || listener.equals(binaryMessageHandler)) {
349             binaryMessageHandler = null;
350             removed = true;
351         }
352 
353         if (wrapped.equals(pongMessageHandler) || listener.equals(pongMessageHandler)) {
354             pongMessageHandler = null;
355             removed = true;
356         }
357 
358         if (!removed) {
359             // ISE for now. Could swallow this silently / log this if the ISE
360             // becomes a problem
361             throw new IllegalStateException(
362                     sm.getString("wsSession.removeHandlerFailed", listener));
363         }
364     }
365 
366 
367     @Override
getProtocolVersion()368     public String getProtocolVersion() {
369         checkState();
370         return Constants.WS_VERSION_HEADER_VALUE;
371     }
372 
373 
374     @Override
getNegotiatedSubprotocol()375     public String getNegotiatedSubprotocol() {
376         checkState();
377         return subProtocol;
378     }
379 
380 
381     @Override
getNegotiatedExtensions()382     public List<Extension> getNegotiatedExtensions() {
383         checkState();
384         return negotiatedExtensions;
385     }
386 
387 
388     @Override
isSecure()389     public boolean isSecure() {
390         checkState();
391         return secure;
392     }
393 
394 
395     @Override
isOpen()396     public boolean isOpen() {
397         return state == State.OPEN;
398     }
399 
400 
401     @Override
getMaxIdleTimeout()402     public long getMaxIdleTimeout() {
403         checkState();
404         return maxIdleTimeout;
405     }
406 
407 
408     @Override
setMaxIdleTimeout(long timeout)409     public void setMaxIdleTimeout(long timeout) {
410         checkState();
411         this.maxIdleTimeout = timeout;
412     }
413 
414 
415     @Override
setMaxBinaryMessageBufferSize(int max)416     public void setMaxBinaryMessageBufferSize(int max) {
417         checkState();
418         this.maxBinaryMessageBufferSize = max;
419     }
420 
421 
422     @Override
getMaxBinaryMessageBufferSize()423     public int getMaxBinaryMessageBufferSize() {
424         checkState();
425         return maxBinaryMessageBufferSize;
426     }
427 
428 
429     @Override
setMaxTextMessageBufferSize(int max)430     public void setMaxTextMessageBufferSize(int max) {
431         checkState();
432         this.maxTextMessageBufferSize = max;
433     }
434 
435 
436     @Override
getMaxTextMessageBufferSize()437     public int getMaxTextMessageBufferSize() {
438         checkState();
439         return maxTextMessageBufferSize;
440     }
441 
442 
443     @Override
getOpenSessions()444     public Set<Session> getOpenSessions() {
445         checkState();
446         return webSocketContainer.getOpenSessions(localEndpoint);
447     }
448 
449 
450     @Override
getAsyncRemote()451     public RemoteEndpoint.Async getAsyncRemote() {
452         checkState();
453         return remoteEndpointAsync;
454     }
455 
456 
457     @Override
getBasicRemote()458     public RemoteEndpoint.Basic getBasicRemote() {
459         checkState();
460         return remoteEndpointBasic;
461     }
462 
463 
464     @Override
close()465     public void close() throws IOException {
466         close(new CloseReason(CloseCodes.NORMAL_CLOSURE, ""));
467     }
468 
469 
470     @Override
close(CloseReason closeReason)471     public void close(CloseReason closeReason) throws IOException {
472         doClose(closeReason, closeReason);
473     }
474 
475 
476     /**
477      * WebSocket 1.0. Section 2.1.5.
478      * Need internal close method as spec requires that the local endpoint
479      * receives a 1006 on timeout.
480      *
481      * @param closeReasonMessage The close reason to pass to the remote endpoint
482      * @param closeReasonLocal   The close reason to pass to the local endpoint
483      */
doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal)484     public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal) {
485         // Double-checked locking. OK because state is volatile
486         if (state != State.OPEN) {
487             return;
488         }
489 
490         synchronized (stateLock) {
491             if (state != State.OPEN) {
492                 return;
493             }
494 
495             if (log.isDebugEnabled()) {
496                 log.debug(sm.getString("wsSession.doClose", id));
497             }
498             try {
499                 wsRemoteEndpoint.setBatchingAllowed(false);
500             } catch (IOException e) {
501                 log.warn(sm.getString("wsSession.flushFailOnClose"), e);
502                 fireEndpointOnError(e);
503             }
504 
505             state = State.OUTPUT_CLOSED;
506 
507             sendCloseMessage(closeReasonMessage);
508             fireEndpointOnClose(closeReasonLocal);
509         }
510 
511         IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
512         SendResult sr = new SendResult(ioe);
513         for (FutureToSendHandler f2sh : futures.keySet()) {
514             f2sh.onResult(sr);
515         }
516     }
517 
518 
519     /**
520      * Called when a close message is received. Should only ever happen once.
521      * Also called after a protocol error when the ProtocolHandler needs to
522      * force the closing of the connection.
523      *
524      * @param closeReason The reason contained within the received close
525      *                    message.
526      */
onClose(CloseReason closeReason)527     public void onClose(CloseReason closeReason) {
528 
529         synchronized (stateLock) {
530             if (state != State.CLOSED) {
531                 try {
532                     wsRemoteEndpoint.setBatchingAllowed(false);
533                 } catch (IOException e) {
534                     log.warn(sm.getString("wsSession.flushFailOnClose"), e);
535                     fireEndpointOnError(e);
536                 }
537                 if (state == State.OPEN) {
538                     state = State.OUTPUT_CLOSED;
539                     sendCloseMessage(closeReason);
540                     fireEndpointOnClose(closeReason);
541                 }
542                 state = State.CLOSED;
543 
544                 // Close the socket
545                 wsRemoteEndpoint.close();
546             }
547         }
548     }
549 
550 
onClose()551     public void onClose() {
552 
553         synchronized (stateLock) {
554             if (state != State.CLOSED) {
555                 try {
556                     wsRemoteEndpoint.setBatchingAllowed(false);
557                 } catch (IOException e) {
558                     log.warn(sm.getString("wsSession.flushFailOnClose"), e);
559                     fireEndpointOnError(e);
560                 }
561                 if (state == State.OPEN) {
562                     state = State.OUTPUT_CLOSED;
563                     fireEndpointOnClose(new CloseReason(
564                         CloseReason.CloseCodes.NORMAL_CLOSURE, ""));
565                 }
566                 state = State.CLOSED;
567 
568                 // Close the socket
569                 wsRemoteEndpoint.close();
570             }
571         }
572     }
573 
574 
fireEndpointOnClose(CloseReason closeReason)575     private void fireEndpointOnClose(CloseReason closeReason) {
576 
577         // Fire the onClose event
578         Throwable throwable = null;
579         InstanceManager instanceManager = webSocketContainer.getInstanceManager();
580         if (instanceManager == null) {
581             instanceManager = InstanceManagerBindings.get(applicationClassLoader);
582         }
583         Thread t = Thread.currentThread();
584         ClassLoader cl = t.getContextClassLoader();
585         t.setContextClassLoader(applicationClassLoader);
586         try {
587             localEndpoint.onClose(this, closeReason);
588         } catch (Throwable t1) {
589             ExceptionUtils.handleThrowable(t1);
590             throwable = t1;
591         } finally {
592             if (instanceManager != null) {
593                 try {
594                     instanceManager.destroyInstance(localEndpoint);
595                 } catch (Throwable t2) {
596                     ExceptionUtils.handleThrowable(t2);
597                     if (throwable == null) {
598                         throwable = t2;
599                     }
600                 }
601             }
602             t.setContextClassLoader(cl);
603         }
604 
605         if (throwable != null) {
606             fireEndpointOnError(throwable);
607         }
608     }
609 
610 
fireEndpointOnError(Throwable throwable)611     private void fireEndpointOnError(Throwable throwable) {
612 
613         // Fire the onError event
614         Thread t = Thread.currentThread();
615         ClassLoader cl = t.getContextClassLoader();
616         t.setContextClassLoader(applicationClassLoader);
617         try {
618             localEndpoint.onError(this, throwable);
619         } finally {
620             t.setContextClassLoader(cl);
621         }
622     }
623 
624 
sendCloseMessage(CloseReason closeReason)625     private void sendCloseMessage(CloseReason closeReason) {
626         // 125 is maximum size for the payload of a control message
627         ByteBuffer msg = ByteBuffer.allocate(125);
628         CloseCode closeCode = closeReason.getCloseCode();
629         // CLOSED_ABNORMALLY should not be put on the wire
630         if (closeCode == CloseCodes.CLOSED_ABNORMALLY) {
631             // PROTOCOL_ERROR is probably better than GOING_AWAY here
632             msg.putShort((short) CloseCodes.PROTOCOL_ERROR.getCode());
633         } else {
634             msg.putShort((short) closeCode.getCode());
635         }
636 
637         String reason = closeReason.getReasonPhrase();
638         if (reason != null && reason.length() > 0) {
639             appendCloseReasonWithTruncation(msg, reason);
640         }
641         msg.flip();
642         try {
643             wsRemoteEndpoint.sendMessageBlock(Constants.OPCODE_CLOSE, msg, true);
644         } catch (IOException | WritePendingException e) {
645             // Failed to send close message. Close the socket and let the caller
646             // deal with the Exception
647             if (log.isDebugEnabled()) {
648                 log.debug(sm.getString("wsSession.sendCloseFail", id), e);
649             }
650             wsRemoteEndpoint.close();
651             // Failure to send a close message is not unexpected in the case of
652             // an abnormal closure (usually triggered by a failure to read/write
653             // from/to the client. In this case do not trigger the endpoint's
654             // error handling
655             if (closeCode != CloseCodes.CLOSED_ABNORMALLY) {
656                 localEndpoint.onError(this, e);
657             }
658         } finally {
659             webSocketContainer.unregisterSession(localEndpoint, this);
660         }
661     }
662 
663 
664     /**
665      * Use protected so unit tests can access this method directly.
666      * @param msg The message
667      * @param reason The reason
668      */
appendCloseReasonWithTruncation(ByteBuffer msg, String reason)669     protected static void appendCloseReasonWithTruncation(ByteBuffer msg, String reason) {
670         // Once the close code has been added there are a maximum of 123 bytes
671         // left for the reason phrase. If it is truncated then care needs to be
672         // taken to ensure the bytes are not truncated in the middle of a
673         // multi-byte UTF-8 character.
674         byte[] reasonBytes = reason.getBytes(StandardCharsets.UTF_8);
675 
676         if (reasonBytes.length <= 123) {
677             // No need to truncate
678             msg.put(reasonBytes);
679         } else {
680             // Need to truncate
681             int remaining = 123 - ELLIPSIS_BYTES_LEN;
682             int pos = 0;
683             byte[] bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8);
684             while (remaining >= bytesNext.length) {
685                 msg.put(bytesNext);
686                 remaining -= bytesNext.length;
687                 pos++;
688                 bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8);
689             }
690             msg.put(ELLIPSIS_BYTES);
691         }
692     }
693 
694 
695     /**
696      * Make the session aware of a {@link FutureToSendHandler} that will need to
697      * be forcibly closed if the session closes before the
698      * {@link FutureToSendHandler} completes.
699      * @param f2sh The handler
700      */
registerFuture(FutureToSendHandler f2sh)701     protected void registerFuture(FutureToSendHandler f2sh) {
702         // Ideally, this code should sync on stateLock so that the correct
703         // action is taken based on the current state of the connection.
704         // However, a sync on stateLock can't be used here as it will create the
705         // possibility of a dead-lock. See BZ 61183.
706         // Therefore, a slightly less efficient approach is used.
707 
708         // Always register the future.
709         futures.put(f2sh, f2sh);
710 
711         if (state == State.OPEN) {
712             // The session is open. The future has been registered with the open
713             // session. Normal processing continues.
714             return;
715         }
716 
717         // The session is closed. The future may or may not have been registered
718         // in time for it to be processed during session closure.
719 
720         if (f2sh.isDone()) {
721             // The future has completed. It is not known if the future was
722             // completed normally by the I/O layer or in error by doClose(). It
723             // doesn't matter which. There is nothing more to do here.
724             return;
725         }
726 
727         // The session is closed. The Future had not completed when last checked.
728         // There is a small timing window that means the Future may have been
729         // completed since the last check. There is also the possibility that
730         // the Future was not registered in time to be cleaned up during session
731         // close.
732         // Attempt to complete the Future with an error result as this ensures
733         // that the Future completes and any client code waiting on it does not
734         // hang. It is slightly inefficient since the Future may have been
735         // completed in another thread or another thread may be about to
736         // complete the Future but knowing if this is the case requires the sync
737         // on stateLock (see above).
738         // Note: If multiple attempts are made to complete the Future, the
739         //       second and subsequent attempts are ignored.
740 
741         IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
742         SendResult sr = new SendResult(ioe);
743         f2sh.onResult(sr);
744     }
745 
746 
747     /**
748      * Remove a {@link FutureToSendHandler} from the set of tracked instances.
749      * @param f2sh The handler
750      */
unregisterFuture(FutureToSendHandler f2sh)751     protected void unregisterFuture(FutureToSendHandler f2sh) {
752         futures.remove(f2sh);
753     }
754 
755 
756     @Override
getRequestURI()757     public URI getRequestURI() {
758         checkState();
759         return requestUri;
760     }
761 
762 
763     @Override
getRequestParameterMap()764     public Map<String, List<String>> getRequestParameterMap() {
765         checkState();
766         return requestParameterMap;
767     }
768 
769 
770     @Override
getQueryString()771     public String getQueryString() {
772         checkState();
773         return queryString;
774     }
775 
776 
777     @Override
getUserPrincipal()778     public Principal getUserPrincipal() {
779         checkState();
780         return userPrincipal;
781     }
782 
783 
784     @Override
getPathParameters()785     public Map<String, String> getPathParameters() {
786         checkState();
787         return pathParameters;
788     }
789 
790 
791     @Override
getId()792     public String getId() {
793         return id;
794     }
795 
796 
797     @Override
getUserProperties()798     public Map<String, Object> getUserProperties() {
799         checkState();
800         return userProperties;
801     }
802 
803 
getLocal()804     public Endpoint getLocal() {
805         return localEndpoint;
806     }
807 
808 
getHttpSessionId()809     public String getHttpSessionId() {
810         return httpSessionId;
811     }
812 
813     private ByteBuffer rawFragments;
814 
processFrame(ByteBuffer buf, byte opCode, boolean last)815     public void processFrame(ByteBuffer buf, byte opCode, boolean last)
816         throws IOException
817     {
818         if (state == State.CLOSED) {
819             return;
820         }
821 
822         if (opCode == Constants.OPCODE_CONTINUATION) {
823             opCode = startOpCode;
824 
825             if (rawFragments != null && rawFragments.position() > 0) {
826                 rawFragments.put(buf);
827                 rawFragments.flip();
828                 buf = rawFragments;
829             }
830         } else {
831             if (!last && (opCode == Constants.OPCODE_BINARY ||
832                           opCode == Constants.OPCODE_TEXT)) {
833                 startOpCode = opCode;
834 
835                 if (rawFragments != null) {
836                     rawFragments.clear();
837                 }
838             }
839         }
840 
841         if (last) {
842             startOpCode = Constants.OPCODE_CONTINUATION;
843         }
844 
845         if (opCode == Constants.OPCODE_PONG) {
846             if (pongMessageHandler != null) {
847                 final ByteBuffer b = buf;
848 
849                 PongMessage pongMessage = new PongMessage() {
850                     @Override
851                     public ByteBuffer getApplicationData() {
852                         return b;
853                     }
854                 };
855 
856                 pongMessageHandler.onMessage(pongMessage);
857             }
858         }
859 
860         if (opCode == Constants.OPCODE_CLOSE) {
861             CloseReason closeReason;
862 
863             if (buf.remaining() >= 2) {
864                 short closeCode = buf.order(ByteOrder.BIG_ENDIAN).getShort();
865 
866                 closeReason = new CloseReason(
867                     CloseReason.CloseCodes.getCloseCode(closeCode),
868                     buf.asCharBuffer().toString());
869             } else {
870                 closeReason = new CloseReason(
871                     CloseReason.CloseCodes.NORMAL_CLOSURE, "");
872             }
873 
874             onClose(closeReason);
875         }
876 
877         if (opCode == Constants.OPCODE_BINARY) {
878             onMessage(buf, last);
879         }
880 
881         if (opCode == Constants.OPCODE_TEXT) {
882             if (messageBufferText.position() == 0 && maxTextMessageBufferSize != messageBufferText.capacity()) {
883                 messageBufferText = CharBuffer.allocate(maxTextMessageBufferSize);
884             }
885 
886             CoderResult cr = utf8DecoderMessage.decode(buf, messageBufferText, last);
887             if (cr.isError()) {
888                 throw new WsIOException(new CloseReason(
889                         CloseCodes.NOT_CONSISTENT,
890                         sm.getString("wsFrame.invalidUtf8")));
891             } else if (cr.isOverflow()) {
892                 // Ran out of space in text buffer - flush it
893                 if (hasTextPartial()) {
894                     do {
895                         onMessage(messageBufferText, false);
896 
897                         cr = utf8DecoderMessage.decode(buf, messageBufferText, last);
898                     } while (cr.isOverflow());
899                 } else {
900                     throw new WsIOException(new CloseReason(
901                             CloseCodes.TOO_BIG,
902                             sm.getString("wsFrame.textMessageTooBig")));
903                 }
904             } else if (cr.isUnderflow() && !last) {
905                 updateRawFragments(buf, last);
906 
907                 if (hasTextPartial()) {
908                     onMessage(messageBufferText, false);
909                 }
910 
911                 return;
912             }
913 
914             if (last) {
915                 utf8DecoderMessage.reset();
916             }
917 
918             updateRawFragments(buf, last);
919 
920             onMessage(messageBufferText, last);
921         }
922     }
923 
924 
hasTextPartial()925     private boolean hasTextPartial() {
926         return textMessageHandler instanceof MessageHandler.Partial<?>;
927     }
928 
929 
onMessage(CharBuffer buf, boolean last)930     private void onMessage(CharBuffer buf, boolean last) throws IOException {
931         buf.flip();
932         try {
933             onMessage(buf.toString(), last);
934         } catch (Throwable t) {
935             handleThrowableOnSend(t);
936         } finally {
937             buf.clear();
938         }
939     }
940 
941 
updateRawFragments(ByteBuffer buf, boolean last)942     private void updateRawFragments(ByteBuffer buf, boolean last) {
943         if (!last && buf.remaining() > 0) {
944             if (buf == rawFragments) {
945                 buf.compact();
946             } else {
947                 if (rawFragments == null || (rawFragments.position() == 0 && maxTextMessageBufferSize != rawFragments.capacity())) {
948                     rawFragments = ByteBuffer.allocateDirect(maxTextMessageBufferSize);
949                 }
950                 rawFragments.put(buf);
951             }
952         } else {
953             if (rawFragments != null) {
954                 rawFragments.clear();
955             }
956         }
957     }
958 
959 
960     @SuppressWarnings("unchecked")
onMessage(String text, boolean last)961     public void onMessage(String text, boolean last) {
962         if (hasTextPartial()) {
963             ((MessageHandler.Partial<String>) textMessageHandler).onMessage(text, last);
964         } else {
965             // Caller ensures last == true if this branch is used
966             ((MessageHandler.Whole<String>) textMessageHandler).onMessage(text);
967         }
968     }
969 
970 
971     @SuppressWarnings("unchecked")
onMessage(ByteBuffer buf, boolean last)972     public void onMessage(ByteBuffer buf, boolean last)
973         throws IOException
974     {
975         if (binaryMessageHandler instanceof MessageHandler.Partial<?>) {
976             ((MessageHandler.Partial<ByteBuffer>) binaryMessageHandler).onMessage(buf, last);
977         } else {
978             if (last && (binaryBuffer == null || binaryBuffer.position() == 0)) {
979                 ((MessageHandler.Whole<ByteBuffer>) binaryMessageHandler).onMessage(buf);
980                 return;
981             }
982 
983             if (binaryBuffer == null ||
984                 (binaryBuffer.position() == 0 && binaryBuffer.capacity() != maxBinaryMessageBufferSize))
985             {
986                 binaryBuffer = ByteBuffer.allocateDirect(maxBinaryMessageBufferSize);
987             }
988 
989             if (binaryBuffer.remaining() < buf.remaining()) {
990                 throw new WsIOException(new CloseReason(
991                         CloseCodes.TOO_BIG,
992                         sm.getString("wsFrame.textMessageTooBig")));
993             }
994 
995             binaryBuffer.put(buf);
996 
997             if (last) {
998                 binaryBuffer.flip();
999                 try {
1000                     ((MessageHandler.Whole<ByteBuffer>) binaryMessageHandler).onMessage(binaryBuffer);
1001                 } finally {
1002                     binaryBuffer.clear();
1003                 }
1004             }
1005         }
1006     }
1007 
1008 
handleThrowableOnSend(Throwable t)1009     private void handleThrowableOnSend(Throwable t) throws WsIOException {
1010         ExceptionUtils.handleThrowable(t);
1011         getLocal().onError(this, t);
1012         CloseReason cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY,
1013                 sm.getString("wsFrame.ioeTriggeredClose"));
1014         throw new WsIOException(cr);
1015     }
1016 
1017 
getTextMessageHandler()1018     protected MessageHandler getTextMessageHandler() {
1019         return textMessageHandler;
1020     }
1021 
1022 
getBinaryMessageHandler()1023     protected MessageHandler getBinaryMessageHandler() {
1024         return binaryMessageHandler;
1025     }
1026 
1027 
getPongMessageHandler()1028     protected MessageHandler.Whole<PongMessage> getPongMessageHandler() {
1029         return pongMessageHandler;
1030     }
1031 
1032 
updateLastActive()1033     protected void updateLastActive() {
1034         lastActive = System.currentTimeMillis();
1035     }
1036 
1037 
checkExpiration()1038     protected void checkExpiration() {
1039         long timeout = maxIdleTimeout;
1040         if (timeout < 1) {
1041             return;
1042         }
1043 
1044         if (System.currentTimeMillis() - lastActive > timeout) {
1045             String msg = sm.getString("wsSession.timeout", getId());
1046             if (log.isDebugEnabled()) {
1047                 log.debug(msg);
1048             }
1049             doClose(new CloseReason(CloseCodes.GOING_AWAY, msg),
1050                     new CloseReason(CloseCodes.CLOSED_ABNORMALLY, msg));
1051         }
1052     }
1053 
1054 
checkState()1055     private void checkState() {
1056         if (state == State.CLOSED) {
1057             /*
1058              * As per RFC 6455, a WebSocket connection is considered to be
1059              * closed once a peer has sent and received a WebSocket close frame.
1060              */
1061             throw new IllegalStateException(sm.getString("wsSession.closed", id));
1062         }
1063     }
1064 
1065     private enum State {
1066         OPEN,
1067         OUTPUT_CLOSED,
1068         CLOSED
1069     }
1070 }
1071