162Sigor@sysoev.ru
262Sigor@sysoev.ru /*
362Sigor@sysoev.ru * Copyright (C) Igor Sysoev
462Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
562Sigor@sysoev.ru */
662Sigor@sysoev.ru
762Sigor@sysoev.ru #include <nxt_main.h>
862Sigor@sysoev.ru
962Sigor@sysoev.ru
1062Sigor@sysoev.ru void
nxt_conn_wait(nxt_conn_t * c)1162Sigor@sysoev.ru nxt_conn_wait(nxt_conn_t *c)
1262Sigor@sysoev.ru {
1362Sigor@sysoev.ru nxt_event_engine_t *engine;
1462Sigor@sysoev.ru const nxt_conn_state_t *state;
1562Sigor@sysoev.ru
1662Sigor@sysoev.ru nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
1762Sigor@sysoev.ru c->socket.fd, c->socket.read_ready);
1862Sigor@sysoev.ru
1962Sigor@sysoev.ru engine = c->socket.task->thread->engine;
2062Sigor@sysoev.ru state = c->read_state;
2162Sigor@sysoev.ru
2262Sigor@sysoev.ru if (c->socket.read_ready) {
2362Sigor@sysoev.ru nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
2462Sigor@sysoev.ru c->socket.task, c, c->socket.data);
2562Sigor@sysoev.ru return;
2662Sigor@sysoev.ru }
2762Sigor@sysoev.ru
2862Sigor@sysoev.ru c->socket.read_handler = state->ready_handler;
2962Sigor@sysoev.ru c->socket.error_handler = state->error_handler;
3062Sigor@sysoev.ru
3162Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer);
3262Sigor@sysoev.ru
3362Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket);
3462Sigor@sysoev.ru }
3562Sigor@sysoev.ru
3662Sigor@sysoev.ru
3762Sigor@sysoev.ru void
nxt_conn_io_read(nxt_task_t * task,void * obj,void * data)3862Sigor@sysoev.ru nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
3962Sigor@sysoev.ru {
4062Sigor@sysoev.ru ssize_t n;
4162Sigor@sysoev.ru nxt_conn_t *c;
4262Sigor@sysoev.ru nxt_event_engine_t *engine;
4362Sigor@sysoev.ru nxt_work_handler_t handler;
4462Sigor@sysoev.ru const nxt_conn_state_t *state;
4562Sigor@sysoev.ru
4662Sigor@sysoev.ru c = obj;
4762Sigor@sysoev.ru
48979Sigor@sysoev.ru nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d",
49979Sigor@sysoev.ru c->socket.fd, c->socket.read_ready, c->socket.closed,
50979Sigor@sysoev.ru c->socket.error, c->block_read);
5162Sigor@sysoev.ru
52979Sigor@sysoev.ru if (c->socket.error != 0 || c->block_read) {
53836Sigor@sysoev.ru return;
54836Sigor@sysoev.ru }
55836Sigor@sysoev.ru
5662Sigor@sysoev.ru engine = task->thread->engine;
5762Sigor@sysoev.ru
58771Sigor@sysoev.ru /*
59771Sigor@sysoev.ru * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
60771Sigor@sysoev.ru * because the function can be called by nxt_kqueue_conn_io_read().
61771Sigor@sysoev.ru */
62771Sigor@sysoev.ru c->socket.read_handler = c->io->read;
6362Sigor@sysoev.ru state = c->read_state;
64771Sigor@sysoev.ru c->socket.error_handler = state->error_handler;
6562Sigor@sysoev.ru
6662Sigor@sysoev.ru if (c->socket.read_ready) {
6762Sigor@sysoev.ru
68629Sigor@sysoev.ru if (state->io_read_handler == NULL) {
69629Sigor@sysoev.ru n = c->io->recvbuf(c, c->read);
7062Sigor@sysoev.ru
7162Sigor@sysoev.ru } else {
72*1268Sigor@sysoev.ru n = state->io_read_handler(task, c);
73741Sigor@sysoev.ru /* The state can be changed by io_read_handler. */
74741Sigor@sysoev.ru state = c->read_state;
7562Sigor@sysoev.ru }
7662Sigor@sysoev.ru
7762Sigor@sysoev.ru if (n > 0) {
7862Sigor@sysoev.ru c->nbytes = n;
7962Sigor@sysoev.ru
80629Sigor@sysoev.ru nxt_recvbuf_update(c->read, n);
8162Sigor@sysoev.ru
8262Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket);
8362Sigor@sysoev.ru
8462Sigor@sysoev.ru if (state->timer_autoreset) {
8562Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer);
8662Sigor@sysoev.ru }
8762Sigor@sysoev.ru
88771Sigor@sysoev.ru nxt_work_queue_add(c->read_work_queue,
89771Sigor@sysoev.ru state->ready_handler, task, c, data);
9062Sigor@sysoev.ru return;
9162Sigor@sysoev.ru }
9262Sigor@sysoev.ru
9362Sigor@sysoev.ru if (n != NXT_AGAIN) {
94771Sigor@sysoev.ru /* n == 0 or n == NXT_ERROR. */
95771Sigor@sysoev.ru handler = (n == 0) ? state->close_handler : state->error_handler;
96771Sigor@sysoev.ru
9762Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket);
9862Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer);
9962Sigor@sysoev.ru
100771Sigor@sysoev.ru nxt_work_queue_add(&engine->fast_work_queue,
101771Sigor@sysoev.ru handler, task, c, data);
102771Sigor@sysoev.ru return;
103771Sigor@sysoev.ru }
104771Sigor@sysoev.ru
105771Sigor@sysoev.ru /* n == NXT_AGAIN. */
10662Sigor@sysoev.ru
107771Sigor@sysoev.ru if (c->socket.read_ready) {
108771Sigor@sysoev.ru /*
109771Sigor@sysoev.ru * SSL/TLS library can return NXT_AGAIN if renegotiation
110771Sigor@sysoev.ru * occured during read operation, it toggled write event
111771Sigor@sysoev.ru * internally so only read timer should be set.
112771Sigor@sysoev.ru */
113809Svbart@nginx.com if (!c->read_timer.enabled) {
114771Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer);
115771Sigor@sysoev.ru }
11662Sigor@sysoev.ru
11762Sigor@sysoev.ru return;
11862Sigor@sysoev.ru }
11962Sigor@sysoev.ru }
12062Sigor@sysoev.ru
121766Sigor@sysoev.ru if (nxt_fd_event_is_disabled(c->socket.read)) {
122766Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket);
12362Sigor@sysoev.ru }
12462Sigor@sysoev.ru
125809Svbart@nginx.com if (state->timer_autoreset || !c->read_timer.enabled) {
126766Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer);
127766Sigor@sysoev.ru }
12862Sigor@sysoev.ru }
12962Sigor@sysoev.ru
13062Sigor@sysoev.ru
13162Sigor@sysoev.ru ssize_t
nxt_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)13262Sigor@sysoev.ru nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
13362Sigor@sysoev.ru {
13462Sigor@sysoev.ru ssize_t n;
13562Sigor@sysoev.ru nxt_err_t err;
13662Sigor@sysoev.ru nxt_uint_t niov;
13762Sigor@sysoev.ru struct iovec iov[NXT_IOBUF_MAX];
13862Sigor@sysoev.ru nxt_recvbuf_coalesce_t rb;
13962Sigor@sysoev.ru
14062Sigor@sysoev.ru rb.buf = b;
14162Sigor@sysoev.ru rb.iobuf = iov;
14262Sigor@sysoev.ru rb.nmax = NXT_IOBUF_MAX;
14362Sigor@sysoev.ru rb.size = 0;
14462Sigor@sysoev.ru
14562Sigor@sysoev.ru niov = nxt_recvbuf_mem_coalesce(&rb);
14662Sigor@sysoev.ru
14762Sigor@sysoev.ru if (niov == 1) {
14862Sigor@sysoev.ru /* Disposal of surplus kernel iovec copy-in operation. */
14962Sigor@sysoev.ru return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
15062Sigor@sysoev.ru }
15162Sigor@sysoev.ru
15262Sigor@sysoev.ru for ( ;; ) {
15362Sigor@sysoev.ru n = readv(c->socket.fd, iov, niov);
15462Sigor@sysoev.ru
15562Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0;
15662Sigor@sysoev.ru
15762Sigor@sysoev.ru nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
15862Sigor@sysoev.ru
15962Sigor@sysoev.ru if (n > 0) {
16062Sigor@sysoev.ru if ((size_t) n < rb.size) {
16162Sigor@sysoev.ru c->socket.read_ready = 0;
16262Sigor@sysoev.ru }
16362Sigor@sysoev.ru
16462Sigor@sysoev.ru return n;
16562Sigor@sysoev.ru }
16662Sigor@sysoev.ru
16762Sigor@sysoev.ru if (n == 0) {
16862Sigor@sysoev.ru c->socket.closed = 1;
16962Sigor@sysoev.ru c->socket.read_ready = 0;
17062Sigor@sysoev.ru return n;
17162Sigor@sysoev.ru }
17262Sigor@sysoev.ru
17362Sigor@sysoev.ru /* n == -1 */
17462Sigor@sysoev.ru
17562Sigor@sysoev.ru switch (err) {
17662Sigor@sysoev.ru
17762Sigor@sysoev.ru case NXT_EAGAIN:
17862Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err);
17962Sigor@sysoev.ru c->socket.read_ready = 0;
18062Sigor@sysoev.ru return NXT_AGAIN;
18162Sigor@sysoev.ru
18262Sigor@sysoev.ru case NXT_EINTR:
18362Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err);
18462Sigor@sysoev.ru continue;
18562Sigor@sysoev.ru
18662Sigor@sysoev.ru default:
18762Sigor@sysoev.ru c->socket.error = err;
18862Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err),
18962Sigor@sysoev.ru "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
19062Sigor@sysoev.ru
19162Sigor@sysoev.ru return NXT_ERROR;
19262Sigor@sysoev.ru }
19362Sigor@sysoev.ru }
19462Sigor@sysoev.ru }
19562Sigor@sysoev.ru
19662Sigor@sysoev.ru
19762Sigor@sysoev.ru ssize_t
nxt_conn_io_recv(nxt_conn_t * c,void * buf,size_t size,nxt_uint_t flags)19862Sigor@sysoev.ru nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
19962Sigor@sysoev.ru {
20062Sigor@sysoev.ru ssize_t n;
20162Sigor@sysoev.ru nxt_err_t err;
20262Sigor@sysoev.ru
20362Sigor@sysoev.ru for ( ;; ) {
20462Sigor@sysoev.ru n = recv(c->socket.fd, buf, size, flags);
20562Sigor@sysoev.ru
20662Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0;
20762Sigor@sysoev.ru
20862Sigor@sysoev.ru nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
20962Sigor@sysoev.ru c->socket.fd, buf, size, flags, n);
21062Sigor@sysoev.ru
21162Sigor@sysoev.ru if (n > 0) {
212771Sigor@sysoev.ru if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
21362Sigor@sysoev.ru c->socket.read_ready = 0;
21462Sigor@sysoev.ru }
21562Sigor@sysoev.ru
21662Sigor@sysoev.ru return n;
21762Sigor@sysoev.ru }
21862Sigor@sysoev.ru
21962Sigor@sysoev.ru if (n == 0) {
22062Sigor@sysoev.ru c->socket.closed = 1;
221771Sigor@sysoev.ru
222771Sigor@sysoev.ru if ((flags & MSG_PEEK) == 0) {
223771Sigor@sysoev.ru c->socket.read_ready = 0;
224771Sigor@sysoev.ru }
22562Sigor@sysoev.ru
22662Sigor@sysoev.ru return n;
22762Sigor@sysoev.ru }
22862Sigor@sysoev.ru
22962Sigor@sysoev.ru /* n == -1 */
23062Sigor@sysoev.ru
23162Sigor@sysoev.ru switch (err) {
23262Sigor@sysoev.ru
23362Sigor@sysoev.ru case NXT_EAGAIN:
23462Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err);
23562Sigor@sysoev.ru c->socket.read_ready = 0;
23662Sigor@sysoev.ru
23762Sigor@sysoev.ru return NXT_AGAIN;
23862Sigor@sysoev.ru
23962Sigor@sysoev.ru case NXT_EINTR:
24062Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err);
24162Sigor@sysoev.ru continue;
24262Sigor@sysoev.ru
24362Sigor@sysoev.ru default:
24462Sigor@sysoev.ru c->socket.error = err;
24562Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err),
24662Sigor@sysoev.ru "recv(%d, %p, %uz, %ui) failed %E",
24762Sigor@sysoev.ru c->socket.fd, buf, size, flags, err);
24862Sigor@sysoev.ru
24962Sigor@sysoev.ru return NXT_ERROR;
25062Sigor@sysoev.ru }
25162Sigor@sysoev.ru }
25262Sigor@sysoev.ru }
253