xref: /unit/src/nxt_conn.h (revision 811:d0d9acf87625)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_CONN_H_INCLUDED_
8 #define _NXT_CONN_H_INCLUDED_
9 
10 
11 typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c);
12 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
13 
14 
15 typedef struct {
16     nxt_work_handler_t            ready_handler;
17     nxt_work_handler_t            close_handler;
18     nxt_work_handler_t            error_handler;
19 
20     nxt_conn_io_read_t            io_read_handler;
21 
22     nxt_work_handler_t            timer_handler;
23     nxt_conn_timer_value_t        timer_value;
24     uintptr_t                     timer_data;
25 
26     uint8_t                       timer_autoreset;
27 } nxt_conn_state_t;
28 
29 
30 typedef struct {
31     double                        average;
32     size_t                        limit;
33     size_t                        limit_after;
34     size_t                        max_limit;
35     nxt_msec_t                    last;
36 } nxt_event_write_rate_t;
37 
38 
39 typedef struct {
40 
41     nxt_work_handler_t            connect;
42     nxt_work_handler_t            accept;
43 
44     /*
45      * The read() with NULL c->read buffer waits readiness of a connection
46      * to avoid allocation of read buffer if the connection will time out
47      * or will be closed with error.  The kqueue-specific read() can also
48      * detect case if a client did not sent anything and has just closed the
49      * connection without errors.  In the latter case state's close_handler
50      * is called.
51      */
52     nxt_work_handler_t            read;
53 
54     ssize_t                       (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);
55 
56     ssize_t                       (*recv)(nxt_conn_t *c, void *buf,
57                                       size_t size, nxt_uint_t flags);
58 
59     /* The write() is an interface to write a buffer chain. */
60     nxt_work_handler_t            write;
61 
62     /*
63      * The sendbuf() is an interface for OS-specific sendfile
64      * implementations or simple writev().
65      */
66     ssize_t                       (*sendbuf)(nxt_task_t *task,
67                                        nxt_sendbuf_t *sb);
68     /*
69      * The sendbuf() is an interface for OS-specific sendfile
70      * implementations or simple writev().
71      */
72     ssize_t                       (*old_sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
73                                       size_t limit);
74     /*
75      * The writev() is an interface to write several nxt_iobuf_t buffers.
76      */
77     ssize_t                       (*writev)(nxt_conn_t *c,
78                                       nxt_iobuf_t *iob, nxt_uint_t niob);
79     /*
80      * The send() is an interface to write a single buffer.  SSL/TLS
81      * libraries' send() interface handles also the libraries' errors.
82      */
83     ssize_t                       (*send)(nxt_conn_t *c, void *buf,
84                                       size_t size);
85 
86     nxt_work_handler_t            shutdown;
87 } nxt_conn_io_t;
88 
89 
90 /*
91  * The nxt_listen_event_t is separated from nxt_listen_socket_t
92  * because nxt_listen_socket_t is one per process whilst each worker
93  * thread uses own nxt_listen_event_t.
94  */
95 typedef struct {
96     /* Must be the first field. */
97     nxt_fd_event_t                socket;
98 
99     nxt_task_t                    task;
100 
101     uint32_t                      ready;
102     uint32_t                      batch;
103     uint32_t                      count;
104 
105     /* An accept() interface is cached to minimize memory accesses. */
106     nxt_work_handler_t            accept;
107 
108     nxt_listen_socket_t           *listen;
109     nxt_conn_t                    *next;   /* STUB */
110     nxt_work_queue_t              *work_queue;
111 
112     nxt_timer_t                   timer;
113 
114     nxt_queue_link_t              link;
115 } nxt_listen_event_t;
116 
117 
118 struct nxt_conn_s {
119     /*
120      * Must be the first field, since nxt_fd_event_t
121      * and nxt_conn_t are used interchangeably.
122      */
123     nxt_fd_event_t                socket;
124 
125     nxt_buf_t                     *read;
126     const nxt_conn_state_t        *read_state;
127     nxt_work_queue_t              *read_work_queue;
128     nxt_timer_t                   read_timer;
129 
130     nxt_buf_t                     *write;
131     const nxt_conn_state_t        *write_state;
132     nxt_work_queue_t              *write_work_queue;
133     nxt_event_write_rate_t        *rate;
134     nxt_timer_t                   write_timer;
135 
136     nxt_off_t                     sent;
137     uint32_t                      max_chunk;
138     uint32_t                      nbytes;
139 
140     nxt_conn_io_t                 *io;
141 
142     union {
143 #if (NXT_TLS)
144         void                      *tls;
145 #endif
146         nxt_thread_pool_t         *thread_pool;
147     } u;
148 
149     nxt_mp_t                      *mem_pool;
150 
151     nxt_task_t                    task;
152     nxt_log_t                     log;
153 
154     nxt_listen_event_t            *listen;
155 
156     nxt_sockaddr_t                *remote;
157     nxt_sockaddr_t                *local;
158     const char                    *action;
159 
160     uint8_t                       blocked;      /* 1 bit */
161     uint8_t                       delayed;      /* 1 bit */
162 
163 #define NXT_CONN_SENDFILE_OFF     0
164 #define NXT_CONN_SENDFILE_ON      1
165 #define NXT_CONN_SENDFILE_UNSET   3
166 
167     uint8_t                       sendfile;     /* 2 bits */
168     uint8_t                       tcp_nodelay;  /* 1 bit */
169 
170     nxt_queue_link_t              link;
171 };
172 
173 
/*
 * Prepares the timer "ev" for the connection "c": stores the work queue
 * "wq" in the timer, points the timer's log at the connection's log and
 * sets the bias to NXT_TIMER_DEFAULT_BIAS.  "ev" is evaluated three times.
 */
