xref: /unit/src/nxt_conn.h (revision 2185:2227bdbb3c89)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_CONN_H_INCLUDED_
8 #define _NXT_CONN_H_INCLUDED_
9 
10 
/*
 * Callback signatures used by nxt_conn_state_t: nxt_conn_io_read_t is the
 * type of its io_read_handler field and nxt_conn_timer_value_t is the type
 * of its timer_value field.
 */
11 typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c);
12 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
13 
14 
/*
 * Table of callbacks and timer settings for one direction of a connection.
 * nxt_conn_t refers to such tables through its read_state and write_state
 * pointers.
 */
15 typedef struct {
       /*
        * Completion callbacks.  The comment on the read slot of
        * nxt_conn_io_t says close_handler is called when the peer closed
        * the connection without errors.  NOTE(review): ready_handler and
        * error_handler presumably cover success and failure -- confirm in
        * the I/O implementation.
        */
16     nxt_work_handler_t            ready_handler;
17     nxt_work_handler_t            close_handler;
18     nxt_work_handler_t            error_handler;
19 
       /* NOTE(review): presumably an optional custom read routine; confirm. */
20     nxt_conn_io_read_t            io_read_handler;
21 
       /*
        * Timer settings.  timer_value is a callback returning nxt_msec_t;
        * timer_data has the same type as that callback's "data" argument.
        * NOTE(review): presumably timer_data is what gets passed there and
        * timer_handler runs on expiry -- confirm in nxt_conn_timer().
        */
22     nxt_work_handler_t            timer_handler;
23     nxt_conn_timer_value_t        timer_value;
24     uintptr_t                     timer_data;
25 
       /* NOTE(review): presumably a flag that re-arms the timer; confirm. */
26     uint8_t                       timer_autoreset;
27 } nxt_conn_state_t;
28 
29 
/*
 * Write-rate accounting state; nxt_conn_t points to one through its "rate"
 * field.  NOTE(review): the fields are only declared in this header.  They
 * presumably hold a running average, the current and maximum byte limits
 * and the time of the last update -- confirm against the code behind
 * nxt_event_conn_write_limit().
 */
30 typedef struct {
31     double                        average;
32     size_t                        limit;
33     size_t                        limit_after;
34     size_t                        max_limit;
35     nxt_msec_t                    last;
36 } nxt_event_write_rate_t;
37 
38 
/*
 * Table of I/O operations for a connection.  nxt_conn_t refers to one via
 * its "io" field, and the nxt_conn_read()/nxt_conn_write() macros queue the
 * read and write slots of that table as work handlers.
 */
