1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package nginx.unit.websocket; 18 19 import java.io.IOException; 20 import java.net.URI; 21 import java.nio.ByteBuffer; 22 import java.nio.ByteOrder; 23 import java.nio.CharBuffer; 24 import java.nio.channels.WritePendingException; 25 import java.nio.charset.CharsetDecoder; 26 import java.nio.charset.CoderResult; 27 import java.nio.charset.CodingErrorAction; 28 import java.nio.charset.StandardCharsets; 29 import java.security.Principal; 30 import java.util.Collections; 31 import java.util.HashSet; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.Set; 35 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.atomic.AtomicLong; 37 38 import javax.websocket.CloseReason; 39 import javax.websocket.CloseReason.CloseCode; 40 import javax.websocket.CloseReason.CloseCodes; 41 import javax.websocket.DeploymentException; 42 import javax.websocket.Endpoint; 43 import javax.websocket.EndpointConfig; 44 import javax.websocket.Extension; 45 import javax.websocket.MessageHandler; 46 import javax.websocket.MessageHandler.Partial; 47 import javax.websocket.MessageHandler.Whole; 48 import javax.websocket.PongMessage; 49 import javax.websocket.RemoteEndpoint; 50 import javax.websocket.SendResult; 51 import javax.websocket.Session; 52 import javax.websocket.WebSocketContainer; 53 54 import org.apache.juli.logging.Log; 55 import org.apache.juli.logging.LogFactory; 56 import org.apache.tomcat.InstanceManager; 57 import org.apache.tomcat.InstanceManagerBindings; 58 import org.apache.tomcat.util.ExceptionUtils; 59 import org.apache.tomcat.util.buf.Utf8Decoder; 60 import org.apache.tomcat.util.res.StringManager; 61 62 import nginx.unit.Request; 63 64 public class WsSession implements Session { 65 66 // An ellipsis is a single character that looks like three periods in a row 67 // and is used to indicate a continuation. 68 private static final byte[] ELLIPSIS_BYTES = "\u2026".getBytes(StandardCharsets.UTF_8); 69 // An ellipsis is three bytes in UTF-8 70 private static final int ELLIPSIS_BYTES_LEN = ELLIPSIS_BYTES.length; 71 72 private static final StringManager sm = StringManager.getManager(WsSession.class); 73 private static AtomicLong ids = new AtomicLong(0); 74 75 private final Log log = LogFactory.getLog(WsSession.class); // must not be static 76 77 private final CharsetDecoder utf8DecoderMessage = new Utf8Decoder(). 78 onMalformedInput(CodingErrorAction.REPORT). 79 onUnmappableCharacter(CodingErrorAction.REPORT); 80 81 private final Endpoint localEndpoint; 82 private final WsRemoteEndpointImplBase wsRemoteEndpoint; 83 private final RemoteEndpoint.Async remoteEndpointAsync; 84 private final RemoteEndpoint.Basic remoteEndpointBasic; 85 private final ClassLoader applicationClassLoader; 86 private final WsWebSocketContainer webSocketContainer; 87 private final URI requestUri; 88 private final Map<String, List<String>> requestParameterMap; 89 private final String queryString; 90 private final Principal userPrincipal; 91 private final EndpointConfig endpointConfig; 92 93 private final List<Extension> negotiatedExtensions; 94 private final String subProtocol; 95 private final Map<String, String> pathParameters; 96 private final boolean secure; 97 private final String httpSessionId; 98 private final String id; 99 100 // Expected to handle message types of <String> only 101 private volatile MessageHandler textMessageHandler = null; 102 // Expected to handle message types of <ByteBuffer> only 103 private volatile MessageHandler binaryMessageHandler = null; 104 private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = null; 105 private volatile State state = State.OPEN; 106 private final Object stateLock = new Object(); 107 private final Map<String, Object> userProperties = new ConcurrentHashMap<>(); 108 private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; 109 private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE; 110 private volatile long maxIdleTimeout = 0; 111 private volatile long lastActive = System.currentTimeMillis(); 112 private Map<FutureToSendHandler, FutureToSendHandler> futures = new ConcurrentHashMap<>(); 113 114 private CharBuffer messageBufferText; 115 private ByteBuffer binaryBuffer; 116 private byte startOpCode = Constants.OPCODE_CONTINUATION; 117 118 /** 119 * Creates a new WebSocket session for communication between the two 120 * provided end points. The result of {@link Thread#getContextClassLoader()} 121 * at the time this constructor is called will be used when calling 122 * {@link Endpoint#onClose(Session, CloseReason)}. 123 * 124 * @param localEndpoint The end point managed by this code 125 * @param wsRemoteEndpoint The other / remote endpoint 126 * @param wsWebSocketContainer The container that created this session 127 * @param requestUri The URI used to connect to this endpoint or 128 * <code>null</code> is this is a client session 129 * @param requestParameterMap The parameters associated with the request 130 * that initiated this session or 131 * <code>null</code> if this is a client session 132 * @param queryString The query string associated with the request 133 * that initiated this session or 134 * <code>null</code> if this is a client session 135 * @param userPrincipal The principal associated with the request 136 * that initiated this session or 137 * <code>null</code> if this is a client session 138 * @param httpSessionId The HTTP session ID associated with the 139 * request that initiated this session or 140 * <code>null</code> if this is a client session 141 * @param negotiatedExtensions The agreed extensions to use for this session 142 * @param subProtocol The agreed subprotocol to use for this 143 * session 144 * @param pathParameters The path parameters associated with the 145 * request that initiated this session or 146 * <code>null</code> if this is a client session 147 * @param secure Was this session initiated over a secure 148 * connection? 149 * @param endpointConfig The configuration information for the 150 * endpoint 151 * @throws DeploymentException if an invalid encode is specified 152 */ WsSession(Endpoint localEndpoint, WsRemoteEndpointImplBase wsRemoteEndpoint, WsWebSocketContainer wsWebSocketContainer, URI requestUri, Map<String, List<String>> requestParameterMap, String queryString, Principal userPrincipal, String httpSessionId, List<Extension> negotiatedExtensions, String subProtocol, Map<String, String> pathParameters, boolean secure, EndpointConfig endpointConfig, Request request)153 public WsSession(Endpoint localEndpoint, 154 WsRemoteEndpointImplBase wsRemoteEndpoint, 155 WsWebSocketContainer wsWebSocketContainer, 156 URI requestUri, Map<String, List<String>> requestParameterMap, 157 String queryString, Principal userPrincipal, String httpSessionId, 158 List<Extension> negotiatedExtensions, String subProtocol, Map<String, String> pathParameters, 159 boolean secure, EndpointConfig endpointConfig, 160 Request request) throws DeploymentException { 161 this.localEndpoint = localEndpoint; 162 this.wsRemoteEndpoint = wsRemoteEndpoint; 163 this.wsRemoteEndpoint.setSession(this); 164 this.wsRemoteEndpoint.setRequest(request); 165 166 request.setWsSession(this); 167 168 this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint); 169 this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint); 170 this.webSocketContainer = wsWebSocketContainer; 171 applicationClassLoader = Thread.currentThread().getContextClassLoader(); 172 wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout()); 173 this.maxBinaryMessageBufferSize = webSocketContainer.getDefaultMaxBinaryMessageBufferSize(); 174 this.maxTextMessageBufferSize = webSocketContainer.getDefaultMaxTextMessageBufferSize(); 175 this.maxIdleTimeout = webSocketContainer.getDefaultMaxSessionIdleTimeout(); 176 this.requestUri = requestUri; 177 if (requestParameterMap == null) { 178 this.requestParameterMap = Collections.emptyMap(); 179 } else { 180 this.requestParameterMap = requestParameterMap; 181 } 182 this.queryString = queryString; 183 this.userPrincipal = userPrincipal; 184 this.httpSessionId = httpSessionId; 185 this.negotiatedExtensions = negotiatedExtensions; 186 if (subProtocol == null) { 187 this.subProtocol = ""; 188 } else { 189 this.subProtocol = subProtocol; 190 } 191 this.pathParameters = pathParameters; 192 this.secure = secure; 193 this.wsRemoteEndpoint.setEncoders(endpointConfig); 194 this.endpointConfig = endpointConfig; 195 196 this.userProperties.putAll(endpointConfig.getUserProperties()); 197 this.id = Long.toHexString(ids.getAndIncrement()); 198 199 InstanceManager instanceManager = webSocketContainer.getInstanceManager(); 200 if (instanceManager == null) { 201 instanceManager = InstanceManagerBindings.get(applicationClassLoader); 202 } 203 if (instanceManager != null) { 204 try { 205 instanceManager.newInstance(localEndpoint); 206 } catch (Exception e) { 207 throw new DeploymentException(sm.getString("wsSession.instanceNew"), e); 208 } 209 } 210 211 if (log.isDebugEnabled()) { 212 log.debug(sm.getString("wsSession.created", id)); 213 } 214 215 messageBufferText = CharBuffer.allocate(maxTextMessageBufferSize); 216 } 217 wsSession_test()218 public static String wsSession_test() { 219 return sm.getString("wsSession.instanceNew"); 220 } 221 222 223 @Override getContainer()224 public WebSocketContainer getContainer() { 225 checkState(); 226 return webSocketContainer; 227 } 228 229 230 @Override addMessageHandler(MessageHandler listener)231 public void addMessageHandler(MessageHandler listener) { 232 Class<?> target = Util.getMessageType(listener); 233 doAddMessageHandler(target, listener); 234 } 235 236 237 @Override addMessageHandler(Class<T> clazz, Partial<T> handler)238 public <T> void addMessageHandler(Class<T> clazz, Partial<T> handler) 239 throws IllegalStateException { 240 doAddMessageHandler(clazz, handler); 241 } 242 243 244 @Override addMessageHandler(Class<T> clazz, Whole<T> handler)245 public <T> void addMessageHandler(Class<T> clazz, Whole<T> handler) 246 throws IllegalStateException { 247 doAddMessageHandler(clazz, handler); 248 } 249 250 251 @SuppressWarnings("unchecked") doAddMessageHandler(Class<?> target, MessageHandler listener)252 private void doAddMessageHandler(Class<?> target, MessageHandler listener) { 253 checkState(); 254 255 // Message handlers that require decoders may map to text messages, 256 // binary messages, both or neither. 257 258 // The frame processing code expects binary message handlers to 259 // accept ByteBuffer 260 261 // Use the POJO message handler wrappers as they are designed to wrap 262 // arbitrary objects with MessageHandlers and can wrap MessageHandlers 263 // just as easily. 264 265 Set<MessageHandlerResult> mhResults = Util.getMessageHandlers(target, listener, 266 endpointConfig, this); 267 268 for (MessageHandlerResult mhResult : mhResults) { 269 switch (mhResult.getType()) { 270 case TEXT: { 271 if (textMessageHandler != null) { 272 throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerText")); 273 } 274 textMessageHandler = mhResult.getHandler(); 275 break; 276 } 277 case BINARY: { 278 if (binaryMessageHandler != null) { 279 throw new IllegalStateException( 280 sm.getString("wsSession.duplicateHandlerBinary")); 281 } 282 binaryMessageHandler = mhResult.getHandler(); 283 break; 284 } 285 case PONG: { 286 if (pongMessageHandler != null) { 287 throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerPong")); 288 } 289 MessageHandler handler = mhResult.getHandler(); 290 if (handler instanceof MessageHandler.Whole<?>) { 291 pongMessageHandler = (MessageHandler.Whole<PongMessage>) handler; 292 } else { 293 throw new IllegalStateException( 294 sm.getString("wsSession.invalidHandlerTypePong")); 295 } 296 297 break; 298 } 299 default: { 300 throw new IllegalArgumentException( 301 sm.getString("wsSession.unknownHandlerType", listener, mhResult.getType())); 302 } 303 } 304 } 305 } 306 307 308 @Override getMessageHandlers()309 public Set<MessageHandler> getMessageHandlers() { 310 checkState(); 311 Set<MessageHandler> result = new HashSet<>(); 312 if (binaryMessageHandler != null) { 313 result.add(binaryMessageHandler); 314 } 315 if (textMessageHandler != null) { 316 result.add(textMessageHandler); 317 } 318 if (pongMessageHandler != null) { 319 result.add(pongMessageHandler); 320 } 321 return result; 322 } 323 324 325 @Override removeMessageHandler(MessageHandler listener)326 public void removeMessageHandler(MessageHandler listener) { 327 checkState(); 328 if (listener == null) { 329 return; 330 } 331 332 MessageHandler wrapped = null; 333 334 if (listener instanceof WrappedMessageHandler) { 335 wrapped = ((WrappedMessageHandler) listener).getWrappedHandler(); 336 } 337 338 if (wrapped == null) { 339 wrapped = listener; 340 } 341 342 boolean removed = false; 343 if (wrapped.equals(textMessageHandler) || listener.equals(textMessageHandler)) { 344 textMessageHandler = null; 345 removed = true; 346 } 347 348 if (wrapped.equals(binaryMessageHandler) || listener.equals(binaryMessageHandler)) { 349 binaryMessageHandler = null; 350 removed = true; 351 } 352 353 if (wrapped.equals(pongMessageHandler) || listener.equals(pongMessageHandler)) { 354 pongMessageHandler = null; 355 removed = true; 356 } 357 358 if (!removed) { 359 // ISE for now. Could swallow this silently / log this if the ISE 360 // becomes a problem 361 throw new IllegalStateException( 362 sm.getString("wsSession.removeHandlerFailed", listener)); 363 } 364 } 365 366 367 @Override getProtocolVersion()368 public String getProtocolVersion() { 369 checkState(); 370 return Constants.WS_VERSION_HEADER_VALUE; 371 } 372 373 374 @Override getNegotiatedSubprotocol()375 public String getNegotiatedSubprotocol() { 376 checkState(); 377 return subProtocol; 378 } 379 380 381 @Override getNegotiatedExtensions()382 public List<Extension> getNegotiatedExtensions() { 383 checkState(); 384 return negotiatedExtensions; 385 } 386 387 388 @Override isSecure()389 public boolean isSecure() { 390 checkState(); 391 return secure; 392 } 393 394 395 @Override isOpen()396 public boolean isOpen() { 397 return state == State.OPEN; 398 } 399 400 401 @Override getMaxIdleTimeout()402 public long getMaxIdleTimeout() { 403 checkState(); 404 return maxIdleTimeout; 405 } 406 407 408 @Override setMaxIdleTimeout(long timeout)409 public void setMaxIdleTimeout(long timeout) { 410 checkState(); 411 this.maxIdleTimeout = timeout; 412 } 413 414 415 @Override setMaxBinaryMessageBufferSize(int max)416 public void setMaxBinaryMessageBufferSize(int max) { 417 checkState(); 418 this.maxBinaryMessageBufferSize = max; 419 } 420 421 422 @Override getMaxBinaryMessageBufferSize()423 public int getMaxBinaryMessageBufferSize() { 424 checkState(); 425 return maxBinaryMessageBufferSize; 426 } 427 428 429 @Override setMaxTextMessageBufferSize(int max)430 public void setMaxTextMessageBufferSize(int max) { 431 checkState(); 432 this.maxTextMessageBufferSize = max; 433 } 434 435 436 @Override getMaxTextMessageBufferSize()437 public int getMaxTextMessageBufferSize() { 438 checkState(); 439 return maxTextMessageBufferSize; 440 } 441 442 443 @Override getOpenSessions()444 public Set<Session> getOpenSessions() { 445 checkState(); 446 return webSocketContainer.getOpenSessions(localEndpoint); 447 } 448 449 450 @Override getAsyncRemote()451 public RemoteEndpoint.Async getAsyncRemote() { 452 checkState(); 453 return remoteEndpointAsync; 454 } 455 456 457 @Override getBasicRemote()458 public RemoteEndpoint.Basic getBasicRemote() { 459 checkState(); 460 return remoteEndpointBasic; 461 } 462 463 464 @Override close()465 public void close() throws IOException { 466 close(new CloseReason(CloseCodes.NORMAL_CLOSURE, "")); 467 } 468 469 470 @Override close(CloseReason closeReason)471 public void close(CloseReason closeReason) throws IOException { 472 doClose(closeReason, closeReason); 473 } 474 475 476 /** 477 * WebSocket 1.0. Section 2.1.5. 478 * Need internal close method as spec requires that the local endpoint 479 * receives a 1006 on timeout. 480 * 481 * @param closeReasonMessage The close reason to pass to the remote endpoint 482 * @param closeReasonLocal The close reason to pass to the local endpoint 483 */ doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal)484 public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal) { 485 // Double-checked locking. OK because state is volatile 486 if (state != State.OPEN) { 487 return; 488 } 489 490 synchronized (stateLock) { 491 if (state != State.OPEN) { 492 return; 493 } 494 495 if (log.isDebugEnabled()) { 496 log.debug(sm.getString("wsSession.doClose", id)); 497 } 498 try { 499 wsRemoteEndpoint.setBatchingAllowed(false); 500 } catch (IOException e) { 501 log.warn(sm.getString("wsSession.flushFailOnClose"), e); 502 fireEndpointOnError(e); 503 } 504 505 state = State.OUTPUT_CLOSED; 506 507 sendCloseMessage(closeReasonMessage); 508 fireEndpointOnClose(closeReasonLocal); 509 } 510 511 IOException ioe = new IOException(sm.getString("wsSession.messageFailed")); 512 SendResult sr = new SendResult(ioe); 513 for (FutureToSendHandler f2sh : futures.keySet()) { 514 f2sh.onResult(sr); 515 } 516 } 517 518 519 /** 520 * Called when a close message is received. Should only ever happen once. 521 * Also called after a protocol error when the ProtocolHandler needs to 522 * force the closing of the connection. 523 * 524 * @param closeReason The reason contained within the received close 525 * message. 526 */ onClose(CloseReason closeReason)527 public void onClose(CloseReason closeReason) { 528 529 synchronized (stateLock) { 530 if (state != State.CLOSED) { 531 try { 532 wsRemoteEndpoint.setBatchingAllowed(false); 533 } catch (IOException e) { 534 log.warn(sm.getString("wsSession.flushFailOnClose"), e); 535 fireEndpointOnError(e); 536 } 537 if (state == State.OPEN) { 538 state = State.OUTPUT_CLOSED; 539 sendCloseMessage(closeReason); 540 fireEndpointOnClose(closeReason); 541 } 542 state = State.CLOSED; 543 544 // Close the socket 545 wsRemoteEndpoint.close(); 546 } 547 } 548 } 549 550 onClose()551 public void onClose() { 552 553 synchronized (stateLock) { 554 if (state != State.CLOSED) { 555 try { 556 wsRemoteEndpoint.setBatchingAllowed(false); 557 } catch (IOException e) { 558 log.warn(sm.getString("wsSession.flushFailOnClose"), e); 559 fireEndpointOnError(e); 560 } 561 if (state == State.OPEN) { 562 state = State.OUTPUT_CLOSED; 563 fireEndpointOnClose(new CloseReason( 564 CloseReason.CloseCodes.NORMAL_CLOSURE, "")); 565 } 566 state = State.CLOSED; 567 568 // Close the socket 569 wsRemoteEndpoint.close(); 570 } 571 } 572 } 573 574 fireEndpointOnClose(CloseReason closeReason)575 private void fireEndpointOnClose(CloseReason closeReason) { 576 577 // Fire the onClose event 578 Throwable throwable = null; 579 InstanceManager instanceManager = webSocketContainer.getInstanceManager(); 580 if (instanceManager == null) { 581 instanceManager = InstanceManagerBindings.get(applicationClassLoader); 582 } 583 Thread t = Thread.currentThread(); 584 ClassLoader cl = t.getContextClassLoader(); 585 t.setContextClassLoader(applicationClassLoader); 586 try { 587 localEndpoint.onClose(this, closeReason); 588 } catch (Throwable t1) { 589 ExceptionUtils.handleThrowable(t1); 590 throwable = t1; 591 } finally { 592 if (instanceManager != null) { 593 try { 594 instanceManager.destroyInstance(localEndpoint); 595 } catch (Throwable t2) { 596 ExceptionUtils.handleThrowable(t2); 597 if (throwable == null) { 598 throwable = t2; 599 } 600 } 601 } 602 t.setContextClassLoader(cl); 603 } 604 605 if (throwable != null) { 606 fireEndpointOnError(throwable); 607 } 608 } 609 610 fireEndpointOnError(Throwable throwable)611 private void fireEndpointOnError(Throwable throwable) { 612 613 // Fire the onError event 614 Thread t = Thread.currentThread(); 615 ClassLoader cl = t.getContextClassLoader(); 616 t.setContextClassLoader(applicationClassLoader); 617 try { 618 localEndpoint.onError(this, throwable); 619 } finally { 620 t.setContextClassLoader(cl); 621 } 622 } 623 624 sendCloseMessage(CloseReason closeReason)625 private void sendCloseMessage(CloseReason closeReason) { 626 // 125 is maximum size for the payload of a control message 627 ByteBuffer msg = ByteBuffer.allocate(125); 628 CloseCode closeCode = closeReason.getCloseCode(); 629 // CLOSED_ABNORMALLY should not be put on the wire 630 if (closeCode == CloseCodes.CLOSED_ABNORMALLY) { 631 // PROTOCOL_ERROR is probably better than GOING_AWAY here 632 msg.putShort((short) CloseCodes.PROTOCOL_ERROR.getCode()); 633 } else { 634 msg.putShort((short) closeCode.getCode()); 635 } 636 637 String reason = closeReason.getReasonPhrase(); 638 if (reason != null && reason.length() > 0) { 639 appendCloseReasonWithTruncation(msg, reason); 640 } 641 msg.flip(); 642 try { 643 wsRemoteEndpoint.sendMessageBlock(Constants.OPCODE_CLOSE, msg, true); 644 } catch (IOException | WritePendingException e) { 645 // Failed to send close message. Close the socket and let the caller 646 // deal with the Exception 647 if (log.isDebugEnabled()) { 648 log.debug(sm.getString("wsSession.sendCloseFail", id), e); 649 } 650 wsRemoteEndpoint.close(); 651 // Failure to send a close message is not unexpected in the case of 652 // an abnormal closure (usually triggered by a failure to read/write 653 // from/to the client. In this case do not trigger the endpoint's 654 // error handling 655 if (closeCode != CloseCodes.CLOSED_ABNORMALLY) { 656 localEndpoint.onError(this, e); 657 } 658 } finally { 659 webSocketContainer.unregisterSession(localEndpoint, this); 660 } 661 } 662 663 664 /** 665 * Use protected so unit tests can access this method directly. 666 * @param msg The message 667 * @param reason The reason 668 */ appendCloseReasonWithTruncation(ByteBuffer msg, String reason)669 protected static void appendCloseReasonWithTruncation(ByteBuffer msg, String reason) { 670 // Once the close code has been added there are a maximum of 123 bytes 671 // left for the reason phrase. If it is truncated then care needs to be 672 // taken to ensure the bytes are not truncated in the middle of a 673 // multi-byte UTF-8 character. 674 byte[] reasonBytes = reason.getBytes(StandardCharsets.UTF_8); 675 676 if (reasonBytes.length <= 123) { 677 // No need to truncate 678 msg.put(reasonBytes); 679 } else { 680 // Need to truncate 681 int remaining = 123 - ELLIPSIS_BYTES_LEN; 682 int pos = 0; 683 byte[] bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8); 684 while (remaining >= bytesNext.length) { 685 msg.put(bytesNext); 686 remaining -= bytesNext.length; 687 pos++; 688 bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8); 689 } 690 msg.put(ELLIPSIS_BYTES); 691 } 692 } 693 694 695 /** 696 * Make the session aware of a {@link FutureToSendHandler} that will need to 697 * be forcibly closed if the session closes before the 698 * {@link FutureToSendHandler} completes. 699 * @param f2sh The handler 700 */ registerFuture(FutureToSendHandler f2sh)701 protected void registerFuture(FutureToSendHandler f2sh) { 702 // Ideally, this code should sync on stateLock so that the correct 703 // action is taken based on the current state of the connection. 704 // However, a sync on stateLock can't be used here as it will create the 705 // possibility of a dead-lock. See BZ 61183. 706 // Therefore, a slightly less efficient approach is used. 707 708 // Always register the future. 709 futures.put(f2sh, f2sh); 710 711 if (state == State.OPEN) { 712 // The session is open. The future has been registered with the open 713 // session. Normal processing continues. 714 return; 715 } 716 717 // The session is closed. The future may or may not have been registered 718 // in time for it to be processed during session closure. 719 720 if (f2sh.isDone()) { 721 // The future has completed. It is not known if the future was 722 // completed normally by the I/O layer or in error by doClose(). It 723 // doesn't matter which. There is nothing more to do here. 724 return; 725 } 726 727 // The session is closed. The Future had not completed when last checked. 728 // There is a small timing window that means the Future may have been 729 // completed since the last check. There is also the possibility that 730 // the Future was not registered in time to be cleaned up during session 731 // close. 732 // Attempt to complete the Future with an error result as this ensures 733 // that the Future completes and any client code waiting on it does not 734 // hang. It is slightly inefficient since the Future may have been 735 // completed in another thread or another thread may be about to 736 // complete the Future but knowing if this is the case requires the sync 737 // on stateLock (see above). 738 // Note: If multiple attempts are made to complete the Future, the 739 // second and subsequent attempts are ignored. 740 741 IOException ioe = new IOException(sm.getString("wsSession.messageFailed")); 742 SendResult sr = new SendResult(ioe); 743 f2sh.onResult(sr); 744 } 745 746 747 /** 748 * Remove a {@link FutureToSendHandler} from the set of tracked instances. 749 * @param f2sh The handler 750 */ unregisterFuture(FutureToSendHandler f2sh)751 protected void unregisterFuture(FutureToSendHandler f2sh) { 752 futures.remove(f2sh); 753 } 754 755 756 @Override getRequestURI()757 public URI getRequestURI() { 758 checkState(); 759 return requestUri; 760 } 761 762 763 @Override getRequestParameterMap()764 public Map<String, List<String>> getRequestParameterMap() { 765 checkState(); 766 return requestParameterMap; 767 } 768 769 770 @Override getQueryString()771 public String getQueryString() { 772 checkState(); 773 return queryString; 774 } 775 776 777 @Override getUserPrincipal()778 public Principal getUserPrincipal() { 779 checkState(); 780 return userPrincipal; 781 } 782 783 784 @Override getPathParameters()785 public Map<String, String> getPathParameters() { 786 checkState(); 787 return pathParameters; 788 } 789 790 791 @Override getId()792 public String getId() { 793 return id; 794 } 795 796 797 @Override getUserProperties()798 public Map<String, Object> getUserProperties() { 799 checkState(); 800 return userProperties; 801 } 802 803 getLocal()804 public Endpoint getLocal() { 805 return localEndpoint; 806 } 807 808 getHttpSessionId()809 public String getHttpSessionId() { 810 return httpSessionId; 811 } 812 813 private ByteBuffer rawFragments; 814 processFrame(ByteBuffer buf, byte opCode, boolean last)815 public void processFrame(ByteBuffer buf, byte opCode, boolean last) 816 throws IOException 817 { 818 if (state == State.CLOSED) { 819 return; 820 } 821 822 if (opCode == Constants.OPCODE_CONTINUATION) { 823 opCode = startOpCode; 824 825 if (rawFragments != null && rawFragments.position() > 0) { 826 rawFragments.put(buf); 827 rawFragments.flip(); 828 buf = rawFragments; 829 } 830 } else { 831 if (!last && (opCode == Constants.OPCODE_BINARY || 832 opCode == Constants.OPCODE_TEXT)) { 833 startOpCode = opCode; 834 835 if (rawFragments != null) { 836 rawFragments.clear(); 837 } 838 } 839 } 840 841 if (last) { 842 startOpCode = Constants.OPCODE_CONTINUATION; 843 } 844 845 if (opCode == Constants.OPCODE_PONG) { 846 if (pongMessageHandler != null) { 847 final ByteBuffer b = buf; 848 849 PongMessage pongMessage = new PongMessage() { 850 @Override 851 public ByteBuffer getApplicationData() { 852 return b; 853 } 854 }; 855 856 pongMessageHandler.onMessage(pongMessage); 857 } 858 } 859 860 if (opCode == Constants.OPCODE_CLOSE) { 861 CloseReason closeReason; 862 863 if (buf.remaining() >= 2) { 864 short closeCode = buf.order(ByteOrder.BIG_ENDIAN).getShort(); 865 866 closeReason = new CloseReason( 867 CloseReason.CloseCodes.getCloseCode(closeCode), 868 buf.asCharBuffer().toString()); 869 } else { 870 closeReason = new CloseReason( 871 CloseReason.CloseCodes.NORMAL_CLOSURE, ""); 872 } 873 874 onClose(closeReason); 875 } 876 877 if (opCode == Constants.OPCODE_BINARY) { 878 onMessage(buf, last); 879 } 880 881 if (opCode == Constants.OPCODE_TEXT) { 882 if (messageBufferText.position() == 0 && maxTextMessageBufferSize != messageBufferText.capacity()) { 883 messageBufferText = CharBuffer.allocate(maxTextMessageBufferSize); 884 } 885 886 CoderResult cr = utf8DecoderMessage.decode(buf, messageBufferText, last); 887 if (cr.isError()) { 888 throw new WsIOException(new CloseReason( 889 CloseCodes.NOT_CONSISTENT, 890 sm.getString("wsFrame.invalidUtf8"))); 891 } else if (cr.isOverflow()) { 892 // Ran out of space in text buffer - flush it 893 if (hasTextPartial()) { 894 do { 895 onMessage(messageBufferText, false); 896 897 cr = utf8DecoderMessage.decode(buf, messageBufferText, last); 898 } while (cr.isOverflow()); 899 } else { 900 throw new WsIOException(new CloseReason( 901 CloseCodes.TOO_BIG, 902 sm.getString("wsFrame.textMessageTooBig"))); 903 } 904 } else if (cr.isUnderflow() && !last) { 905 updateRawFragments(buf, last); 906 907 if (hasTextPartial()) { 908 onMessage(messageBufferText, false); 909 } 910 911 return; 912 } 913 914 if (last) { 915 utf8DecoderMessage.reset(); 916 } 917 918 updateRawFragments(buf, last); 919 920 onMessage(messageBufferText, last); 921 } 922 } 923 924 hasTextPartial()925 private boolean hasTextPartial() { 926 return textMessageHandler instanceof MessageHandler.Partial<?>; 927 } 928 929 onMessage(CharBuffer buf, boolean last)930 private void onMessage(CharBuffer buf, boolean last) throws IOException { 931 buf.flip(); 932 try { 933 onMessage(buf.toString(), last); 934 } catch (Throwable t) { 935 handleThrowableOnSend(t); 936 } finally { 937 buf.clear(); 938 } 939 } 940 941 updateRawFragments(ByteBuffer buf, boolean last)942 private void updateRawFragments(ByteBuffer buf, boolean last) { 943 if (!last && buf.remaining() > 0) { 944 if (buf == rawFragments) { 945 buf.compact(); 946 } else { 947 if (rawFragments == null || (rawFragments.position() == 0 && maxTextMessageBufferSize != rawFragments.capacity())) { 948 rawFragments = ByteBuffer.allocateDirect(maxTextMessageBufferSize); 949 } 950 rawFragments.put(buf); 951 } 952 } else { 953 if (rawFragments != null) { 954 rawFragments.clear(); 955 } 956 } 957 } 958 959 960 @SuppressWarnings("unchecked") onMessage(String text, boolean last)961 public void onMessage(String text, boolean last) { 962 if (hasTextPartial()) { 963 ((MessageHandler.Partial<String>) textMessageHandler).onMessage(text, last); 964 } else { 965 // Caller ensures last == true if this branch is used 966 ((MessageHandler.Whole<String>) textMessageHandler).onMessage(text); 967 } 968 } 969 970 971 @SuppressWarnings("unchecked") onMessage(ByteBuffer buf, boolean last)972 public void onMessage(ByteBuffer buf, boolean last) 973 throws IOException 974 { 975 if (binaryMessageHandler instanceof MessageHandler.Partial<?>) { 976 ((MessageHandler.Partial<ByteBuffer>) binaryMessageHandler).onMessage(buf, last); 977 } else { 978 if (last && (binaryBuffer == null || binaryBuffer.position() == 0)) { 979 ((MessageHandler.Whole<ByteBuffer>) binaryMessageHandler).onMessage(buf); 980 return; 981 } 982 983 if (binaryBuffer == null || 984 (binaryBuffer.position() == 0 && binaryBuffer.capacity() != maxBinaryMessageBufferSize)) 985 { 986 binaryBuffer = ByteBuffer.allocateDirect(maxBinaryMessageBufferSize); 987 } 988 989 if (binaryBuffer.remaining() < buf.remaining()) { 990 throw new WsIOException(new CloseReason( 991 CloseCodes.TOO_BIG, 992 sm.getString("wsFrame.textMessageTooBig"))); 993 } 994 995 binaryBuffer.put(buf); 996 997 if (last) { 998 binaryBuffer.flip(); 999 try { 1000 ((MessageHandler.Whole<ByteBuffer>) binaryMessageHandler).onMessage(binaryBuffer); 1001 } finally { 1002 binaryBuffer.clear(); 1003 } 1004 } 1005 } 1006 } 1007 1008 handleThrowableOnSend(Throwable t)1009 private void handleThrowableOnSend(Throwable t) throws WsIOException { 1010 ExceptionUtils.handleThrowable(t); 1011 getLocal().onError(this, t); 1012 CloseReason cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, 1013 sm.getString("wsFrame.ioeTriggeredClose")); 1014 throw new WsIOException(cr); 1015 } 1016 1017 getTextMessageHandler()1018 protected MessageHandler getTextMessageHandler() { 1019 return textMessageHandler; 1020 } 1021 1022 getBinaryMessageHandler()1023 protected MessageHandler getBinaryMessageHandler() { 1024 return binaryMessageHandler; 1025 } 1026 1027 getPongMessageHandler()1028 protected MessageHandler.Whole<PongMessage> getPongMessageHandler() { 1029 return pongMessageHandler; 1030 } 1031 1032 updateLastActive()1033 protected void updateLastActive() { 1034 lastActive = System.currentTimeMillis(); 1035 } 1036 1037 checkExpiration()1038 protected void checkExpiration() { 1039 long timeout = maxIdleTimeout; 1040 if (timeout < 1) { 1041 return; 1042 } 1043 1044 if (System.currentTimeMillis() - lastActive > timeout) { 1045 String msg = sm.getString("wsSession.timeout", getId()); 1046 if (log.isDebugEnabled()) { 1047 log.debug(msg); 1048 } 1049 doClose(new CloseReason(CloseCodes.GOING_AWAY, msg), 1050 new CloseReason(CloseCodes.CLOSED_ABNORMALLY, msg)); 1051 } 1052 } 1053 1054 checkState()1055 private void checkState() { 1056 if (state == State.CLOSED) { 1057 /* 1058 * As per RFC 6455, a WebSocket connection is considered to be 1059 * closed once a peer has sent and received a WebSocket close frame. 1060 */ 1061 throw new IllegalStateException(sm.getString("wsSession.closed", id)); 1062 } 1063 } 1064 1065 private enum State { 1066 OPEN, 1067 OUTPUT_CLOSED, 1068 CLOSED 1069 } 1070 } 1071