1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package nginx.unit.websocket.server;
18 
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.CompletionHandler;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.TimeUnit;
27 
28 import javax.websocket.SendHandler;
29 import javax.websocket.SendResult;
30 
31 import org.apache.juli.logging.Log;
32 import org.apache.juli.logging.LogFactory;
33 import org.apache.tomcat.util.res.StringManager;
34 import nginx.unit.websocket.Transformation;
35 import nginx.unit.websocket.WsRemoteEndpointImplBase;
36 
37 /**
38  * This is the server side {@link javax.websocket.RemoteEndpoint} implementation
39  * - i.e. what the server uses to send data to the client.
40  */
41 public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
42 
43     private static final StringManager sm =
44             StringManager.getManager(WsRemoteEndpointImplServer.class);
45     private final Log log = LogFactory.getLog(WsRemoteEndpointImplServer.class); // must not be static
46 
47     private volatile SendHandler handler = null;
48     private volatile ByteBuffer[] buffers = null;
49 
50     private volatile long timeoutExpiry = -1;
51     private volatile boolean close;
52 
WsRemoteEndpointImplServer( WsServerContainer serverContainer)53     public WsRemoteEndpointImplServer(
54             WsServerContainer serverContainer) {
55     }
56 
57 
58     @Override
isMasked()59     protected final boolean isMasked() {
60         return false;
61     }
62 
63     @Override
doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, ByteBuffer... buffers)64     protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
65             ByteBuffer... buffers) {
66     }
67 
68     @Override
doClose()69     protected void doClose() {
70         if (handler != null) {
71             // close() can be triggered by a wide range of scenarios. It is far
72             // simpler just to always use a dispatch than it is to try and track
73             // whether or not this method was called by the same thread that
74             // triggered the write
75             clearHandler(new EOFException(), true);
76         }
77     }
78 
79 
getTimeoutExpiry()80     protected long getTimeoutExpiry() {
81         return timeoutExpiry;
82     }
83 
84 
85     /*
86      * Currently this is only called from the background thread so we could just
87      * call clearHandler() with useDispatch == false but the method parameter
88      * was added in case other callers started to use this method to make sure
89      * that those callers think through what the correct value of useDispatch is
90      * for them.
91      */
onTimeout(boolean useDispatch)92     protected void onTimeout(boolean useDispatch) {
93         if (handler != null) {
94             clearHandler(new SocketTimeoutException(), useDispatch);
95         }
96         close();
97     }
98 
99 
100     @Override
setTransformation(Transformation transformation)101     protected void setTransformation(Transformation transformation) {
102         // Overridden purely so it is visible to other classes in this package
103         super.setTransformation(transformation);
104     }
105 
106 
107     /**
108      *
109      * @param t             The throwable associated with any error that
110      *                      occurred
111      * @param useDispatch   Should {@link SendHandler#onResult(SendResult)} be
112      *                      called from a new thread, keeping in mind the
113      *                      requirements of
114      *                      {@link javax.websocket.RemoteEndpoint.Async}
115      */
clearHandler(Throwable t, boolean useDispatch)116     private void clearHandler(Throwable t, boolean useDispatch) {
117         // Setting the result marks this (partial) message as
118         // complete which means the next one may be sent which
119         // could update the value of the handler. Therefore, keep a
120         // local copy before signalling the end of the (partial)
121         // message.
122         SendHandler sh = handler;
123         handler = null;
124         buffers = null;
125         if (sh != null) {
126             if (useDispatch) {
127                 OnResultRunnable r = new OnResultRunnable(sh, t);
128             } else {
129                 if (t == null) {
130                     sh.onResult(new SendResult());
131                 } else {
132                     sh.onResult(new SendResult(t));
133                 }
134             }
135         }
136     }
137 
138 
139     private static class OnResultRunnable implements Runnable {
140 
141         private final SendHandler sh;
142         private final Throwable t;
143 
OnResultRunnable(SendHandler sh, Throwable t)144         private OnResultRunnable(SendHandler sh, Throwable t) {
145             this.sh = sh;
146             this.t = t;
147         }
148 
149         @Override
run()150         public void run() {
151             if (t == null) {
152                 sh.onResult(new SendResult());
153             } else {
154                 sh.onResult(new SendResult(t));
155             }
156         }
157     }
158 }
159