1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 10 11 typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, 12 nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); 13 14 15 struct nxt_http_upstream_s { 16 uint32_t current; 17 uint32_t n; 18 uint8_t protocol; 19 nxt_http_upstream_connect_t connect; 20 nxt_sockaddr_t *sockaddr[1]; 21 }; 22 23 24 static void nxt_http_upstream_connect(nxt_task_t *task, 25 nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); 26 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, 27 nxt_http_request_t *r, nxt_http_action_t *action); 28 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); 29 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); 30 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); 31 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); 32 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); 33 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, 34 void *data); 35 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); 36 37 38 static const nxt_http_request_state_t nxt_http_proxy_header_send_state; 39 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; 40 static const nxt_http_request_state_t nxt_http_proxy_header_read_state; 41 static const nxt_http_request_state_t nxt_http_proxy_read_state; 42 43 44 nxt_int_t 45 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) 46 { 47 nxt_str_t name; 48 nxt_sockaddr_t *sa; 49 nxt_http_upstream_t *upstream; 50 51 sa = NULL; 52 name = action->name; 53 54 if (nxt_str_start(&name, "http://", 7)) { 55 name.length -= 7; 56 name.start += 7; 57 58 sa = nxt_sockaddr_parse(mp, &name); 59 if (nxt_slow_path(sa == NULL)) { 60 return NXT_ERROR; 61 } 62 63 sa->type = SOCK_STREAM; 64 } 65 66 if (sa != NULL) { 67 upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); 68 if (nxt_slow_path(upstream == NULL)) { 69 return NXT_ERROR; 70 } 71 72 upstream->current = 0; 73 upstream->n = 1; 74 upstream->protocol = NXT_HTTP_PROTO_H1; 75 upstream->connect = nxt_http_upstream_connect; 76 upstream->sockaddr[0] = sa; 77 78 action->u.upstream = upstream; 79 action->handler = nxt_http_proxy_handler; 80 } 81 82 return NXT_OK; 83 } 84 85 86 static nxt_http_action_t * 87 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 88 nxt_http_action_t *action) 89 { 90 nxt_http_peer_t *peer; 91 92 peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); 93 if (nxt_slow_path(peer == NULL)) { 94 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 95 return NULL; 96 } 97 98 peer->request = r; 99 r->peer = peer; 100 101 nxt_mp_retain(r->mem_pool); 102 103 action->u.upstream->connect(task, action->u.upstream, peer); 104 105 return NULL; 106 } 107 108 109 static void 110 nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, 111 nxt_http_peer_t *peer) 112 { 113 peer->protocol = upstream->protocol; 114 peer->sockaddr = upstream->sockaddr[0]; 115 116 peer->request->state = &nxt_http_proxy_header_send_state; 117 118 nxt_http_proto[peer->protocol].peer_connect(task, peer); 119 } 120 121 122 static const nxt_http_request_state_t nxt_http_proxy_header_send_state 123 nxt_aligned(64) = 124 { 125 .ready_handler = nxt_http_proxy_header_send, 126 .error_handler = nxt_http_proxy_error, 127 }; 128 129 130 static void 131 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) 132 { 133 nxt_http_peer_t *peer; 134 nxt_http_request_t *r; 135 136 r = obj; 137 peer = data; 138 r->state = &nxt_http_proxy_header_sent_state; 139 140 nxt_http_proto[peer->protocol].peer_header_send(task, peer); 141 } 142 143 144 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state 145 nxt_aligned(64) = 146 { 147 .ready_handler = nxt_http_proxy_header_sent, 148 .error_handler = nxt_http_proxy_error, 149 }; 150 151 152 static void 153 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) 154 { 155 nxt_http_peer_t *peer; 156 nxt_http_request_t *r; 157 158 r = obj; 159 peer = data; 160 r->state = &nxt_http_proxy_header_read_state; 161 162 nxt_http_proto[peer->protocol].peer_header_read(task, peer); 163 } 164 165 166 static const nxt_http_request_state_t nxt_http_proxy_header_read_state 167 nxt_aligned(64) = 168 { 169 .ready_handler = nxt_http_proxy_header_read, 170 .error_handler = nxt_http_proxy_error, 171 }; 172 173 174 static void 175 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) 176 { 177 nxt_http_peer_t *peer; 178 nxt_http_field_t *f, *field; 179 nxt_http_request_t *r; 180 181 r = obj; 182 peer = data; 183 184 r->status = peer->status; 185 186 nxt_debug(task, "http proxy status: %d", peer->status); 187 188 nxt_list_each(field, peer->fields) { 189 190 nxt_debug(task, "http proxy header: \"%*s: %*s\"", 191 (size_t) field->name_length, field->name, 192 (size_t) field->value_length, field->value); 193 194 if (!field->skip) { 195 f = nxt_list_add(r->resp.fields); 196 if (nxt_slow_path(f == NULL)) { 197 nxt_http_proxy_error(task, r, peer); 198 return; 199 } 200 201 *f = *field; 202 } 203 204 } nxt_list_loop; 205 206 nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); 207 } 208 209 210 static void 211 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) 212 { 213 nxt_buf_t *out; 214 nxt_http_peer_t *peer; 215 nxt_http_request_t *r; 216 217 r = obj; 218 peer = data; 219 out = peer->body; 220 221 if (out != NULL) { 222 peer->body = NULL; 223 nxt_http_request_send(task, r, out); 224 } 225 226 r->state = &nxt_http_proxy_read_state; 227 228 nxt_http_proto[peer->protocol].peer_read(task, peer); 229 } 230 231 232 static const nxt_http_request_state_t nxt_http_proxy_read_state 233 nxt_aligned(64) = 234 { 235 .ready_handler = nxt_http_proxy_read, 236 .error_handler = nxt_http_proxy_error, 237 }; 238 239 240 static void 241 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) 242 { 243 nxt_buf_t *out; 244 nxt_bool_t last; 245 nxt_http_peer_t *peer; 246 nxt_http_request_t *r; 247 248 r = obj; 249 peer = data; 250 out = peer->body; 251 peer->body = NULL; 252 last = nxt_buf_is_last(out); 253 254 nxt_http_request_send(task, r, out); 255 256 if (!last) { 257 nxt_http_proto[peer->protocol].peer_read(task, peer); 258 259 } else { 260 nxt_http_proto[peer->protocol].peer_close(task, peer); 261 262 nxt_mp_release(r->mem_pool); 263 } 264 } 265 266 267 nxt_buf_t * 268 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, 269 size_t size) 270 { 271 nxt_buf_t *b; 272 273 b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); 274 if (nxt_fast_path(b != NULL)) { 275 b->completion_handler = nxt_http_proxy_buf_mem_completion; 276 b->parent = r; 277 nxt_mp_retain(r->mem_pool); 278 279 } else { 280 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 281 } 282 283 return b; 284 } 285 286 287 static void 288 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) 289 { 290 nxt_buf_t *b, *next; 291 nxt_http_peer_t *peer; 292 nxt_http_request_t *r; 293 294 b = obj; 295 r = data; 296 297 peer = r->peer; 298 299 do { 300 next = b->next; 301 302 nxt_http_proxy_buf_mem_free(task, r, b); 303 304 b = next; 305 } while (b != NULL); 306 307 if (!peer->closed) { 308 nxt_http_proto[peer->protocol].peer_read(task, peer); 309 } 310 } 311 312 313 void 314 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, 315 nxt_buf_t *b) 316 { 317 nxt_event_engine_buf_mem_free(task->thread->engine, b); 318 319 nxt_mp_release(r->mem_pool); 320 } 321 322 323 static void 324 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) 325 { 326 nxt_http_peer_t *peer; 327 nxt_http_request_t *r; 328 329 r = obj; 330 peer = r->peer; 331 332 nxt_http_proto[peer->protocol].peer_close(task, peer); 333 334 nxt_mp_release(r->mem_pool); 335 336 nxt_http_request_error(task, r, peer->status); 337 } 338 339 340 nxt_int_t 341 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) 342 { 343 nxt_http_request_t *r; 344 345 r = ctx; 346 347 r->resp.date = field; 348 349 return NXT_OK; 350 } 351 352 353 nxt_int_t 354 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, 355 uintptr_t data) 356 { 357 nxt_off_t n; 358 nxt_http_request_t *r; 359 360 r = ctx; 361 362 r->resp.content_length = field; 363 364 n = nxt_off_t_parse(field->value, field->value_length); 365 366 if (nxt_fast_path(n >= 0)) { 367 r->resp.content_length_n = n; 368 } 369 370 return NXT_OK; 371 } 372 373 374 nxt_int_t 375 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) 376 { 377 field->skip = 1; 378 379 return NXT_OK; 380 } 381