1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 #include <nxt_upstream.h> 10 11 12 struct nxt_upstream_proxy_s { 13 nxt_sockaddr_t *sockaddr; 14 uint8_t protocol; 15 }; 16 17 18 static void nxt_http_proxy_server_get(nxt_task_t *task, 19 nxt_upstream_server_t *us); 20 static void nxt_http_proxy_upstream_ready(nxt_task_t *task, 21 nxt_upstream_server_t *us); 22 static void nxt_http_proxy_upstream_error(nxt_task_t *task, 23 nxt_upstream_server_t *us); 24 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, 25 nxt_http_request_t *r, nxt_http_action_t *action); 26 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); 27 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); 28 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); 29 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); 30 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, 31 void *data); 32 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); 33 34 35 static const nxt_http_request_state_t nxt_http_proxy_header_send_state; 36 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; 37 static const nxt_http_request_state_t nxt_http_proxy_header_read_state; 38 static const nxt_http_request_state_t nxt_http_proxy_read_state; 39 40 41 static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = { 42 .get = nxt_http_proxy_server_get, 43 }; 44 45 46 static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = { 47 .ready = nxt_http_proxy_upstream_ready, 48 .error = nxt_http_proxy_upstream_error, 49 }; 50 51 52 nxt_int_t 53 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) 54 { 55 nxt_str_t name; 56 nxt_sockaddr_t *sa; 57 nxt_upstream_t *up; 58 nxt_upstream_proxy_t *proxy; 59 60 sa = NULL; 61 name = action->name; 62 63 if (nxt_str_start(&name, "http://", 7)) { 64 name.length -= 7; 65 name.start += 7; 66 67 sa = nxt_sockaddr_parse(mp, &name); 68 if (nxt_slow_path(sa == NULL)) { 69 return NXT_ERROR; 70 } 71 72 sa->type = SOCK_STREAM; 73 } 74 75 if (sa != NULL) { 76 up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); 77 if (nxt_slow_path(up == NULL)) { 78 return NXT_ERROR; 79 } 80 81 up->name.length = sa->length; 82 up->name.start = nxt_sockaddr_start(sa); 83 up->proto = &nxt_upstream_simple_proto; 84 85 proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t)); 86 if (nxt_slow_path(proxy == NULL)) { 87 return NXT_ERROR; 88 } 89 90 proxy->sockaddr = sa; 91 proxy->protocol = NXT_HTTP_PROTO_H1; 92 up->type.proxy = proxy; 93 94 action->u.upstream = up; 95 action->handler = nxt_http_proxy_handler; 96 } 97 98 return NXT_OK; 99 } 100 101 102 static nxt_http_action_t * 103 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 104 nxt_http_action_t *action) 105 { 106 return nxt_upstream_proxy_handler(task, r, action->u.upstream); 107 } 108 109 110 nxt_http_action_t * 111 nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 112 nxt_upstream_t *upstream) 113 { 114 nxt_http_peer_t *peer; 115 nxt_upstream_server_t *us; 116 117 us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t)); 118 if (nxt_slow_path(us == NULL)) { 119 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 120 return NULL; 121 } 122 123 peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); 124 if (nxt_slow_path(peer == NULL)) { 125 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 126 return NULL; 127 } 128 129 peer->request = r; 130 r->peer = peer; 131 132 nxt_mp_retain(r->mem_pool); 133 134 us->state = &nxt_upstream_proxy_state; 135 us->peer.http = peer; 136 peer->server = us; 137 138 us->upstream = upstream; 139 upstream->proto->get(task, us); 140 141 return NULL; 142 } 143 144 145 static void 146 nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us) 147 { 148 nxt_upstream_proxy_t *proxy; 149 150 proxy = us->upstream->type.proxy; 151 152 us->sockaddr = proxy->sockaddr; 153 us->protocol = proxy->protocol; 154 155 us->state->ready(task, us); 156 } 157 158 159 static void 160 nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us) 161 { 162 nxt_http_peer_t *peer; 163 164 peer = us->peer.http; 165 166 peer->protocol = us->protocol; 167 168 peer->request->state = &nxt_http_proxy_header_send_state; 169 170 nxt_http_proto[peer->protocol].peer_connect(task, peer); 171 } 172 173 174 static void 175 nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us) 176 { 177 nxt_http_request_t *r; 178 179 r = us->peer.http->request; 180 181 nxt_mp_release(r->mem_pool); 182 183 nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY); 184 } 185 186 187 static const nxt_http_request_state_t nxt_http_proxy_header_send_state 188 nxt_aligned(64) = 189 { 190 .ready_handler = nxt_http_proxy_header_send, 191 .error_handler = nxt_http_proxy_error, 192 }; 193 194 195 static void 196 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) 197 { 198 nxt_http_peer_t *peer; 199 nxt_http_request_t *r; 200 201 r = obj; 202 peer = data; 203 r->state = &nxt_http_proxy_header_sent_state; 204 205 nxt_http_proto[peer->protocol].peer_header_send(task, peer); 206 } 207 208 209 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state 210 nxt_aligned(64) = 211 { 212 .ready_handler = nxt_http_proxy_header_sent, 213 .error_handler = nxt_http_proxy_error, 214 }; 215 216 217 static void 218 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) 219 { 220 nxt_http_peer_t *peer; 221 nxt_http_request_t *r; 222 223 r = obj; 224 peer = data; 225 r->state = &nxt_http_proxy_header_read_state; 226 227 nxt_http_proto[peer->protocol].peer_header_read(task, peer); 228 } 229 230 231 static const nxt_http_request_state_t nxt_http_proxy_header_read_state 232 nxt_aligned(64) = 233 { 234 .ready_handler = nxt_http_proxy_header_read, 235 .error_handler = nxt_http_proxy_error, 236 }; 237 238 239 static void 240 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) 241 { 242 nxt_http_peer_t *peer; 243 nxt_http_field_t *f, *field; 244 nxt_http_request_t *r; 245 246 r = obj; 247 peer = data; 248 249 r->status = peer->status; 250 251 nxt_debug(task, "http proxy status: %d", peer->status); 252 253 nxt_list_each(field, peer->fields) { 254 255 nxt_debug(task, "http proxy header: \"%*s: %*s\"", 256 (size_t) field->name_length, field->name, 257 (size_t) field->value_length, field->value); 258 259 if (!field->skip) { 260 f = nxt_list_add(r->resp.fields); 261 if (nxt_slow_path(f == NULL)) { 262 nxt_http_proxy_error(task, r, peer); 263 return; 264 } 265 266 *f = *field; 267 } 268 269 } nxt_list_loop; 270 271 r->state = &nxt_http_proxy_read_state; 272 273 nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); 274 } 275 276 277 static const nxt_http_request_state_t nxt_http_proxy_read_state 278 nxt_aligned(64) = 279 { 280 .ready_handler = nxt_http_proxy_send_body, 281 .error_handler = nxt_http_proxy_error, 282 }; 283 284 285 static void 286 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) 287 { 288 nxt_buf_t *out; 289 nxt_http_peer_t *peer; 290 nxt_http_request_t *r; 291 292 r = obj; 293 peer = data; 294 out = peer->body; 295 296 if (out != NULL) { 297 peer->body = NULL; 298 nxt_http_request_send(task, r, out); 299 } 300 301 if (!peer->closed) { 302 nxt_http_proto[peer->protocol].peer_read(task, peer); 303 304 } else { 305 nxt_http_proto[peer->protocol].peer_close(task, peer); 306 307 nxt_mp_release(r->mem_pool); 308 } 309 } 310 311 312 nxt_buf_t * 313 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, 314 size_t size) 315 { 316 nxt_buf_t *b; 317 318 b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); 319 if (nxt_fast_path(b != NULL)) { 320 b->completion_handler = nxt_http_proxy_buf_mem_completion; 321 b->parent = r; 322 nxt_mp_retain(r->mem_pool); 323 324 } else { 325 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 326 } 327 328 return b; 329 } 330 331 332 static void 333 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) 334 { 335 nxt_buf_t *b, *next; 336 nxt_http_peer_t *peer; 337 nxt_http_request_t *r; 338 339 b = obj; 340 r = data; 341 342 peer = r->peer; 343 344 do { 345 next = b->next; 346 347 nxt_http_proxy_buf_mem_free(task, r, b); 348 349 b = next; 350 } while (b != NULL); 351 352 if (!peer->closed) { 353 nxt_http_proto[peer->protocol].peer_read(task, peer); 354 } 355 } 356 357 358 void 359 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, 360 nxt_buf_t *b) 361 { 362 nxt_event_engine_buf_mem_free(task->thread->engine, b); 363 364 nxt_mp_release(r->mem_pool); 365 } 366 367 368 static void 369 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) 370 { 371 nxt_http_peer_t *peer; 372 nxt_http_request_t *r; 373 374 r = obj; 375 peer = r->peer; 376 377 nxt_http_proto[peer->protocol].peer_close(task, peer); 378 379 nxt_mp_release(r->mem_pool); 380 381 nxt_http_request_error(&r->task, r, peer->status); 382 } 383 384 385 nxt_int_t 386 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) 387 { 388 nxt_http_request_t *r; 389 390 r = ctx; 391 392 r->resp.date = field; 393 394 return NXT_OK; 395 } 396 397 398 nxt_int_t 399 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, 400 uintptr_t data) 401 { 402 nxt_off_t n; 403 nxt_http_request_t *r; 404 405 r = ctx; 406 407 r->resp.content_length = field; 408 409 n = nxt_off_t_parse(field->value, field->value_length); 410 411 if (nxt_fast_path(n >= 0)) { 412 r->resp.content_length_n = n; 413 } 414 415 return NXT_OK; 416 } 417 418 419 nxt_int_t 420 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) 421 { 422 field->skip = 1; 423 424 return NXT_OK; 425 } 426