#define nxt_conn_timer_init(ev, c, wq)                                        \
    do {                                                                      \
        (ev)->work_queue = (wq);                                              \
        (ev)->log        = &(c)->log;                                         \
        (ev)->bias       = NXT_TIMER_DEFAULT_BIAS;                            \
    } while (0)
180 
181 
/*
 * Expands to nxt_timer_data() for the read_timer member of nxt_conn_t;
 * presumably yields the connection that embeds the timer "ev" -- confirm
 * against the nxt_timer_data() definition.
 */
#define nxt_read_timer_conn(ev)  nxt_timer_data(ev, nxt_conn_t, read_timer)
184 
185 
/*
 * Expands to nxt_timer_data() for the write_timer member of nxt_conn_t;
 * presumably yields the connection that embeds the timer "ev" -- confirm
 * against the nxt_timer_data() definition.
 */
#define nxt_write_timer_conn(ev)  nxt_timer_data(ev, nxt_conn_t, write_timer)
188 
189 
/*
 * nxt_conn_tcp_nodelay_on(task, c) sets the TCP_NODELAY option to 1 on the
 * connection's socket via nxt_socket_setsockopt() and stores in
 * c->tcp_nodelay whether the call returned NXT_OK.
 *
 * When NXT_HAVE_UNIX_DOMAIN is set, connections whose remote address family
 * is AF_UNIX are skipped and c->tcp_nodelay is left untouched.
 *
 * "c" is evaluated exactly once.  The macro's locals carry an "nxt_" prefix
 * and a trailing underscore so that they cannot capture identifiers, such
 * as a caller's own "ret", used in the argument expressions.
 */
#if (NXT_HAVE_UNIX_DOMAIN)

#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    do {                                                                      \
        nxt_conn_t  *nxt_c_ = (c);                                            \
        nxt_int_t   nxt_ret_;                                                 \
                                                                              \
        if (nxt_c_->remote->u.sockaddr.sa_family != AF_UNIX) {                \
            nxt_ret_ = nxt_socket_setsockopt((task), nxt_c_->socket.fd,       \
                                             IPPROTO_TCP, TCP_NODELAY, 1);    \
                                                                              \
            nxt_c_->tcp_nodelay = (nxt_ret_ == NXT_OK);                       \
        }                                                                     \
    } while (0)


#else

#define nxt_conn_tcp_nodelay_on(task, c)                                      \
    do {                                                                      \
        nxt_conn_t  *nxt_c_ = (c);                                            \
        nxt_int_t   nxt_ret_;                                                 \
                                                                              \
        nxt_ret_ = nxt_socket_setsockopt((task), nxt_c_->socket.fd,           \
                                         IPPROTO_TCP, TCP_NODELAY, 1);        \
                                                                              \
        nxt_c_->tcp_nodelay = (nxt_ret_ == NXT_OK);                           \
    } while (0)

#endif
218 
219 
220 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task);
221 NXT_EXPORT void nxt_conn_free(nxt_task_t *task, nxt_conn_t *c);
222 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);
223 
224 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
225     const nxt_conn_state_t *state, nxt_timer_t *tev);
226 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);
227 NXT_EXPORT nxt_sockaddr_t *nxt_conn_local_addr(nxt_task_t *task,
228     nxt_conn_t *c);
229 
230 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
231 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
232 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
233 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
234 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);
235 
236 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
237     nxt_listen_socket_t *ls);
238 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
239 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
240     nxt_conn_t *c);
241 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
242     const char *accept_syscall, nxt_err_t err);
243 
244 void nxt_conn_wait(nxt_conn_t *c);
245 
246 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
247 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
248 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
249     nxt_uint_t flags);
250 
251 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
252 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
253 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
254     nxt_iobuf_t *iob, nxt_uint_t niob);
255 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
256     size_t size);
257 
258 size_t nxt_event_conn_write_limit(nxt_conn_t *c);
259 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
260     nxt_conn_t *c, size_t sent);
261 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
262     nxt_uint_t niob);
263 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
264 
265 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
266     nxt_conn_t *c);
267 
268 
/*
 * nxt_conn_connect(engine, c) queues nxt_conn_sys_socket() on the engine's
 * socket_work_queue with c->socket.task as the task, "c" as the object and
 * c->socket.data as the data.
 *
 * Both arguments are parenthesized so that expressions such as
 * "engines + 1" or "&conns[i]" expand correctly.  The macro remains
 * a single expression, therefore "c" is still evaluated three times:
 * do not pass an argument with side effects.
 */
