1*1157Smax.romanov@nginx.com /* 2*1157Smax.romanov@nginx.com * Licensed to the Apache Software Foundation (ASF) under one or more 3*1157Smax.romanov@nginx.com * contributor license agreements. See the NOTICE file distributed with 4*1157Smax.romanov@nginx.com * this work for additional information regarding copyright ownership. 5*1157Smax.romanov@nginx.com * The ASF licenses this file to You under the Apache License, Version 2.0 6*1157Smax.romanov@nginx.com * (the "License"); you may not use this file except in compliance with 7*1157Smax.romanov@nginx.com * the License. You may obtain a copy of the License at 8*1157Smax.romanov@nginx.com * 9*1157Smax.romanov@nginx.com * http://www.apache.org/licenses/LICENSE-2.0 10*1157Smax.romanov@nginx.com * 11*1157Smax.romanov@nginx.com * Unless required by applicable law or agreed to in writing, software 12*1157Smax.romanov@nginx.com * distributed under the License is distributed on an "AS IS" BASIS, 13*1157Smax.romanov@nginx.com * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*1157Smax.romanov@nginx.com * See the License for the specific language governing permissions and 15*1157Smax.romanov@nginx.com * limitations under the License. 16*1157Smax.romanov@nginx.com */ 17*1157Smax.romanov@nginx.com package nginx.unit.websocket.server; 18*1157Smax.romanov@nginx.com 19*1157Smax.romanov@nginx.com import java.util.Comparator; 20*1157Smax.romanov@nginx.com import java.util.Set; 21*1157Smax.romanov@nginx.com import java.util.concurrent.ConcurrentSkipListSet; 22*1157Smax.romanov@nginx.com import java.util.concurrent.atomic.AtomicInteger; 23*1157Smax.romanov@nginx.com 24*1157Smax.romanov@nginx.com import nginx.unit.websocket.BackgroundProcess; 25*1157Smax.romanov@nginx.com import nginx.unit.websocket.BackgroundProcessManager; 26*1157Smax.romanov@nginx.com 27*1157Smax.romanov@nginx.com /** 28*1157Smax.romanov@nginx.com * Provides timeouts for asynchronous web socket writes. On the server side we 29*1157Smax.romanov@nginx.com * only have access to {@link javax.servlet.ServletOutputStream} and 30*1157Smax.romanov@nginx.com * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout 31*1157Smax.romanov@nginx.com * for writes to the client. 32*1157Smax.romanov@nginx.com */ 33*1157Smax.romanov@nginx.com public class WsWriteTimeout implements BackgroundProcess { 34*1157Smax.romanov@nginx.com 35*1157Smax.romanov@nginx.com private final Set<WsRemoteEndpointImplServer> endpoints = 36*1157Smax.romanov@nginx.com new ConcurrentSkipListSet<>(new EndpointComparator()); 37*1157Smax.romanov@nginx.com private final AtomicInteger count = new AtomicInteger(0); 38*1157Smax.romanov@nginx.com private int backgroundProcessCount = 0; 39*1157Smax.romanov@nginx.com private volatile int processPeriod = 1; 40*1157Smax.romanov@nginx.com 41*1157Smax.romanov@nginx.com @Override backgroundProcess()42*1157Smax.romanov@nginx.com public void backgroundProcess() { 43*1157Smax.romanov@nginx.com // This method gets called once a second. 44*1157Smax.romanov@nginx.com backgroundProcessCount ++; 45*1157Smax.romanov@nginx.com 46*1157Smax.romanov@nginx.com if (backgroundProcessCount >= processPeriod) { 47*1157Smax.romanov@nginx.com backgroundProcessCount = 0; 48*1157Smax.romanov@nginx.com 49*1157Smax.romanov@nginx.com long now = System.currentTimeMillis(); 50*1157Smax.romanov@nginx.com for (WsRemoteEndpointImplServer endpoint : endpoints) { 51*1157Smax.romanov@nginx.com if (endpoint.getTimeoutExpiry() < now) { 52*1157Smax.romanov@nginx.com // Background thread, not the thread that triggered the 53*1157Smax.romanov@nginx.com // write so no need to use a dispatch 54*1157Smax.romanov@nginx.com endpoint.onTimeout(false); 55*1157Smax.romanov@nginx.com } else { 56*1157Smax.romanov@nginx.com // Endpoints are ordered by timeout expiry so if this point 57*1157Smax.romanov@nginx.com // is reached there is no need to check the remaining 58*1157Smax.romanov@nginx.com // endpoints 59*1157Smax.romanov@nginx.com break; 60*1157Smax.romanov@nginx.com } 61*1157Smax.romanov@nginx.com } 62*1157Smax.romanov@nginx.com } 63*1157Smax.romanov@nginx.com } 64*1157Smax.romanov@nginx.com 65*1157Smax.romanov@nginx.com 66*1157Smax.romanov@nginx.com @Override setProcessPeriod(int period)67*1157Smax.romanov@nginx.com public void setProcessPeriod(int period) { 68*1157Smax.romanov@nginx.com this.processPeriod = period; 69*1157Smax.romanov@nginx.com } 70*1157Smax.romanov@nginx.com 71*1157Smax.romanov@nginx.com 72*1157Smax.romanov@nginx.com /** 73*1157Smax.romanov@nginx.com * {@inheritDoc} 74*1157Smax.romanov@nginx.com * 75*1157Smax.romanov@nginx.com * The default value is 1 which means asynchronous write timeouts are 76*1157Smax.romanov@nginx.com * processed every 1 second. 77*1157Smax.romanov@nginx.com */ 78*1157Smax.romanov@nginx.com @Override getProcessPeriod()79*1157Smax.romanov@nginx.com public int getProcessPeriod() { 80*1157Smax.romanov@nginx.com return processPeriod; 81*1157Smax.romanov@nginx.com } 82*1157Smax.romanov@nginx.com 83*1157Smax.romanov@nginx.com register(WsRemoteEndpointImplServer endpoint)84*1157Smax.romanov@nginx.com public void register(WsRemoteEndpointImplServer endpoint) { 85*1157Smax.romanov@nginx.com boolean result = endpoints.add(endpoint); 86*1157Smax.romanov@nginx.com if (result) { 87*1157Smax.romanov@nginx.com int newCount = count.incrementAndGet(); 88*1157Smax.romanov@nginx.com if (newCount == 1) { 89*1157Smax.romanov@nginx.com BackgroundProcessManager.getInstance().register(this); 90*1157Smax.romanov@nginx.com } 91*1157Smax.romanov@nginx.com } 92*1157Smax.romanov@nginx.com } 93*1157Smax.romanov@nginx.com 94*1157Smax.romanov@nginx.com unregister(WsRemoteEndpointImplServer endpoint)95*1157Smax.romanov@nginx.com public void unregister(WsRemoteEndpointImplServer endpoint) { 96*1157Smax.romanov@nginx.com boolean result = endpoints.remove(endpoint); 97*1157Smax.romanov@nginx.com if (result) { 98*1157Smax.romanov@nginx.com int newCount = count.decrementAndGet(); 99*1157Smax.romanov@nginx.com if (newCount == 0) { 100*1157Smax.romanov@nginx.com BackgroundProcessManager.getInstance().unregister(this); 101*1157Smax.romanov@nginx.com } 102*1157Smax.romanov@nginx.com } 103*1157Smax.romanov@nginx.com } 104*1157Smax.romanov@nginx.com 105*1157Smax.romanov@nginx.com 106*1157Smax.romanov@nginx.com /** 107*1157Smax.romanov@nginx.com * Note: this comparator imposes orderings that are inconsistent with equals 108*1157Smax.romanov@nginx.com */ 109*1157Smax.romanov@nginx.com private static class EndpointComparator implements 110*1157Smax.romanov@nginx.com Comparator<WsRemoteEndpointImplServer> { 111*1157Smax.romanov@nginx.com 112*1157Smax.romanov@nginx.com @Override compare(WsRemoteEndpointImplServer o1, WsRemoteEndpointImplServer o2)113*1157Smax.romanov@nginx.com public int compare(WsRemoteEndpointImplServer o1, 114*1157Smax.romanov@nginx.com WsRemoteEndpointImplServer o2) { 115*1157Smax.romanov@nginx.com 116*1157Smax.romanov@nginx.com long t1 = o1.getTimeoutExpiry(); 117*1157Smax.romanov@nginx.com long t2 = o2.getTimeoutExpiry(); 118*1157Smax.romanov@nginx.com 119*1157Smax.romanov@nginx.com if (t1 < t2) { 120*1157Smax.romanov@nginx.com return -1; 121*1157Smax.romanov@nginx.com } else if (t1 == t2) { 122*1157Smax.romanov@nginx.com return 0; 123*1157Smax.romanov@nginx.com } else { 124*1157Smax.romanov@nginx.com return 1; 125*1157Smax.romanov@nginx.com } 126*1157Smax.romanov@nginx.com } 127*1157Smax.romanov@nginx.com } 128*1157Smax.romanov@nginx.com } 129