xref: /unit/src/nxt_conn_connect.c (revision 1449:8bcb79f5d69d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_err_t nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c);
11 
12 
13 void
nxt_conn_sys_socket(nxt_task_t * task,void * obj,void * data)14 nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
15 {
16     nxt_conn_t          *c;
17     nxt_work_handler_t  handler;
18 
19     c = obj;
20 
21     if (nxt_conn_socket(task, c) == NXT_OK) {
22         c->socket.write_work_queue = c->write_work_queue;
23         handler = c->io->connect;
24 
25     } else {
26         handler = c->write_state->error_handler;
27     }
28 
29     nxt_work_queue_add(&task->thread->engine->connect_work_queue,
30                        handler, task, c, data);
31 }
32 
33 
34 void
nxt_conn_io_connect(nxt_task_t * task,void * obj,void * data)35 nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data)
36 {
37     nxt_conn_t              *c;
38     nxt_work_handler_t      handler;
39     nxt_event_engine_t      *engine;
40     const nxt_conn_state_t  *state;
41 
42     c = obj;
43 
44     state = c->write_state;
45 
46     switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
47 
48     case NXT_OK:
49         c->socket.write_ready = 1;
50         handler = state->ready_handler;
51         break;
52 
53     case NXT_AGAIN:
54         c->socket.write_handler = nxt_conn_connect_test;
55         c->socket.error_handler = nxt_conn_connect_error;
56 
57         engine = task->thread->engine;
58 
59         nxt_conn_timer(engine, c, state, &c->write_timer);
60 
61         nxt_fd_event_enable_write(engine, &c->socket);
62         return;
63 
64     case NXT_DECLINED:
65         handler = state->close_handler;
66         break;
67 
68     default: /* NXT_ERROR */
69         handler = state->error_handler;
70         break;
71     }
72 
73     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
74 }
75 
76 
77 nxt_int_t
nxt_conn_socket(nxt_task_t * task,nxt_conn_t * c)78 nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c)
79 {
80     nxt_uint_t    family;
81     nxt_socket_t  s;
82 
83     nxt_debug(task, "event conn socket");
84 
85     family = c->remote->u.sockaddr.sa_family;
86 
87     s = nxt_socket_create(task, family, c->remote->type, 0, NXT_NONBLOCK);
88 
89     if (nxt_slow_path(s == -1)) {
90         return NXT_ERROR;
91     }
92 
93     c->sendfile = 1;
94 
95 #if (NXT_HAVE_UNIX_DOMAIN && NXT_SOLARIS)
96 
97     if (family == AF_UNIX) {
98         /* Solaris AF_UNIX does not support sendfilev(). */
99         c->sendfile = 0;
100     }
101 
102 #endif
103 
104     c->socket.fd = s;
105 
106     c->socket.task = task;
107     c->read_timer.task = task;
108     c->write_timer.task = task;
109 
110     if (c->local != NULL) {
111         if (nxt_slow_path(nxt_socket_bind(task, s, c->local) != NXT_OK)) {
112             nxt_socket_close(task, s);
113             return NXT_ERROR;
114         }
115     }
116 
117     return NXT_OK;
118 }
119 
120 
121 void
nxt_conn_connect_test(nxt_task_t * task,void * obj,void * data)122 nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
123 {
124     nxt_err_t   err;
125     nxt_conn_t  *c;
126 
127     c = obj;
128 
129     nxt_debug(task, "event connect test fd:%d", c->socket.fd);
130 
131     nxt_fd_event_block_write(task->thread->engine, &c->socket);
132 
133     if (c->write_state->timer_autoreset) {
134         nxt_timer_disable(task->thread->engine, &c->write_timer);
135     }
136 
137     err = nxt_conn_connect_test_error(task, c);
138 
139     if (err == 0) {
140         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
141                            task, c, data);
142     } else {
143         nxt_conn_connect_error(task, c, data);
144     }
145 }
146 
147 
148 void
nxt_conn_connect_error(nxt_task_t * task,void * obj,void * data)149 nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
150 {
151     nxt_err_t               err;
152     nxt_conn_t              *c;
153     nxt_work_handler_t      handler;
154     const nxt_conn_state_t  *state;
155 
156     c = obj;
157     err = c->socket.error;
158 
159     if (err == 0) {
160         err = nxt_conn_connect_test_error(task, c);
161     }
162 
163     state = c->write_state;
164 
165     switch (err) {
166 
167     case NXT_ECONNREFUSED:
168 #if (NXT_LINUX)
169     case NXT_EAGAIN:
170         /*
171          * Linux returns EAGAIN instead of ECONNREFUSED
172          * for UNIX sockets if a listen queue is full.
173          */
174 #endif
175         handler = state->close_handler;
176         break;
177 
178     default:
179         handler = state->error_handler;
180         break;
181     }
182 
183     nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
184 }
185 
186 
187 static nxt_err_t
nxt_conn_connect_test_error(nxt_task_t * task,nxt_conn_t * c)188 nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c)
189 {
190     nxt_err_t  err;
191 
192     err = nxt_socket_error(c->socket.fd);
193 
194     if (err != 0) {
195         c->socket.error = err;
196 
197         nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
198                 c->socket.fd, (size_t) c->remote->length,
199                 nxt_sockaddr_start(c->remote), err);
200     }
201 
202     return err;
203 }
204