39 typedef struct {
40 
41     nxt_work_handler_t            connect;
42     nxt_work_handler_t            accept;
43 
44     /*
45      * The read() with a NULL c->read buffer waits for readiness of a
46      * connection to avoid allocating a read buffer if the connection will
47      * time out or will be closed with an error.  The kqueue-specific read()
48      * can also detect the case where a client did not send anything and
49      * has just closed the connection without errors.  In the latter case
50      * the state's close_handler is called.
51      */
52     nxt_work_handler_t            read;
53 
54     ssize_t                       (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);
55 
56     ssize_t                       (*recv)(nxt_conn_t *c, void *buf,
57                                       size_t size, nxt_uint_t flags);
58 
59     /* The write() is an interface to write a buffer chain. */
60     nxt_work_handler_t            write;
61 
62     /*
63      * The sendbuf() is an interface for OS-specific sendfile
64      * implementations or simple writev().
65      */
66     ssize_t                       (*sendbuf)(nxt_task_t *task,
67                                        nxt_sendbuf_t *sb);
68     /*
69      * The old_sendbuf() takes the connection, a buffer and a byte limit
70      * directly.  NOTE(review): presumably the legacy sendbuf(); confirm.
71      */
72     ssize_t                       (*old_sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
73                                       size_t limit);
74     /*
75      * The writev() is an interface to write several nxt_iobuf_t buffers.
76      */
77     ssize_t                       (*writev)(nxt_conn_t *c,
78                                       nxt_iobuf_t *iob, nxt_uint_t niob);
79     /*
80      * The send() is an interface to write a single buffer.  SSL/TLS
81      * libraries' send() interface also handles the libraries' errors.
82      */
83     ssize_t                       (*send)(nxt_conn_t *c, void *buf,
84                                       size_t size);
85 
86     nxt_work_handler_t            shutdown;
87 } nxt_conn_io_t;
88 
89 
90 /*
91  * The nxt_listen_event_t is separated from nxt_listen_socket_t
92  * because nxt_listen_socket_t is one per process whilst each worker
93  * thread uses its own nxt_listen_event_t.
94  */
95 typedef struct {
96     /* Must be the first field. */
97     nxt_fd_event_t                socket;
98 
99     nxt_task_t                    task;
100 
        /*
         * NOTE(review): accept accounting counters; their exact meaning is
         * defined by the accept code, which is not part of this header.
         */
101     uint32_t                      ready;
102     uint32_t                      batch;
103     uint32_t                      count;
104 
105     /* An accept() interface is cached to minimize memory accesses. */
106     nxt_work_handler_t            accept;
107 
        /* The per-process listen socket this per-thread event belongs to. */
108     nxt_listen_socket_t           *listen;
        /* NOTE(review): presumably the connection prepared for the next accept. */
109     nxt_conn_t                    *next;
110     nxt_work_queue_t              *work_queue;
111 
112     nxt_timer_t                   timer;
113 
114     nxt_queue_link_t              link;
115 } nxt_listen_event_t;
116 
117 
/*
 * A connection: the fd event, per-direction buffers, state tables, work
 * queues and timers, the I/O operation table, addresses and status flags.
 */
118 struct nxt_conn_s {
119     /*
120      * Must be the first field, since nxt_fd_event_t
121      * and nxt_conn_t are used interchangeably.
122      */
123     nxt_fd_event_t                socket;
124 
        /*
         * Read side.  read_timer is the member from which
         * nxt_read_timer_conn() recovers the connection.  Note that
         * nxt_conn_read() assigns socket.read_work_queue, not the
         * read_work_queue field declared here.
         */
125     nxt_buf_t                     *read;
126     const nxt_conn_state_t        *read_state;
127     nxt_work_queue_t              *read_work_queue;
128     nxt_timer_t                   read_timer;
129 
        /*
         * Write side.  write_timer is the member from which
         * nxt_write_timer_conn() recovers the connection.  Note that
         * nxt_conn_write() assigns socket.write_work_queue, not the
         * write_work_queue field declared here.
         */
130     nxt_buf_t                     *write;
131     const nxt_conn_state_t        *write_state;
132     nxt_work_queue_t              *write_work_queue;
133     nxt_event_write_rate_t        *rate;
134     nxt_timer_t                   write_timer;
135 
        /* NOTE(review): presumably transfer accounting; confirm in the I/O code. */
136     nxt_off_t                     sent;
137     uint32_t                      max_chunk;
138     uint32_t                      nbytes;
139 
        /* I/O operation table; nxt_conn_read()/nxt_conn_write() queue io->read/io->write. */
140     nxt_conn_io_t                 *io;
141 
        /* The tls member exists only when NXT_TLS is enabled. */
142     union {
143 #if (NXT_TLS)
144         void                      *tls;
145 #endif
146         nxt_thread_pool_t         *thread_pool;
147     } u;
148 
149     nxt_mp_t                      *mem_pool;
150 
151     nxt_task_t                    task;
        /* nxt_conn_timer_init() points a timer's log at this member. */
152     nxt_log_t                     log;
153 
154     nxt_listen_event_t            *listen;
155 
        /* remote->u.sockaddr.sa_family is read by nxt_conn_tcp_nodelay_on(). */
156     nxt_sockaddr_t                *remote;
157     nxt_sockaddr_t                *local;
158     const char                    *action;
159 
        /*
         * Status flags.  Each is a whole uint8_t; the "N bit(s)" notes give
         * the number of bits the value needs, they are not bit-fields.
         * idle is set by nxt_conn_idle() and cleared by nxt_conn_active().
         */
160     uint8_t                       block_read;   /* 1 bit */
161     uint8_t                       block_write;  /* 1 bit */
162     uint8_t                       delayed;      /* 1 bit */
163     uint8_t                       idle;         /* 1 bit */
164 
        /* NOTE(review): presumably the values stored in "sendfile" below. */
165 #define NXT_CONN_SENDFILE_OFF     0
166 #define NXT_CONN_SENDFILE_ON      1
167 #define NXT_CONN_SENDFILE_UNSET   3
168 
169     uint8_t                       sendfile;     /* 2 bits */
        /* Set by nxt_conn_tcp_nodelay_on() to whether setsockopt succeeded. */
170     uint8_t                       tcp_nodelay;  /* 1 bit */
171 
        /* Queue link used by nxt_conn_idle() and nxt_conn_active(). */
172     nxt_queue_link_t              link;
173 };
174 
175 
/*
 * Initializes timer "ev" for connection "c": sets its work queue to "wq",
 * points its log at the connection's log and sets the default timer bias.
 * "ev" is evaluated three times and must have no side effects.
 */
