xref: /unit/src/nxt_conn.h (revision 141:96a65c601420)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_CONN_H_INCLUDED_
8 #define _NXT_CONN_H_INCLUDED_
9 
10 
/*
 * Callback type that returns an nxt_msec_t value for connection "c".
 * "data" is an opaque uintptr_t argument; nxt_conn_state_t stores such a
 * callback in timer_value next to a timer_data field, which is presumably
 * what gets passed here -- confirm in the implementation.
 */
typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
12 
13 
/*
 * A table of handlers plus timer settings.  nxt_conn_t refers to such
 * tables through its const read_state and write_state pointers, and
 * nxt_conn_timer() takes one as its "state" argument.
 */
typedef struct {
    /* Work handlers; when each one is invoked is decided by the I/O code. */
    nxt_work_handler_t            ready_handler;
    nxt_work_handler_t            close_handler;
    nxt_work_handler_t            error_handler;

    /* Timer handler, timer value callback and the callback's argument. */
    nxt_work_handler_t            timer_handler;
    nxt_conn_timer_value_t        timer_value;
    uintptr_t                     timer_data;

    /* NOTE(review): presumably a boolean flag -- confirm in nxt_conn.c. */
    uint8_t                       timer_autoreset;
} nxt_conn_state_t;
25 
26 
/*
 * Write rate limiting state; nxt_conn_t points to one via its "rate"
 * field.  The exact meaning and units of the fields are defined by the
 * write code (see nxt_event_conn_write_limit() and
 * nxt_event_conn_write_delayed()), not by this header.
 */
typedef struct {
    double                        average;
    size_t                        limit;
    size_t                        limit_after;
    size_t                        max_limit;
    nxt_msec_t                    last;
} nxt_event_write_rate_t;
34 
35 
/*
 * Table of connection I/O methods.  A connection selects its
 * implementation through nxt_conn_t.io; the nxt_conn_read() and
 * nxt_conn_write() macros queue the "read" and "write" handlers
 * taken from this table.
 */
typedef struct {

    nxt_work_handler_t            connect;
    nxt_work_handler_t            accept;

    /*
     * The read() with a NULL c->read buffer waits for readiness of a
     * connection to avoid allocating a read buffer if the connection times
     * out or is closed with an error.  The kqueue-specific read() can also
     * detect the case when a client has not sent anything and has just
     * closed the connection without errors.  In the latter case the state's
     * close_handler is called.
     */
    nxt_work_handler_t            read;

    ssize_t                       (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);

    ssize_t                       (*recv)(nxt_conn_t *c, void *buf,
                                      size_t size, nxt_uint_t flags);

    /*
     * The write() is an interface to write a buffer chain with a given rate
     * limit.  It calls write_chunk() in a loop and handles the write event
     * timer.
     */
    nxt_work_handler_t            write;

    /*
     * The write_chunk() interface writes a buffer chain with a given limit
     * and toggles the write event.  SSL/TLS libraries' write_chunk()
     * interface buffers data and calls the library-specific send() interface
     * to write the buffered data eventually.
     */
    ssize_t                       (*write_chunk)(nxt_conn_t *c,
                                      nxt_buf_t *b, size_t limit);

    /*
     * The sendbuf() is an interface for OS-specific sendfile
     * implementations or simple writev().
     */
    ssize_t                       (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
                                      size_t limit);
    /*
     * The writev() is an interface to write several nxt_iobuf_t buffers.
     */
    ssize_t                       (*writev)(nxt_conn_t *c,
                                      nxt_iobuf_t *iob, nxt_uint_t niob);
    /*
     * The send() is an interface to write a single buffer.  SSL/TLS
     * libraries' send() interface also handles the libraries' errors.
     */
    ssize_t                       (*send)(nxt_conn_t *c, void *buf,
                                      size_t size);

    nxt_work_handler_t            shutdown;
} nxt_conn_io_t;
91 
92 
/*
 * The nxt_listen_event_t is separated from nxt_listen_socket_t
 * because nxt_listen_socket_t is one per process whilst each worker
 * thread uses its own nxt_listen_event_t.
 */
typedef struct {
    /* Must be the first field. */
    nxt_fd_event_t                socket;

    nxt_task_t                    task;

    /* NOTE(review): counters used by the accept code -- semantics not
     * visible in this header. */
    uint32_t                      ready;
    uint32_t                      batch;

    /* An accept() interface is cached to minimize memory accesses. */
    nxt_work_handler_t            accept;

    /* The per-process listen socket this event belongs to. */
    nxt_listen_socket_t           *listen;
    nxt_conn_t                    *next;   /* STUB */
    nxt_work_queue_t              *work_queue;

    nxt_timer_t                   timer;

    /* Queue linkage; the owning queue is not visible in this header. */
    nxt_queue_link_t              link;
} nxt_listen_event_t;
118 
119 
/*
 * Connection object: the fd event, separate read-side and write-side
 * buffer/state/work queue/timer groups, the I/O method table and
 * assorted bookkeeping fields.
 */
