1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17 package nginx.unit.websocket;
18 
19 import java.util.HashSet;
20 import java.util.Set;
21 
22 import org.apache.juli.logging.Log;
23 import org.apache.juli.logging.LogFactory;
24 import org.apache.tomcat.util.ExceptionUtils;
25 import org.apache.tomcat.util.res.StringManager;
26 
27 /**
28  * Provides a background processing mechanism that triggers roughly once a
29  * second. The class maintains a thread that only runs when there is at least
30  * one instance of {@link BackgroundProcess} registered.
31  */
32 public class BackgroundProcessManager {
33 
34     private final Log log =
35             LogFactory.getLog(BackgroundProcessManager.class);
36     private static final StringManager sm =
37             StringManager.getManager(BackgroundProcessManager.class);
38     private static final BackgroundProcessManager instance;
39 
40 
41     static {
42         instance = new BackgroundProcessManager();
43     }
44 
45 
getInstance()46     public static BackgroundProcessManager getInstance() {
47         return instance;
48     }
49 
50     private final Set<BackgroundProcess> processes = new HashSet<>();
51     private final Object processesLock = new Object();
52     private WsBackgroundThread wsBackgroundThread = null;
53 
BackgroundProcessManager()54     private BackgroundProcessManager() {
55         // Hide default constructor
56     }
57 
58 
register(BackgroundProcess process)59     public void register(BackgroundProcess process) {
60         synchronized (processesLock) {
61             if (processes.size() == 0) {
62                 wsBackgroundThread = new WsBackgroundThread(this);
63                 wsBackgroundThread.setContextClassLoader(
64                         this.getClass().getClassLoader());
65                 wsBackgroundThread.setDaemon(true);
66                 wsBackgroundThread.start();
67             }
68             processes.add(process);
69         }
70     }
71 
72 
unregister(BackgroundProcess process)73     public void unregister(BackgroundProcess process) {
74         synchronized (processesLock) {
75             processes.remove(process);
76             if (wsBackgroundThread != null && processes.size() == 0) {
77                 wsBackgroundThread.halt();
78                 wsBackgroundThread = null;
79             }
80         }
81     }
82 
83 
process()84     private void process() {
85         Set<BackgroundProcess> currentProcesses = new HashSet<>();
86         synchronized (processesLock) {
87             currentProcesses.addAll(processes);
88         }
89         for (BackgroundProcess process : currentProcesses) {
90             try {
91                 process.backgroundProcess();
92             } catch (Throwable t) {
93                 ExceptionUtils.handleThrowable(t);
94                 log.error(sm.getString(
95                         "backgroundProcessManager.processFailed"), t);
96             }
97         }
98     }
99 
100 
101     /*
102      * For unit testing.
103      */
getProcessCount()104     int getProcessCount() {
105         synchronized (processesLock) {
106             return processes.size();
107         }
108     }
109 
110 
shutdown()111     void shutdown() {
112         synchronized (processesLock) {
113             processes.clear();
114             if (wsBackgroundThread != null) {
115                 wsBackgroundThread.halt();
116                 wsBackgroundThread = null;
117             }
118         }
119     }
120 
121 
122     private static class WsBackgroundThread extends Thread {
123 
124         private final BackgroundProcessManager manager;
125         private volatile boolean running = true;
126 
WsBackgroundThread(BackgroundProcessManager manager)127         public WsBackgroundThread(BackgroundProcessManager manager) {
128             setName("WebSocket background processing");
129             this.manager = manager;
130         }
131 
132         @Override
run()133         public void run() {
134             while (running) {
135                 try {
136                     Thread.sleep(1000);
137                 } catch (InterruptedException e) {
138                     // Ignore
139                 }
140                 manager.process();
141             }
142         }
143 
halt()144         public void halt() {
145             setName("WebSocket background processing - stopping");
146             running = false;
147         }
148     }
149 }
150