/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
    void *data);


/*
 * Connection write event handler.
 *
 * Sends the buffer chain hanging off c->write, updating the chain and the
 * socket write-readiness state as it goes.  On completion, partial write,
 * or error it schedules the state's ready_handler or error_handler on the
 * connection's write work queue.  obj is the nxt_conn_t, data is opaque
 * and passed through to the scheduled handler.
 */
void
nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
    ssize_t             ret;
    nxt_buf_t           *b;
    nxt_conn_t          *c;
    nxt_sendbuf_t       sb;
    nxt_event_engine_t  *engine;

    c = obj;

    nxt_debug(task, "conn write fd:%d", c->socket.fd);

    /* A previously recorded socket error short-circuits to error handling. */
    if (c->socket.error != 0) {
        goto error;
    }

    /* Nothing to do until the socket is writable and a buffer is queued. */
    if (!c->socket.write_ready || c->write == NULL) {
        return;
    }

    engine = task->thread->engine;

    c->socket.write_handler = nxt_conn_io_write;
    c->socket.error_handler = c->write_state->error_handler;

    b = c->write;

    sb.socket = c->socket.fd;
    sb.error = 0;
    sb.sent = 0;
    sb.size = 0;
    sb.buf = b;
    /* Per-call cap on bytes written, to avoid starving other connections. */
    sb.limit = 10 * 1024 * 1024;
    sb.ready = 1;
    sb.sync = 0;

    do {
        ret = nxt_conn_io_sendbuf(task, &sb);

        /* Propagate readiness/error reported by the send operation. */
        c->socket.write_ready = sb.ready;
        c->socket.error = sb.error;

        if (ret < 0) {
            /* ret == NXT_AGAIN || ret == NXT_ERROR. */
            break;
        }

        sb.sent += ret;
        sb.limit -= ret;

        /* Advance the buffer chain past the bytes just sent. */
        b = nxt_sendbuf_update(b, ret);

        if (b == NULL) {
            /* Whole chain sent: no more interest in write events. */
            nxt_fd_event_block_write(engine, &c->socket);
            break;
        }

        sb.buf = b;

        if (!c->socket.write_ready) {
            ret = NXT_AGAIN;
            break;
        }

    } while (sb.limit != 0);

    nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);

    if (sb.sent != 0) {
        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(engine, &c->write_timer);
        }
    }

    if (ret != NXT_ERROR) {

        if (sb.limit == 0) {
            /*
             * Postpone writing until next event poll to allow to
             * process other received events and to get new events.
             */
            c->write_timer.handler = nxt_conn_write_timer_handler;
            nxt_timer_add(engine, &c->write_timer, 0);

        } else if (ret == NXT_AGAIN) {
            /*
             * SSL libraries can require to toggle either write or read
             * event if renegotiation occurs during SSL write operation.
             * This case is handled on the event_io->send() level.  Timer
             * can be set here because it should be set only for write
             * direction.
             */
            nxt_conn_timer(engine, c, c->write_state, &c->write_timer);

            if (nxt_fd_event_is_disabled(c->socket.write)) {
                nxt_fd_event_enable_write(engine, &c->socket);
            }
        }
    }

    if (ret == 0 || sb.sent != 0) {
        /*
         * ret == 0 means a sync buffer was processed.
         * ret == NXT_ERROR is ignored here if some data was sent,
         * the error will be handled on the next nxt_conn_write() call.
         */
        c->sent += sb.sent;
        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    /* ret == NXT_ERROR */

    nxt_fd_event_block_write(engine, &c->socket);

error:

    nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
                       task, c, data);
}


/*
 * Timer callback for the zero-delay timer armed by nxt_conn_io_write()
 * when the per-call send limit was exhausted: clears the delayed flag and
 * restarts the connection's write I/O handler.  obj is the nxt_timer_t
 * embedded in the connection.
 */
static void
nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t   *c;
    nxt_timer_t  *timer;

    timer = obj;

    nxt_debug(task, "event conn conn timer");

    /* Recover the enclosing connection from the embedded timer. */
    c = nxt_write_timer_conn(timer);
    c->delayed = 0;

    c->io->write(task, c, c->socket.data);
}


/*
 * Coalesces the buffer chain in sb into an iovec array and writes it out.
 * Returns 0 for a sync-only chain (nothing to transmit), the number of
 * bytes written, or NXT_AGAIN / NXT_ERROR from the underlying write.
 */
ssize_t
nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
{
    nxt_uint_t    niov;
    struct iovec  iov[NXT_IOBUF_MAX];

    niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);

    if (niov == 0 && sb->sync) {
        return 0;
    }

    return nxt_conn_io_writev(task, sb, iov, niov);
}


/*
 * writev() wrapper with EINTR retry.  On EAGAIN clears sb->ready and
 * returns NXT_AGAIN; on any other error records it in sb->error, logs it,
 * and returns NXT_ERROR.  A single-element iovec is delegated to the plain
 * send() path to skip the kernel's iovec copy-in.
 */