struct nxt_conn_s {
    /*
     * Must be the first field, since nxt_fd_event_t
     * and nxt_conn_t are used interchangeably.
     */
    nxt_fd_event_t                socket;

    /* Read side. */
    nxt_buf_t                     *read;
    const nxt_conn_state_t        *read_state;
    nxt_work_queue_t              *read_work_queue;
    nxt_timer_t                   read_timer;

    /* Write side; "rate" is the optional-looking rate limiting state. */
    nxt_buf_t                     *write;
    const nxt_conn_state_t        *write_state;
    nxt_work_queue_t              *write_work_queue;
    nxt_event_write_rate_t        *rate;
    nxt_timer_t                   write_timer;

    nxt_off_t                     sent;
    uint32_t                      max_chunk;
    uint32_t                      nbytes;

    /* I/O method table used by nxt_conn_read() and nxt_conn_write(). */
    nxt_conn_io_t                 *io;

    nxt_queue_t                   requests; /* of nxt_req_conn_link_t */

#if (NXT_SSLTLS || NXT_THREADS)
    /* SunC does not support "zero-sized struct/union". */

    /* The members share storage, so only one of them is usable at a time. */
    union {
#if (NXT_SSLTLS)
        void                      *ssltls;
#endif
#if (NXT_THREADS)
        nxt_thread_pool_t         *thread_pool;
#endif
    } u;

#endif

    nxt_mp_t                      *mem_pool;

    nxt_task_t                    task;
    nxt_log_t                     log;

    nxt_listen_event_t            *listen;
    nxt_sockaddr_t                *remote;
    nxt_sockaddr_t                *local;
    const char                    *action;

    /*
     * NOTE(review): the "N bit(s)" remarks below presumably give the
     * intended value width; the members are declared as whole uint8_t
     * objects, not as bit-fields.
     */
    uint8_t                       peek;
    uint8_t                       blocked;      /* 1 bit */
    uint8_t                       delayed;      /* 1 bit */

/* Values of the "sendfile" field; UNSET (3) has both low bits set. */
#define NXT_CONN_SENDFILE_OFF     0
#define NXT_CONN_SENDFILE_ON      1
#define NXT_CONN_SENDFILE_UNSET   3

    uint8_t                       sendfile;     /* 2 bits */
    /* Set by nxt_conn_tcp_nodelay_on() when setsockopt() returns NXT_OK. */
    uint8_t                       tcp_nodelay;  /* 1 bit */

    nxt_queue_link_t              link;
};
183 
184 
/* 32-bit request identifier, stored in nxt_req_conn_link_t.req_id. */
typedef uint32_t nxt_req_id_t;
186 
/*
 * Ties a request id to its connection and to two ports.  The object can
 * sit in two queues at once through its two separate link fields.
 * Created and removed via nxt_conn_request_add() and
 * nxt_conn_request_remove().
 */
typedef struct {
    nxt_req_id_t      req_id;
    nxt_conn_t        *conn;
    nxt_port_t        *app_port;
    nxt_port_t        *reply_port;

    nxt_queue_link_t  link;     /* for nxt_conn_t.requests */
    nxt_queue_link_t  app_link; /* for nxt_app_t.requests */
} nxt_req_conn_link_t;
196 
197 
/*
 * Prepares timer "ev" for use with connection "c": the timer gets the
 * default precision, the work queue "wq" and a pointer to the
 * connection's own log.  "ev" is evaluated three times, so pass an
 * expression without side effects.
 */
#define nxt_conn_timer_init(ev, c, wq)                                        \
    do {                                                                      \
        (ev)->precision = NXT_TIMER_DEFAULT_PRECISION;                        \
        (ev)->work_queue = (wq);                                              \
        (ev)->log = &((c)->log);                                              \
    } while (0)
204 
205 
/*
 * Maps a timer pointer "ev" to an nxt_conn_t by way of nxt_timer_data()
 * and the read_timer member.  NOTE(review): presumably "ev" must point to
 * the read_timer embedded in a connection -- confirm against
 * nxt_timer_data().
 */
#define nxt_read_timer_conn(ev)                                               \
    nxt_timer_data(ev, nxt_conn_t, read_timer)
