xref: /unit/src/nxt_conn.h (revision 318:c2442f5e054d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_CONN_H_INCLUDED_
8 #define _NXT_CONN_H_INCLUDED_
9 
10 
11 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
12 
13 
14 typedef struct {
15     nxt_work_handler_t            ready_handler;
16     nxt_work_handler_t            close_handler;
17     nxt_work_handler_t            error_handler;
18 
19     nxt_work_handler_t            timer_handler;
20     nxt_conn_timer_value_t        timer_value;
21     uintptr_t                     timer_data;
22 
23     uint8_t                       timer_autoreset;
24 } nxt_conn_state_t;
25 
26 
27 typedef struct {
28     double                        average;
29     size_t                        limit;
30     size_t                        limit_after;
31     size_t                        max_limit;
32     nxt_msec_t                    last;
33 } nxt_event_write_rate_t;
34 
35 
36 typedef struct {
37 
38     nxt_work_handler_t            connect;
39     nxt_work_handler_t            accept;
40 
41     /*
42      * The read() with NULL c->read buffer waits readiness of a connection
43      * to avoid allocation of read buffer if the connection will time out
44      * or will be closed with error.  The kqueue-specific read() can also
45      * detect case if a client did not sent anything and has just closed the
46      * connection without errors.  In the latter case state's close_handler
47      * is called.
48      */
49     nxt_work_handler_t            read;
50 
51     ssize_t                       (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);
52 
53     ssize_t                       (*recv)(nxt_conn_t *c, void *buf,
54                                       size_t size, nxt_uint_t flags);
55 
56     /*
57      * The write() is an interface to write a buffer chain with a given rate
58      * limit.  It calls write_chunk() in a loop and handles write event timer.
59      */
60     nxt_work_handler_t            write;
61 
62     /*
63      * The write_chunk() interface writes a buffer chain with a given limit
64      * and toggles write event.  SSL/TLS libraries' write_chunk() interface
65      * buffers data and calls the library specific send() interface to write
66      * the buffered data eventually.
67      */
68     ssize_t                       (*write_chunk)(nxt_conn_t *c,
69                                       nxt_buf_t *b, size_t limit);
70 
71     /*
72      * The sendbuf() is an interface for OS-specific sendfile
73      * implementations or simple writev().
74      */
75     ssize_t                       (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
76                                       size_t limit);
77     /*
78      * The writev() is an interface to write several nxt_iobuf_t buffers.
79      */
80     ssize_t                       (*writev)(nxt_conn_t *c,
81                                       nxt_iobuf_t *iob, nxt_uint_t niob);
82     /*
83      * The send() is an interface to write a single buffer.  SSL/TLS
84      * libraries' send() interface handles also the libraries' errors.
85      */
86     ssize_t                       (*send)(nxt_conn_t *c, void *buf,
87                                       size_t size);
88 
89     nxt_work_handler_t            shutdown;
90 } nxt_conn_io_t;
91 
92 
93 /*
94  * The nxt_listen_event_t is separated from nxt_listen_socket_t
95  * because nxt_listen_socket_t is one per process whilst each worker
96  * thread uses own nxt_listen_event_t.
97  */
98 typedef struct {
99     /* Must be the first field. */
100     nxt_fd_event_t                socket;
101 
102     nxt_task_t                    task;
103 
104     uint32_t                      ready;
105     uint32_t                      batch;
106 
107     /* An accept() interface is cached to minimize memory accesses. */
108     nxt_work_handler_t            accept;
109 
110     nxt_listen_socket_t           *listen;
111     nxt_conn_t                    *next;   /* STUB */
112     nxt_work_queue_t              *work_queue;
113 
114     nxt_timer_t                   timer;
115 
116     nxt_queue_link_t              link;
117 } nxt_listen_event_t;
118 
119 
120 struct nxt_conn_s {
121     /*
122      * Must be the first field, since nxt_fd_event_t
123      * and nxt_conn_t are used interchangeably.
124      */
125     nxt_fd_event_t                socket;
126 
127     nxt_buf_t                     *read;
128     const nxt_conn_state_t        *read_state;
129     nxt_work_queue_t              *read_work_queue;
130     nxt_timer_t                   read_timer;
131 
132     nxt_buf_t                     *write;
133     const nxt_conn_state_t        *write_state;
134     nxt_work_queue_t              *write_work_queue;
135     nxt_event_write_rate_t        *rate;
136     nxt_timer_t                   write_timer;
137 
138     nxt_off_t                     sent;
139     uint32_t                      max_chunk;
140     uint32_t                      nbytes;
141 
142     nxt_conn_io_t                 *io;
143 
144     nxt_queue_t                   requests; /* of nxt_req_conn_link_t */
145 
146     union {
147 #if (NXT_SSLTLS)
148         void                      *ssltls;
149 #endif
150         nxt_thread_pool_t         *thread_pool;
151     } u;
152 
153     nxt_mp_t                      *mem_pool;
154 
155     nxt_task_t                    task;
156     nxt_log_t                     log;
157 
158     nxt_listen_event_t            *listen;
159     nxt_sockaddr_t                *remote;
160     nxt_sockaddr_t                *local;
161     const char                    *action;
162 
163     uint8_t                       peek;
164     uint8_t                       blocked;      /* 1 bit */
165     uint8_t                       delayed;      /* 1 bit */
166 
167 #define NXT_CONN_SENDFILE_OFF     0
168 #define NXT_CONN_SENDFILE_ON      1
169 #define NXT_CONN_SENDFILE_UNSET   3
170 
171     uint8_t                       sendfile;     /* 2 bits */
172     uint8_t                       tcp_nodelay;  /* 1 bit */
173 
174     nxt_queue_link_t              link;
175 };
176 
177 
178 #define nxt_conn_timer_init(ev, c, wq)                                        \
179     do {                                                                      \
180         (ev)->work_queue = (wq);                                              \
181         (ev)->log = &(c)->log;                                                \
182         (ev)->precision = NXT_TIMER_DEFAULT_PRECISION;                        \
183     } while (0)
184 
185 
186 #define nxt_read_timer_conn(ev)                                               \
187     nxt_timer_data(ev, nxt_conn_t, read_timer)
188 
189 
190 #define nxt_write_timer_conn(ev)                                              \
191     nxt_timer_data(ev, nxt_conn_t, write_timer)
192 
193 
194 #if (NXT_HAVE_UNIX_DOMAIN)
195 
196 #define nxt_conn_tcp_nodelay_on(task, c)                                      \
197     do {                                                                      \
198         nxt_int_t  ret;                                                       \
199                                                                               \
200         if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) {                   \
201             ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP,    \
202                                         TCP_NODELAY, 1);                      \
203                                                                               \
204             (c)->tcp_nodelay = (ret == NXT_OK);                               \
205         }                                                                     \
206     } while (0)
207 
208 
209 #else
210 
211 #define nxt_conn_tcp_nodelay_on(task, c)                                      \
212     do {                                                                      \
213         nxt_int_t  ret;                                                       \
214                                                                               \
215         ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP,        \
216                                     TCP_NODELAY, 1);                          \
217                                                                               \
218         (c)->tcp_nodelay = (ret == NXT_OK);                                   \
219     } while (0)
220 
221 #endif
222 
223 
224 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task);
225 void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
226 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);
227 
228 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
229     const nxt_conn_state_t *state, nxt_timer_t *tev);
230 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);
231 
232 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
233 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
234 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
235 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
236 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);
237 
238 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
239     nxt_listen_socket_t *ls);
240 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
241 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
242     nxt_conn_t *c);
243 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
244     const char *accept_syscall, nxt_err_t err);
245 
246 void nxt_conn_wait(nxt_conn_t *c);
247 
248 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
249 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
250 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
251     nxt_uint_t flags);
252 
253 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
254 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
255 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
256     nxt_iobuf_t *iob, nxt_uint_t niob);
257 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
258     size_t size);
259 
260 size_t nxt_event_conn_write_limit(nxt_conn_t *c);
261 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
262     nxt_conn_t *c, size_t sent);
263 ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b,
264     size_t limit);
265 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
266     nxt_uint_t niob);
267 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
268 
269 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
270     nxt_conn_t *c);
271 
272 
273 #define nxt_conn_connect(engine, c)                                           \
274     nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket,       \
275                        c->socket.task, c, c->socket.data)
276 
277 
278 #define nxt_conn_read(engine, c)                                              \
279     do {                                                                      \
280         nxt_event_engine_t  *e = engine;                                      \
281                                                                               \
282         c->socket.read_work_queue = &e->read_work_queue;                      \
283                                                                               \
284         nxt_work_queue_add(&e->read_work_queue, c->io->read,                  \
285                            c->socket.task, c, c->socket.data);                \
286     } while (0)
287 
288 
289 #define nxt_conn_write(e, c)                                                  \
290     do {                                                                      \
291         nxt_event_engine_t  *engine = e;                                      \
292                                                                               \
293         c->socket.write_work_queue = &engine->write_work_queue;               \
294                                                                               \
295         nxt_work_queue_add(&engine->write_work_queue, c->io->write,           \
296                            c->socket.task, c, c->socket.data);                \
297     } while (0)
298 
299 
300 extern nxt_conn_io_t             nxt_unix_conn_io;
301 
302 
303 typedef struct {
304     /*
305      * Client and peer connections are not embedded because already
306      * existent connections can be switched to the event connection proxy.
307      */
308     nxt_conn_t                   *client;
309     nxt_conn_t                   *peer;
310     nxt_buf_t                    *client_buffer;
311     nxt_buf_t                    *peer_buffer;
312 
313     size_t                       client_buffer_size;
314     size_t                       peer_buffer_size;
315 
316     nxt_msec_t                   client_wait_timeout;
317     nxt_msec_t                   connect_timeout;
318     nxt_msec_t                   reconnect_timeout;
319     nxt_msec_t                   peer_wait_timeout;
320     nxt_msec_t                   client_write_timeout;
321     nxt_msec_t                   peer_write_timeout;
322 
323     uint8_t                      connected;  /* 1 bit */
324     uint8_t                      delayed;    /* 1 bit */
325     uint8_t                      retries;    /* 8 bits */
326     uint8_t                      retain;     /* 2 bits */
327 
328     nxt_work_handler_t           completion_handler;
329 } nxt_conn_proxy_t;
330 
331 
332 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
333 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
334 
335 
336 /* STUB */
337 #define nxt_event_conn_t         nxt_conn_t
338 #define nxt_event_conn_state_t   nxt_conn_state_t
339 #define nxt_event_conn_proxy_t   nxt_conn_proxy_t
340 #define nxt_event_conn_read      nxt_conn_read
341 #define nxt_event_conn_write     nxt_conn_write
342 #define nxt_event_conn_close     nxt_conn_close
343 
344 
345 #endif /* _NXT_CONN_H_INCLUDED_ */
346