1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #ifndef _NXT_CONN_H_INCLUDED_ 8 #define _NXT_CONN_H_INCLUDED_ 9 10 11 typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); 12 13 14 typedef struct { 15 nxt_work_handler_t ready_handler; 16 nxt_work_handler_t close_handler; 17 nxt_work_handler_t error_handler; 18 19 nxt_work_handler_t timer_handler; 20 nxt_conn_timer_value_t timer_value; 21 uintptr_t timer_data; 22 23 uint8_t timer_autoreset; 24 } nxt_conn_state_t; 25 26 27 typedef struct { 28 double average; 29 size_t limit; 30 size_t limit_after; 31 size_t max_limit; 32 nxt_msec_t last; 33 } nxt_event_write_rate_t; 34 35 36 typedef struct { 37 38 nxt_work_handler_t connect; 39 nxt_work_handler_t accept; 40 41 /* 42 * The read() with NULL c->read buffer waits readiness of a connection 43 * to avoid allocation of read buffer if the connection will time out 44 * or will be closed with error. The kqueue-specific read() can also 45 * detect case if a client did not sent anything and has just closed the 46 * connection without errors. In the latter case state's close_handler 47 * is called. 48 */ 49 nxt_work_handler_t read; 50 51 ssize_t (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b); 52 53 ssize_t (*recv)(nxt_conn_t *c, void *buf, 54 size_t size, nxt_uint_t flags); 55 56 /* 57 * The write() is an interface to write a buffer chain with a given rate 58 * limit. It calls write_chunk() in a loop and handles write event timer. 59 */ 60 nxt_work_handler_t write; 61 62 /* 63 * The write_chunk() interface writes a buffer chain with a given limit 64 * and toggles write event. SSL/TLS libraries' write_chunk() interface 65 * buffers data and calls the library specific send() interface to write 66 * the buffered data eventually. 67 */ 68 ssize_t (*write_chunk)(nxt_conn_t *c, 69 nxt_buf_t *b, size_t limit); 70 71 /* 72 * The sendbuf() is an interface for OS-specific sendfile 73 * implementations or simple writev(). 74 */ 75 ssize_t (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b, 76 size_t limit); 77 /* 78 * The writev() is an interface to write several nxt_iobuf_t buffers. 79 */ 80 ssize_t (*writev)(nxt_conn_t *c, 81 nxt_iobuf_t *iob, nxt_uint_t niob); 82 /* 83 * The send() is an interface to write a single buffer. SSL/TLS 84 * libraries' send() interface handles also the libraries' errors. 85 */ 86 ssize_t (*send)(nxt_conn_t *c, void *buf, 87 size_t size); 88 89 nxt_work_handler_t shutdown; 90 } nxt_conn_io_t; 91 92 93 /* 94 * The nxt_listen_event_t is separated from nxt_listen_socket_t 95 * because nxt_listen_socket_t is one per process whilst each worker 96 * thread uses own nxt_listen_event_t. 97 */ 98 typedef struct { 99 /* Must be the first field. */ 100 nxt_fd_event_t socket; 101 102 nxt_task_t task; 103 104 uint32_t ready; 105 uint32_t batch; 106 107 /* An accept() interface is cached to minimize memory accesses. */ 108 nxt_work_handler_t accept; 109 110 nxt_listen_socket_t *listen; 111 nxt_conn_t *next; /* STUB */ 112 nxt_work_queue_t *work_queue; 113 114 nxt_timer_t timer; 115 116 nxt_queue_link_t link; 117 } nxt_listen_event_t; 118 119 120 struct nxt_conn_s { 121 /* 122 * Must be the first field, since nxt_fd_event_t 123 * and nxt_conn_t are used interchangeably. 124 */ 125 nxt_fd_event_t socket; 126 127 nxt_buf_t *read; 128 const nxt_conn_state_t *read_state; 129 nxt_work_queue_t *read_work_queue; 130 nxt_timer_t read_timer; 131 132 nxt_buf_t *write; 133 const nxt_conn_state_t *write_state; 134 nxt_work_queue_t *write_work_queue; 135 nxt_event_write_rate_t *rate; 136 nxt_timer_t write_timer; 137 138 nxt_off_t sent; 139 uint32_t max_chunk; 140 uint32_t nbytes; 141 142 nxt_conn_io_t *io; 143 144 nxt_queue_t requests; /* of nxt_req_conn_link_t */ 145 146 #if (NXT_SSLTLS || NXT_THREADS) 147 /* SunC does not support "zero-sized struct/union". */ 148 149 union { 150 #if (NXT_SSLTLS) 151 void *ssltls; 152 #endif 153 #if (NXT_THREADS) 154 nxt_thread_pool_t *thread_pool; 155 #endif 156 } u; 157 158 #endif 159 160 nxt_mp_t *mem_pool; 161 162 nxt_task_t task; 163 nxt_log_t log; 164 165 nxt_listen_event_t *listen; 166 nxt_sockaddr_t *remote; 167 nxt_sockaddr_t *local; 168 const char *action; 169 170 uint8_t peek; 171 uint8_t blocked; /* 1 bit */ 172 uint8_t delayed; /* 1 bit */ 173 174 #define NXT_CONN_SENDFILE_OFF 0 175 #define NXT_CONN_SENDFILE_ON 1 176 #define NXT_CONN_SENDFILE_UNSET 3 177 178 uint8_t sendfile; /* 2 bits */ 179 uint8_t tcp_nodelay; /* 1 bit */ 180 181 nxt_queue_link_t link; 182 }; 183 184 185 typedef uint32_t nxt_req_id_t; 186 187 typedef struct { 188 nxt_req_id_t req_id; 189 nxt_conn_t *conn; 190 nxt_port_t *app_port; 191 nxt_port_t *reply_port; 192 193 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 194 nxt_queue_link_t app_link; /* for nxt_app_t.requests */ 195 } nxt_req_conn_link_t; 196 197 198 #define nxt_conn_timer_init(ev, c, wq) \ 199 do { \ 200 (ev)->work_queue = (wq); \ 201 (ev)->log = &(c)->log; \ 202 (ev)->precision = NXT_TIMER_DEFAULT_PRECISION; \ 203 } while (0) 204 205 206 #define nxt_read_timer_conn(ev) \ 207 nxt_timer_data(ev, nxt_conn_t, read_timer) 208 209 210 #define nxt_write_timer_conn(ev) \ 211 nxt_timer_data(ev, nxt_conn_t, write_timer) 212 213 214 #if (NXT_HAVE_UNIX_DOMAIN) 215 216 #define nxt_conn_tcp_nodelay_on(task, c) \ 217 do { \ 218 nxt_int_t ret; \ 219 \ 220 if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ 221 ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ 222 TCP_NODELAY, 1); \ 223 \ 224 (c)->tcp_nodelay = (ret == NXT_OK); \ 225 } \ 226 } while (0) 227 228 229 #else 230 231 #define nxt_conn_tcp_nodelay_on(task, c) \ 232 do { \ 233 nxt_int_t ret; \ 234 \ 235 ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ 236 TCP_NODELAY, 1); \ 237 \ 238 (c)->tcp_nodelay = (ret == NXT_OK); \ 239 } while (0) 240 241 #endif 242 243 244 NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task); 245 void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); 246 NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c); 247 248 NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, 249 const nxt_conn_state_t *state, nxt_timer_t *tev); 250 NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq); 251 252 void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data); 253 void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data); 254 nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c); 255 void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data); 256 void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data); 257 258 NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task, 259 nxt_listen_socket_t *ls); 260 void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data); 261 NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, 262 nxt_conn_t *c); 263 void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 264 const char *accept_syscall, nxt_err_t err); 265 266 void nxt_conn_wait(nxt_conn_t *c); 267 268 void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data); 269 ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); 270 ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, 271 nxt_uint_t flags); 272 273 void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); 274 ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); 275 ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, 276 nxt_iobuf_t *iob, nxt_uint_t niob); 277 ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, 278 size_t size); 279 280 size_t nxt_event_conn_write_limit(nxt_conn_t *c); 281 nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, 282 nxt_conn_t *c, size_t sent); 283 ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, 284 size_t limit); 285 ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, 286 nxt_uint_t niob); 287 ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size); 288 289 NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, 290 nxt_conn_t *c); 291 292 293 #define nxt_conn_connect(engine, c) \ 294 nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket, \ 295 c->socket.task, c, c->socket.data) 296 297 298 #define nxt_conn_read(engine, c) \ 299 do { \ 300 nxt_event_engine_t *e = engine; \ 301 \ 302 c->socket.read_work_queue = &e->read_work_queue; \ 303 \ 304 nxt_work_queue_add(&e->read_work_queue, c->io->read, \ 305 c->socket.task, c, c->socket.data); \ 306 } while (0) 307 308 309 #define nxt_conn_write(e, c) \ 310 do { \ 311 nxt_event_engine_t *engine = e; \ 312 \ 313 c->socket.write_work_queue = &engine->write_work_queue; \ 314 \ 315 nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ 316 c->socket.task, c, c->socket.data); \ 317 } while (0) 318 319 320 extern nxt_conn_io_t nxt_unix_conn_io; 321 322 323 typedef struct { 324 /* 325 * Client and peer connections are not embedded because already 326 * existent connections can be switched to the event connection proxy. 327 */ 328 nxt_conn_t *client; 329 nxt_conn_t *peer; 330 nxt_buf_t *client_buffer; 331 nxt_buf_t *peer_buffer; 332 333 size_t client_buffer_size; 334 size_t peer_buffer_size; 335 336 nxt_msec_t client_wait_timeout; 337 nxt_msec_t connect_timeout; 338 nxt_msec_t reconnect_timeout; 339 nxt_msec_t peer_wait_timeout; 340 nxt_msec_t client_write_timeout; 341 nxt_msec_t peer_write_timeout; 342 343 uint8_t connected; /* 1 bit */ 344 uint8_t delayed; /* 1 bit */ 345 uint8_t retries; /* 8 bits */ 346 uint8_t retain; /* 2 bits */ 347 348 nxt_work_handler_t completion_handler; 349 } nxt_conn_proxy_t; 350 351 352 NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c); 353 NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); 354 355 356 /* STUB */ 357 #define nxt_event_conn_t nxt_conn_t 358 #define nxt_event_conn_state_t nxt_conn_state_t 359 #define nxt_event_conn_proxy_t nxt_conn_proxy_t 360 #define nxt_event_conn_read nxt_conn_read 361 #define nxt_event_conn_write nxt_conn_write 362 #define nxt_event_conn_close nxt_conn_close 363 364 365 NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c, 366 nxt_req_id_t req_id); 367 NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c, 368 nxt_req_conn_link_t *rc); 369 370 371 #endif /* _NXT_CONN_H_INCLUDED_ */ 372