208 
209 
/*
 * Maps a timer pointer "ev" to an nxt_conn_t by way of nxt_timer_data()
 * and the write_timer member.  NOTE(review): presumably "ev" must point
 * to the write_timer embedded in a connection -- confirm against
 * nxt_timer_data().
 */
#define nxt_write_timer_conn(ev)                                              \
    nxt_timer_data(ev, nxt_conn_t, write_timer)
212 
213 
/*
 * Private helper: requests TCP_NODELAY on the connection's socket and
 * stores in c->tcp_nodelay whether nxt_socket_setsockopt() returned NXT_OK.
 */
#define nxt_conn_tcp_nodelay_apply_(task, c)                                  \
    do {                                                                      \
        nxt_int_t  nodelay_rc_;                                               \
                                                                              \
        nodelay_rc_ = nxt_socket_setsockopt(task, (c)->socket.fd,             \
                                            IPPROTO_TCP, TCP_NODELAY, 1);     \
                                                                              \
        (c)->tcp_nodelay = (nodelay_rc_ == NXT_OK);                           \
    } while (0)


#if (NXT_HAVE_UNIX_DOMAIN)

/*
 * Turns TCP_NODELAY on for connection "c".  Connections whose remote
 * address family is AF_UNIX are skipped and their tcp_nodelay flag is
 * left untouched.
 */
#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    do {                                                                      \
        if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) {                   \
            nxt_conn_tcp_nodelay_apply_(task, c);                             \
        }                                                                     \
    } while (0)


#else

/* Turns TCP_NODELAY on for connection "c" unconditionally. */
#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    nxt_conn_tcp_nodelay_apply_(task, c)

#endif
242 
243 
/*
 * Functions below with the (task, obj, data) parameter list have the
 * shape of a work handler; nxt_conn_sys_socket(), for example, is the
 * handler queued by the nxt_conn_connect() macro.
 */

/* Creation, shutdown and close. */
NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task);
void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);

/* Timer and work queue setup. */
NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
    const nxt_conn_state_t *state, nxt_timer_t *tev);
NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);

/* Outgoing connections. */
void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);

/* Listening and accepting. */
NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
    nxt_listen_socket_t *ls);
void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
    nxt_conn_t *c);
void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
    const char *accept_syscall, nxt_err_t err);

void nxt_conn_wait(nxt_conn_t *c);

/* Reading; the last two match the recvbuf and recv slots of nxt_conn_io_t. */
void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
    nxt_uint_t flags);

/*
 * Writing.
 *
 * NOTE(review): nxt_conn_io_sendbuf(), nxt_conn_io_writev() and
 * nxt_conn_io_send() take (nxt_task_t *, nxt_sendbuf_t *, ...), which
 * differs from the sendbuf, writev and send slots of nxt_conn_io_t that
 * take an nxt_conn_t * first.  They therefore cannot be stored in those
 * slots as is; the nxt_event_conn_io_* functions further below do have
 * the slot signatures.
 */
void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
    nxt_iobuf_t *iob, nxt_uint_t niob);
ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
    size_t size);

/* Write helpers that still carry the nxt_event_conn_ prefix. */
size_t nxt_event_conn_write_limit(nxt_conn_t *c);
nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
    nxt_conn_t *c, size_t sent);
ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b,
    size_t limit);
ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
    nxt_uint_t niob);
ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);

NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
    nxt_conn_t *c);
291 
292 
/*
 * Queues nxt_conn_sys_socket() on the engine's socket work queue for
 * connection "c", passing the connection as the handler's object together
 * with the task and data stored in c->socket.
 *
 * Both parameters are parenthesized so that any pointer-valued expression
 * (for example "conns + i" or "cond ? a : b") may be passed.  "c" is
 * evaluated three times: do not pass an expression with side effects.
 */
#define nxt_conn_connect(engine, c)                                           \
    nxt_work_queue_add(&(engine)->socket_work_queue, nxt_conn_sys_socket,     \
                       (c)->socket.task, (c), (c)->socket.data)
296 
297 
/*
 * Schedules a read on connection "c": points the connection's fd event at
 * the engine's read work queue and queues the connection's io->read
 * handler there, with the task and data stored in c->socket.
 *
 * Each argument is evaluated exactly once.  The temporaries have
 * macro-specific names so that they cannot capture the caller's own
 * variables; a plain local "e" would turn nxt_conn_read(e, c) into the
 * self-initialization "nxt_event_engine_t *e = e;".
 */
#define nxt_conn_read(engine, c)                                              \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_read_engine_ = (engine);                \
        nxt_conn_t          *nxt_conn_read_conn_ = (c);                       \
                                                                              \
        nxt_conn_read_conn_->socket.read_work_queue =                         \
            &nxt_conn_read_engine_->read_work_queue;                          \
                                                                              \
        nxt_work_queue_add(&nxt_conn_read_engine_->read_work_queue,           \
                           nxt_conn_read_conn_->io->read,                     \
                           nxt_conn_read_conn_->socket.task,                  \
                           nxt_conn_read_conn_,                               \
                           nxt_conn_read_conn_->socket.data);                 \
    } while (0)