176 #define nxt_conn_timer_init(ev, c, wq)                                        \
177     do {                                                                      \
178         (ev)->work_queue = (wq);                                              \
179         (ev)->log = &(c)->log;                                                \
180         (ev)->bias = NXT_TIMER_DEFAULT_BIAS;                                  \
181     } while (0)
182 
183 
/*
 * Maps timer "ev" to the nxt_conn_t that contains it as its read_timer
 * member, via nxt_timer_data().
 */
184 #define nxt_read_timer_conn(ev)                                               \
185     nxt_timer_data(ev, nxt_conn_t, read_timer)
186 
187 
/*
 * Maps timer "ev" to the nxt_conn_t that contains it as its write_timer
 * member, via nxt_timer_data().
 */
188 #define nxt_write_timer_conn(ev)                                              \
189     nxt_timer_data(ev, nxt_conn_t, write_timer)
190 
191 
/*
 * nxt_conn_tcp_nodelay_on(task, c) sets the TCP_NODELAY option on the socket
 * of connection "c" and stores in c->tcp_nodelay whether
 * nxt_socket_setsockopt() returned NXT_OK.  When Unix domain sockets are
 * supported, a connection whose remote address family is AF_UNIX is left
 * untouched.
 *
 * The result of nxt_socket_setsockopt() is consumed directly rather than
 * through a macro-local "ret" variable, so the macro neither shadows a
 * caller's "ret" nor captures that name when it appears in an argument.
 * "c" is evaluated more than once and must have no side effects.
 */
#if (NXT_HAVE_UNIX_DOMAIN)

#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    do {                                                                      \
        if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) {                   \
            (c)->tcp_nodelay =                                                \
                (nxt_socket_setsockopt((task), (c)->socket.fd,                \
                                       IPPROTO_TCP, TCP_NODELAY, 1)           \
                 == NXT_OK);                                                  \
        }                                                                     \
    } while (0)


#else

#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    do {                                                                      \
        (c)->tcp_nodelay =                                                    \
            (nxt_socket_setsockopt((task), (c)->socket.fd,                    \
                                   IPPROTO_TCP, TCP_NODELAY, 1)               \
             == NXT_OK);                                                      \
    } while (0)

#endif
220 
221 
/*
 * Connection creation, release and helper entry points.  Only the
 * prototypes are visible in this header; behaviour beyond the signatures
 * must be checked against the implementation.
 */
222 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task);
223 NXT_EXPORT void nxt_conn_free(nxt_task_t *task, nxt_conn_t *c);
224 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);
225 
226 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
227     const nxt_conn_state_t *state, nxt_timer_t *tev);
228 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);
229 NXT_EXPORT nxt_sockaddr_t *nxt_conn_local_addr(nxt_task_t *task,
230     nxt_conn_t *c);
231 
/*
 * Connect and accept entry points.  The functions taking
 * (task, obj, data) have the shape of work handlers; nxt_conn_sys_socket()
 * is the handler that the nxt_conn_connect() macro queues.
 */