#define nxt_conn_connect(engine, c)                                           \
    nxt_work_queue_add(&(engine)->socket_work_queue, nxt_conn_sys_socket,     \
                       (c)->socket.task, (c), (c)->socket.data)
272 
273 
/*
 * nxt_conn_read(engine, c) sets c->socket.read_work_queue to the engine's
 * read_work_queue and queues the connection's io->read handler there with
 * c->socket.task as the task, "c" as the object and c->socket.data as the
 * data.
 *
 * Each argument is evaluated exactly once, "engine" first.  The locals
 * carry an "nxt_" prefix and a trailing underscore: a plain "e" would turn
 * nxt_conn_read(e, c) into the self-initialization
 * "nxt_event_engine_t *e = e".
 */
#define nxt_conn_read(engine, c)                                              \
    do {                                                                      \
        nxt_event_engine_t  *nxt_e_ = (engine);                               \
        nxt_conn_t          *nxt_c_ = (c);                                    \
                                                                              \
        nxt_c_->socket.read_work_queue = &nxt_e_->read_work_queue;            \
                                                                              \
        nxt_work_queue_add(&nxt_e_->read_work_queue, nxt_c_->io->read,        \
                           nxt_c_->socket.task, nxt_c_,                       \
                           nxt_c_->socket.data);                              \
    } while (0)
283 
284 
/*
 * nxt_conn_write(engine, c) sets c->socket.write_work_queue to the engine's
 * write_work_queue and queues the connection's io->write handler there with
 * c->socket.task as the task, "c" as the object and c->socket.data as the
 * data.
 *
 * Each argument is evaluated exactly once, "engine" first.  The locals
 * carry an "nxt_" prefix and a trailing underscore: a plain "e" would turn
 * nxt_conn_write(e, c) into the self-initialization
 * "nxt_event_engine_t *e = e".
 */
#define nxt_conn_write(engine, c)                                             \
    do {                                                                      \
        nxt_event_engine_t  *nxt_e_ = (engine);                               \
        nxt_conn_t          *nxt_c_ = (c);                                    \
                                                                              \
        nxt_c_->socket.write_work_queue = &nxt_e_->write_work_queue;          \
                                                                              \
        nxt_work_queue_add(&nxt_e_->write_work_queue, nxt_c_->io->write,      \
                           nxt_c_->socket.task, nxt_c_,                       \
                           nxt_c_->socket.data);                              \
    } while (0)
294 
295 
296 extern nxt_conn_io_t             nxt_unix_conn_io;
297 
298 
299 typedef struct {
300     /*
301      * Client and peer connections are not embedded because already
302      * existent connections can be switched to the event connection proxy.
303      */
304     nxt_conn_t                   *client;
305     nxt_conn_t                   *peer;
306     nxt_buf_t                    *client_buffer;
307     nxt_buf_t                    *peer_buffer;
308 
309     size_t                       client_buffer_size;
310     size_t                       peer_buffer_size;
311 
312     nxt_msec_t                   client_wait_timeout;
313     nxt_msec_t                   connect_timeout;
314     nxt_msec_t                   reconnect_timeout;
315     nxt_msec_t                   peer_wait_timeout;
316     nxt_msec_t                   client_write_timeout;
317     nxt_msec_t                   peer_write_timeout;
318 
319     uint8_t                      connected;  /* 1 bit */
320     uint8_t                      delayed;    /* 1 bit */
321     uint8_t                      retries;    /* 8 bits */
322     uint8_t                      retain;     /* 2 bits */
323 
324     nxt_work_handler_t           completion_handler;
325 } nxt_conn_proxy_t;
326 
327 
328 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
329 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
330 
331 
/*
 * STUB: aliases that map the nxt_event_conn_* names onto their nxt_conn_*
 * counterparts.
 */
#define nxt_event_conn_t         nxt_conn_t
#define nxt_event_conn_state_t   nxt_conn_state_t
#define nxt_event_conn_proxy_t   nxt_conn_proxy_t
#define nxt_event_conn_read      nxt_conn_read
#define nxt_event_conn_write     nxt_conn_write
#define nxt_event_conn_close     nxt_conn_close
339 
340 
341 #endif /* _NXT_CONN_H_INCLUDED_ */
342