1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 #include <nxt_upstream.h> 10 11 12 struct nxt_upstream_proxy_s { 13 nxt_sockaddr_t *sockaddr; 14 uint8_t protocol; 15 }; 16 17 18 static void nxt_http_proxy_server_get(nxt_task_t *task, 19 nxt_upstream_server_t *us); 20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task, 21 nxt_upstream_server_t *us); 22 static void nxt_http_proxy_upstream_error(nxt_task_t *task, 23 nxt_upstream_server_t *us); 24 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, 25 nxt_http_request_t *r, nxt_http_action_t *action); 26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); 27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); 28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); 29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); 30 static void nxt_http_proxy_request_send(nxt_task_t *task, 31 nxt_http_request_t *r, nxt_buf_t *out); 32 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); 33 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, 34 void *data); 35 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); 36 37 38 static const nxt_http_request_state_t nxt_http_proxy_header_send_state; 39 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; 40 static const nxt_http_request_state_t nxt_http_proxy_header_read_state; 41 static const nxt_http_request_state_t nxt_http_proxy_read_state; 42 43 44 static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = { 45 .get = nxt_http_proxy_server_get, 46 }; 47 48 49 static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = { 50 .ready = nxt_http_proxy_upstream_ready, 51 .error = nxt_http_proxy_upstream_error, 52 }; 53 54 55 nxt_int_t 56 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) 57 { 58 nxt_str_t name; 59 nxt_sockaddr_t *sa; 60 nxt_upstream_t *up; 61 nxt_upstream_proxy_t *proxy; 62 63 sa = NULL; 64 name = action->name; 65 66 if (nxt_str_start(&name, "http://", 7)) { 67 name.length -= 7; 68 name.start += 7; 69 70 sa = nxt_sockaddr_parse(mp, &name); 71 if (nxt_slow_path(sa == NULL)) { 72 return NXT_ERROR; 73 } 74 75 sa->type = SOCK_STREAM; 76 } 77 78 if (sa != NULL) { 79 up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); 80 if (nxt_slow_path(up == NULL)) { 81 return NXT_ERROR; 82 } 83 84 up->name.length = sa->length; 85 up->name.start = nxt_sockaddr_start(sa); 86 up->proto = &nxt_upstream_simple_proto; 87 88 proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t)); 89 if (nxt_slow_path(proxy == NULL)) { 90 return NXT_ERROR; 91 } 92 93 proxy->sockaddr = sa; 94 proxy->protocol = NXT_HTTP_PROTO_H1; 95 up->type.proxy = proxy; 96 97 action->u.upstream = up; 98 action->handler = nxt_http_proxy_handler; 99 } 100 101 return NXT_OK; 102 } 103 104 105 static nxt_http_action_t * 106 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 107 nxt_http_action_t *action) 108 { 109 return nxt_upstream_proxy_handler(task, r, action->u.upstream); 110 } 111 112 113 nxt_http_action_t * 114 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 115 nxt_upstream_t *upstream) 116 { 117 nxt_http_peer_t *peer; 118 nxt_upstream_server_t *us; 119 120 us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t)); 121 if (nxt_slow_path(us == NULL)) { 122 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 123 return NULL; 124 } 125 126 peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); 127 if (nxt_slow_path(peer == NULL)) { 128 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 129 return NULL; 130 } 131 132 peer->request = r; 133 r->peer = peer; 134 135 nxt_mp_retain(r->mem_pool); 136 137 us->state = &nxt_upstream_proxy_state; 138 us->peer.http = peer; 139 peer->server = us; 140 141 us->upstream = upstream; 142 upstream->proto->get(task, us); 143 144 return NULL; 145 } 146 147 148 static void 149 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us) 150 { 151 nxt_upstream_proxy_t *proxy; 152 153 proxy = us->upstream->type.proxy; 154 155 us->sockaddr = proxy->sockaddr; 156 us->protocol = proxy->protocol; 157 158 us->state->ready(task, us); 159 } 160 161 162 static void 163 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us) 164 { 165 nxt_http_peer_t *peer; 166 167 peer = us->peer.http; 168 169 peer->protocol = us->protocol; 170 171 peer->request->state = &nxt_http_proxy_header_send_state; 172 173 nxt_http_proto[peer->protocol].peer_connect(task, peer); 174 } 175 176 177 static void 178 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us) 179 { 180 nxt_http_request_t *r; 181 182 r = us->peer.http->request; 183 184 nxt_mp_release(r->mem_pool); 185 186 nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY); 187 } 188 189 190 static const nxt_http_request_state_t nxt_http_proxy_header_send_state 191 nxt_aligned(64) = 192 { 193 .ready_handler = nxt_http_proxy_header_send, 194 .error_handler = nxt_http_proxy_error, 195 }; 196 197 198 static void 199 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) 200 { 201 nxt_http_peer_t *peer; 202 nxt_http_request_t *r; 203 204 r = obj; 205 peer = data; 206 r->state = &nxt_http_proxy_header_sent_state; 207 208 nxt_http_proto[peer->protocol].peer_header_send(task, peer); 209 } 210 211 212 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state 213 nxt_aligned(64) = 214 { 215 .ready_handler = nxt_http_proxy_header_sent, 216 .error_handler = nxt_http_proxy_error, 217 }; 218 219 220 static void 221 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) 222 { 223 nxt_http_peer_t *peer; 224 nxt_http_request_t *r; 225 226 r = obj; 227 peer = data; 228 r->state = &nxt_http_proxy_header_read_state; 229 230 nxt_http_proto[peer->protocol].peer_header_read(task, peer); 231 } 232 233 234 static const nxt_http_request_state_t nxt_http_proxy_header_read_state 235 nxt_aligned(64) = 236 { 237 .ready_handler = nxt_http_proxy_header_read, 238 .error_handler = nxt_http_proxy_error, 239 }; 240 241 242 static void 243 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) 244 { 245 nxt_http_peer_t *peer; 246 nxt_http_field_t *f, *field; 247 nxt_http_request_t *r; 248 249 r = obj; 250 peer = data; 251 252 r->status = peer->status; 253 254 nxt_debug(task, "http proxy status: %d", peer->status); 255 256 if (r->resp.content_length_n > 0) { 257 peer->remainder = r->resp.content_length_n; 258 } 259 260 nxt_list_each(field, peer->fields) { 261 262 nxt_debug(task, "http proxy header: \"%*s: %*s\"", 263 (size_t) field->name_length, field->name, 264 (size_t) field->value_length, field->value); 265 266 if (!field->skip) { 267 f = nxt_list_add(r->resp.fields); 268 if (nxt_slow_path(f == NULL)) { 269 nxt_http_proxy_error(task, r, peer); 270 return; 271 } 272 273 *f = *field; 274 } 275 276 } nxt_list_loop; 277 278 nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); 279 } 280 281 282 static void 283 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) 284 { 285 nxt_buf_t *out; 286 nxt_http_peer_t *peer; 287 nxt_http_request_t *r; 288 289 r = obj; 290 peer = data; 291 out = peer->body; 292 293 if (out != NULL) { 294 peer->body = NULL; 295 nxt_http_proxy_request_send(task, r, out); 296 } 297 298 r->state = &nxt_http_proxy_read_state; 299 300 nxt_http_proto[peer->protocol].peer_read(task, peer); 301 } 302 303 304 static void 305 nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r, 306 nxt_buf_t *out) 307 { 308 size_t length; 309 310 if (r->peer->remainder > 0) { 311 length = nxt_buf_chain_length(out); 312 r->peer->remainder -= length; 313 } 314 315 nxt_http_request_send(task, r, out); 316 } 317 318 319 static const nxt_http_request_state_t nxt_http_proxy_read_state 320 nxt_aligned(64) = 321 { 322 .ready_handler = nxt_http_proxy_read, 323 .error_handler = nxt_http_proxy_error, 324 }; 325 326 327 static void 328 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) 329 { 330 nxt_buf_t *out; 331 nxt_bool_t last; 332 nxt_http_peer_t *peer; 333 nxt_http_request_t *r; 334 335 r = obj; 336 peer = data; 337 out = peer->body; 338 peer->body = NULL; 339 last = nxt_buf_is_last(out); 340 341 nxt_http_proxy_request_send(task, r, out); 342 343 if (!last) { 344 nxt_http_proto[peer->protocol].peer_read(task, peer); 345 346 } else { 347 r->inconsistent = (peer->remainder != 0); 348 349 nxt_http_proto[peer->protocol].peer_close(task, peer); 350 351 nxt_mp_release(r->mem_pool); 352 } 353 } 354 355 356 nxt_buf_t * 357 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, 358 size_t size) 359 { 360 nxt_buf_t *b; 361 362 b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); 363 if (nxt_fast_path(b != NULL)) { 364 b->completion_handler = nxt_http_proxy_buf_mem_completion; 365 b->parent = r; 366 nxt_mp_retain(r->mem_pool); 367 368 } else { 369 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 370 } 371 372 return b; 373 } 374 375 376 static void 377 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) 378 { 379 nxt_buf_t *b, *next; 380 nxt_http_peer_t *peer; 381 nxt_http_request_t *r; 382 383 b = obj; 384 r = data; 385 386 peer = r->peer; 387 388 do { 389 next = b->next; 390 391 nxt_http_proxy_buf_mem_free(task, r, b); 392 393 b = next; 394 } while (b != NULL); 395 396 if (!peer->closed) { 397 nxt_http_proto[peer->protocol].peer_read(task, peer); 398 } 399 } 400 401 402 void 403 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, 404 nxt_buf_t *b) 405 { 406 nxt_event_engine_buf_mem_free(task->thread->engine, b); 407 408 nxt_mp_release(r->mem_pool); 409 } 410 411 412 static void 413 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) 414 { 415 nxt_http_peer_t *peer; 416 nxt_http_request_t *r; 417 418 r = obj; 419 peer = r->peer; 420 421 nxt_http_proto[peer->protocol].peer_close(task, peer); 422 423 nxt_mp_release(r->mem_pool); 424 425 nxt_http_request_error(task, r, peer->status); 426 } 427 428 429 nxt_int_t 430 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) 431 { 432 nxt_http_request_t *r; 433 434 r = ctx; 435 436 r->resp.date = field; 437 438 return NXT_OK; 439 } 440 441 442 nxt_int_t 443 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, 444 uintptr_t data) 445 { 446 nxt_off_t n; 447 nxt_http_request_t *r; 448 449 r = ctx; 450 451 r->resp.content_length = field; 452 453 n = nxt_off_t_parse(field->value, field->value_length); 454 455 if (nxt_fast_path(n >= 0)) { 456 r->resp.content_length_n = n; 457 } 458 459 return NXT_OK; 460 } 461 462 463 nxt_int_t 464 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) 465 { 466 field->skip = 1; 467 468 return NXT_OK; 469 } 470