232 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
233 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
234 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
235 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
236 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);
237 
238 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
239     nxt_listen_socket_t *ls);
240 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
241 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
242     nxt_conn_t *c);
243 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
244     const char *accept_syscall, nxt_err_t err);
245 
/*
 * Read and write entry points.  nxt_conn_io_recvbuf(), nxt_conn_io_recv()
 * and nxt_conn_io_sendbuf() have the same signatures as the recvbuf, recv
 * and sendbuf slots of nxt_conn_io_t.  nxt_conn_io_writev() and
 * nxt_conn_io_send() take (task, sb, ...) and therefore do NOT match the
 * writev and send slots, which take the connection as first argument.
 */
246 void nxt_conn_wait(nxt_conn_t *c);
247 
248 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
249 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
250 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
251     nxt_uint_t flags);
252 
253 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
254 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
255 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
256     nxt_iobuf_t *iob, nxt_uint_t niob);
257 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
258     size_t size);
259 
/*
 * nxt_event_conn_* write helpers.  nxt_event_conn_io_writev() and
 * nxt_event_conn_io_send() have the same signatures as the writev and send
 * slots of nxt_conn_io_t.  NOTE(review): the write_limit/write_delayed pair
 * presumably implements the rate limiting described by
 * nxt_event_write_rate_t -- confirm in the implementation.
 */
260 size_t nxt_event_conn_write_limit(nxt_conn_t *c);
261 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
262     nxt_conn_t *c, size_t sent);
263 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
264     nxt_uint_t niob);
265 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
266 
267 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
268     nxt_conn_t *c);
269 
270 
/*
 * nxt_conn_connect(engine, c) queues nxt_conn_sys_socket() on the engine's
 * socket_work_queue, passing the connection as the work object together
 * with the task and data of the connection's socket.
 *
 * Both arguments are parenthesized so that pointer expressions such as
 * "engines + i" or "&conn" expand correctly.  "c" is evaluated three times
 * and must have no side effects.
 */
#define nxt_conn_connect(engine, c)                                           \
    nxt_work_queue_add(&(engine)->socket_work_queue, nxt_conn_sys_socket,     \
                       (c)->socket.task, (c), (c)->socket.data)
274 
275 
/*
 * nxt_conn_read(engine, c) points the connection socket's read_work_queue
 * at the engine's read_work_queue and queues the connection's io->read
 * handler there, passing the connection as the work object together with
 * the task and data of the connection's socket.
 *
 * "engine" is evaluated once into a namespaced local, so a caller variable
 * named "e" no longer collides with the macro's own variable.  "c" is
 * parenthesized but evaluated several times and must have no side effects.
 */
#define nxt_conn_read(engine, c)                                              \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_engine_ = (engine);                     \
                                                                              \
        (c)->socket.read_work_queue = &nxt_conn_engine_->read_work_queue;     \
                                                                              \
        nxt_work_queue_add(&nxt_conn_engine_->read_work_queue,                \
                           (c)->io->read,                                     \
                           (c)->socket.task, (c), (c)->socket.data);          \
    } while (0)
285 
286 
/*
 * nxt_conn_write(engine, c) points the connection socket's write_work_queue
 * at the engine's write_work_queue and queues the connection's io->write
 * handler there, passing the connection as the work object together with
 * the task and data of the connection's socket.
 *
 * "engine" is evaluated once into a namespaced local, so a caller variable
 * named "e" no longer collides with the macro's own variable.  "c" is
 * parenthesized but evaluated several times and must have no side effects.
 */
#define nxt_conn_write(engine, c)                                             \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_engine_ = (engine);                     \
                                                                              \
        (c)->socket.write_work_queue = &nxt_conn_engine_->write_work_queue;   \
                                                                              \
        nxt_work_queue_add(&nxt_conn_engine_->write_work_queue,               \
                           (c)->io->write,                                    \
                           (c)->socket.task, (c), (c)->socket.data);          \
    } while (0)