307 
308 
/*
 * Schedules a write on connection "c": points the connection's fd event
 * at the engine's write work queue and queues the connection's io->write
 * handler there, with the task and data stored in c->socket.
 *
 * Each argument is evaluated exactly once.  The temporaries have
 * macro-specific names so that they cannot capture the caller's own
 * variables; a plain local "engine" would turn the common call
 * nxt_conn_write(engine, c) into the self-initialization
 * "nxt_event_engine_t *engine = engine;".
 */
#define nxt_conn_write(e, c)                                                  \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_write_engine_ = (e);                    \
        nxt_conn_t          *nxt_conn_write_conn_ = (c);                      \
                                                                              \
        nxt_conn_write_conn_->socket.write_work_queue =                       \
            &nxt_conn_write_engine_->write_work_queue;                        \
                                                                              \
        nxt_work_queue_add(&nxt_conn_write_engine_->write_work_queue,         \
                           nxt_conn_write_conn_->io->write,                   \
                           nxt_conn_write_conn_->socket.task,                 \
                           nxt_conn_write_conn_,                              \
                           nxt_conn_write_conn_->socket.data);                \
    } while (0)
318 
319 
/*
 * An nxt_conn_io_t method table defined in another translation unit.
 * NOTE(review): presumably the generic Unix implementation -- confirm
 * where it is defined.
 */
extern nxt_conn_io_t             nxt_unix_conn_io;
321 
322 
/*
 * State of a proxy between a client connection and a peer connection:
 * the two connections, a buffer and buffer size for each side, a set of
 * timeouts and a completion handler.  Created by nxt_conn_proxy_create()
 * and run by nxt_conn_proxy().
 */
typedef struct {
    /*
     * Client and peer connections are not embedded because already
     * existent connections can be switched to the event connection proxy.
     */
    nxt_conn_t                   *client;
    nxt_conn_t                   *peer;
    nxt_buf_t                    *client_buffer;
    nxt_buf_t                    *peer_buffer;

    size_t                       client_buffer_size;
    size_t                       peer_buffer_size;

    /* Timeouts, all of type nxt_msec_t. */
    nxt_msec_t                   client_wait_timeout;
    nxt_msec_t                   connect_timeout;
    nxt_msec_t                   reconnect_timeout;
    nxt_msec_t                   peer_wait_timeout;
    nxt_msec_t                   client_write_timeout;
    nxt_msec_t                   peer_write_timeout;

    /*
     * NOTE(review): the "N bit(s)" remarks presumably give the intended
     * value width; the members are whole uint8_t objects, not bit-fields.
     */
    uint8_t                      connected;  /* 1 bit */
    uint8_t                      delayed;    /* 1 bit */
    uint8_t                      retries;    /* 8 bits */
    uint8_t                      retain;     /* 2 bits */

    nxt_work_handler_t           completion_handler;
} nxt_conn_proxy_t;
350 
351 
/*
 * Proxy API: nxt_conn_proxy_create() returns an nxt_conn_proxy_t for
 * connection "c", and nxt_conn_proxy() operates on such an object.
 * NOTE(review): whether "c" becomes the client side, and how allocation
 * failure is reported, must be checked in the implementation.
 */
NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
354 
355 
/*
 * STUB: aliases that map nxt_event_conn_* names onto their nxt_conn_*
 * counterparts, so code written against either spelling compiles.
 * NOTE(review): presumably transitional -- remove once no user of the
 * nxt_event_conn_* names remains.
 */
#define nxt_event_conn_t         nxt_conn_t
#define nxt_event_conn_state_t   nxt_conn_state_t
#define nxt_event_conn_proxy_t   nxt_conn_proxy_t
#define nxt_event_conn_read      nxt_conn_read
#define nxt_event_conn_write     nxt_conn_write
#define nxt_event_conn_close     nxt_conn_close
363 
364 
/*
 * Request link management: nxt_conn_request_add() returns an
 * nxt_req_conn_link_t for connection "c" and request id "req_id";
 * nxt_conn_request_remove() takes the connection and such a link.
 * NOTE(review): ownership of the returned link and the failure return
 * value are not visible in this header.
 */
NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c,
    nxt_req_id_t req_id);
NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c,
    nxt_req_conn_link_t *rc);
369 
370 
371 #endif /* _NXT_CONN_H_INCLUDED_ */
372