ssize_t
nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
    nxt_uint_t niov)
{
    ssize_t    n;
    nxt_err_t  err;

    if (niov == 1) {
        /* Disposal of surplus kernel iovec copy-in operation. */
        return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
    }

    for ( ;; ) {
        n = writev(sb->socket, iov, niov);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            /* Socket buffer full: mark not ready and let the caller wait. */
            sb->ready = 0;
            nxt_debug(task, "writev() %E", err);

            return NXT_AGAIN;

        case NXT_EINTR:
            /* Interrupted by a signal: retry the syscall. */
            nxt_debug(task, "writev() %E", err);
            continue;

        default:
            sb->error = err;
            nxt_log(task, nxt_socket_error_level(err),
                    "writev(%d, %ui) failed %E", sb->socket, niov, err);

            return NXT_ERROR;
        }
    }
}


/*
 * send() wrapper for a single contiguous buffer, with the same EINTR retry
 * and EAGAIN / error reporting semantics as nxt_conn_io_writev().
 */
ssize_t
nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
{
    ssize_t    n;
    nxt_err_t  err;

    for ( ;; ) {
        n = send(sb->socket, buf, size, 0);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            sb->ready = 0;
            nxt_debug(task, "send() %E", err);

            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(task, "send() %E", err);
            continue;

        default:
            sb->error = err;
            nxt_log(task, nxt_socket_error_level(err),
                    "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);

            return NXT_ERROR;
        }
    }
}


/* Obsolete interfaces.
 */

/*
 * Computes the number of bytes this connection may write in the current
 * pass.  Without a rate limiter it is simply c->max_chunk; otherwise the
 * configured limit is adjusted by how far the measured average deviates
 * from it, optionally extended by limit_after and capped by max_limit,
 * and finally clamped to c->max_chunk.  Returns 0 when the rate budget
 * is exhausted.
 */
size_t
nxt_event_conn_write_limit(nxt_conn_t *c)
{
    ssize_t                 limit, correction;
    nxt_event_write_rate_t  *rate;

    rate = c->rate;

    if (rate == NULL) {
        return c->max_chunk;
    }

    limit = rate->limit;
    /* Positive when sending below the average rate, negative when above. */
    correction = limit - (size_t) rate->average;

    nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
              correction, rate->average);

    limit += correction;

    if (limit <= 0) {
        return 0;
    }

    if (rate->limit_after != 0) {
        limit += rate->limit_after;
        limit = nxt_min((size_t) limit, rate->max_limit);
    }

    return nxt_min((size_t) limit, c->max_chunk);
}


/*
 * Obsolete stub: delayed-write handling is no longer performed here, so
 * this always reports "not delayed".  Parameters are ignored.
 */
nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
    size_t sent)
{
    return 0;
}


/*
 * Obsolete chunked-write entry point: sends the chain b via the
 * connection's io->sendbuf and re-enables the write event when the send
 * would block (NXT_AGAIN or socket no longer write-ready).
 */
ssize_t
nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
    ssize_t  ret;

    ret = c->io->sendbuf(c, b, limit);

    if ((ret == NXT_AGAIN || !c->socket.write_ready)
        && nxt_fd_event_is_disabled(c->socket.write))
    {
        nxt_fd_event_enable_write(c->socket.task->thread->engine, &c->socket);
    }

    return ret;
}


/*
 * Obsolete sendbuf implementation: coalesces the chain into an iovec
 * array bounded by limit and writes it with nxt_event_conn_io_writev().
 * Returns 0 for a sync-only chain.
 */
ssize_t
nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
    nxt_uint_t              niob;
    struct iovec            iob[NXT_IOBUF_MAX];
    nxt_sendbuf_coalesce_t  sb;

    sb.buf = b;
    sb.iobuf = iob;
    sb.nmax = NXT_IOBUF_MAX;
    sb.sync = 0;
    sb.size = 0;
    sb.limit = limit;

    niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);

    if (niob == 0 && sb.sync) {
        return 0;
    }

    return nxt_event_conn_io_writev(c, iob, niob);
}


/*
 * Obsolete writev() wrapper: like nxt_conn_io_writev() but records the
 * EAGAIN / error state directly on c->socket instead of a sendbuf.
 */
ssize_t
nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    ssize_t    n;
    nxt_err_t  err;

    if (niob == 1) {
        /* Disposal of surplus kernel iovec copy-in operation. */
        return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
    }

    for ( ;; ) {
        n = writev(c->socket.fd, iob, niob);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            nxt_debug(c->socket.task, "writev() %E", err);
            c->socket.write_ready = 0;
            return NXT_AGAIN;

        case NXT_EINTR:
            /* Interrupted by a signal: retry the syscall. */
            nxt_debug(c->socket.task, "writev() %E", err);
            continue;

        default:
            c->socket.error = err;
            nxt_log(c->socket.task, nxt_socket_error_level(err),
                    "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
            return NXT_ERROR;
        }
    }
}


/*
 * Obsolete send() wrapper for a single contiguous buffer, with the same
 * EINTR retry and EAGAIN / error semantics as nxt_event_conn_io_writev().
 */
ssize_t
nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
{
    ssize_t    n;
    nxt_err_t  err;

    for ( ;; ) {
        n = send(c->socket.fd, buf, size, 0);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
                  c->socket.fd, buf, size, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            nxt_debug(c->socket.task, "send() %E", err);
            c->socket.write_ready = 0;
            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(c->socket.task, "send() %E", err);
            continue;

        default:
            c->socket.error = err;
            nxt_log(c->socket.task, nxt_socket_error_level(err),
                    "send(%d, %p, %uz) failed %E",
                    c->socket.fd, buf, size, err);
            return NXT_ERROR;
        }
    }
}