296 
297 
/*
 * nxt_conn_idle(engine, c) inserts the connection's link at the head of the
 * engine's idle_connections queue, sets c->idle and increments the engine's
 * idle_conns_cnt.
 *
 * "engine" is evaluated once into a namespaced local, so a caller variable
 * named "e" no longer collides with the macro's own variable.  "c" is
 * parenthesized but evaluated twice and must have no side effects.
 */
#define nxt_conn_idle(engine, c)                                              \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_engine_ = (engine);                     \
                                                                              \
        nxt_queue_insert_head(&nxt_conn_engine_->idle_connections,            \
                              &(c)->link);                                    \
                                                                              \
        (c)->idle = 1;                                                        \
        nxt_conn_engine_->idle_conns_cnt++;                                   \
    } while (0)
307 
308 
/*
 * nxt_conn_active(engine, c) removes the connection's link from the queue
 * it is on, clears c->idle and decrements the engine's idle_conns_cnt.
 * It is the counterpart of nxt_conn_idle().
 *
 * "engine" is evaluated once into a namespaced local, so a caller variable
 * named "e" no longer collides with the macro's own variable.  "c" is
 * parenthesized but evaluated twice and must have no side effects.
 */
#define nxt_conn_active(engine, c)                                            \
    do {                                                                      \
        nxt_event_engine_t  *nxt_conn_engine_ = (engine);                     \
                                                                              \
        nxt_queue_remove(&(c)->link);                                         \
                                                                              \
        (c)->idle = 0;                                                        \
        nxt_conn_engine_->idle_conns_cnt--;                                   \
    } while (0)
318 
319 
/*
 * An nxt_conn_io_t operation table defined in another translation unit.
 * NOTE(review): presumably the plain Unix socket implementation; confirm.
 */
320 extern nxt_conn_io_t             nxt_unix_conn_io;
321 
322 
/*
 * State of a proxy between a client connection and a peer connection:
 * the two connections, their buffers and buffer sizes, the timeouts and
 * status flags, and a completion handler.
 */
322 typedef struct {
323     /*
324      * Client and peer connections are not embedded because already
325      * existing connections can be switched to the event connection proxy.
326      */
327     nxt_conn_t                   *client;
328     nxt_conn_t                   *peer;
329     nxt_buf_t                    *client_buffer;
330     nxt_buf_t                    *peer_buffer;
331 
332     size_t                       client_buffer_size;
333     size_t                       peer_buffer_size;
334 
        /* Timeouts, as nxt_msec_t values. */
335     nxt_msec_t                   client_wait_timeout;
336     nxt_msec_t                   connect_timeout;
337     nxt_msec_t                   reconnect_timeout;
338     nxt_msec_t                   peer_wait_timeout;
339     nxt_msec_t                   client_write_timeout;
340     nxt_msec_t                   peer_write_timeout;
341 
        /*
         * Each field is a whole uint8_t; the "N bit(s)" notes give the
         * number of bits the value needs, they are not bit-fields.
         */
342     uint8_t                      connected;  /* 1 bit */
343     uint8_t                      delayed;    /* 1 bit */
344     uint8_t                      retries;    /* 8 bits */
345     uint8_t                      retain;     /* 2 bits */
346 
        /* NOTE(review): presumably run when proxying finishes; confirm. */
347     nxt_work_handler_t           completion_handler;
348 } nxt_conn_proxy_t;
350 
351 
/*
 * Proxy entry points: nxt_conn_proxy_create() returns an nxt_conn_proxy_t
 * for connection "c" and nxt_conn_proxy() operates on such a proxy.  Only
 * the prototypes are visible in this header.
 */
351 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
352 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
354 
355 
355 /* STUB: aliases that map nxt_event_conn_* names onto the nxt_conn_* ones. */
356 #define nxt_event_conn_t         nxt_conn_t
357 #define nxt_event_conn_state_t   nxt_conn_state_t
358 #define nxt_event_conn_proxy_t   nxt_conn_proxy_t
359 #define nxt_event_conn_read      nxt_conn_read
360 #define nxt_event_conn_write     nxt_conn_write
361 #define nxt_event_conn_close     nxt_conn_close
363 
364 
365 #endif /* _NXT_CONN_H_INCLUDED_ */
366