1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 10 11 typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, 12 nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); 13 14 15 struct nxt_http_upstream_s { 16 uint32_t current; 17 uint32_t n; 18 uint8_t protocol; 19 nxt_http_upstream_connect_t connect; 20 nxt_sockaddr_t *sockaddr[1]; 21 }; 22 23 24 static void nxt_http_upstream_connect(nxt_task_t *task, 25 nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); 26 static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, 27 nxt_http_request_t *r, nxt_http_action_t *action); 28 static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); 29 static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); 30 static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); 31 static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); 32 static void nxt_http_proxy_request_send(nxt_task_t *task, 33 nxt_http_request_t *r, nxt_buf_t *out); 34 static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); 35 static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, 36 void *data); 37 static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); 38 39 40 static const nxt_http_request_state_t nxt_http_proxy_header_send_state; 41 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; 42 static const nxt_http_request_state_t nxt_http_proxy_header_read_state; 43 static const nxt_http_request_state_t nxt_http_proxy_read_state; 44 45 46 nxt_int_t 47 nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) 48 { 49 nxt_str_t name; 50 nxt_sockaddr_t *sa; 51 nxt_http_upstream_t *upstream; 52 53 sa = NULL; 54 name = action->name; 55 56 if (nxt_str_start(&name, "http://", 7)) { 57 name.length -= 7; 58 name.start += 7; 59 60 sa = nxt_sockaddr_parse(mp, &name); 61 if (nxt_slow_path(sa == NULL)) { 62 return NXT_ERROR; 63 } 64 65 sa->type = SOCK_STREAM; 66 } 67 68 if (sa != NULL) { 69 upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); 70 if (nxt_slow_path(upstream == NULL)) { 71 return NXT_ERROR; 72 } 73 74 upstream->current = 0; 75 upstream->n = 1; 76 upstream->protocol = NXT_HTTP_PROTO_H1; 77 upstream->connect = nxt_http_upstream_connect; 78 upstream->sockaddr[0] = sa; 79 80 action->u.upstream = upstream; 81 action->handler = nxt_http_proxy_handler; 82 } 83 84 return NXT_OK; 85 } 86 87 88 static nxt_http_action_t * 89 nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, 90 nxt_http_action_t *action) 91 { 92 nxt_http_peer_t *peer; 93 94 peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); 95 if (nxt_slow_path(peer == NULL)) { 96 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 97 return NULL; 98 } 99 100 peer->request = r; 101 r->peer = peer; 102 103 nxt_mp_retain(r->mem_pool); 104 105 action->u.upstream->connect(task, action->u.upstream, peer); 106 107 return NULL; 108 } 109 110 111 static void 112 nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, 113 nxt_http_peer_t *peer) 114 { 115 peer->protocol = upstream->protocol; 116 peer->sockaddr = upstream->sockaddr[0]; 117 118 peer->request->state = &nxt_http_proxy_header_send_state; 119 120 nxt_http_proto[peer->protocol].peer_connect(task, peer); 121 } 122 123 124 static const nxt_http_request_state_t nxt_http_proxy_header_send_state 125 nxt_aligned(64) = 126 { 127 .ready_handler = nxt_http_proxy_header_send, 128 .error_handler = nxt_http_proxy_error, 129 }; 130 131 132 static void 133 nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) 134 { 135 nxt_http_peer_t *peer; 136 nxt_http_request_t *r; 137 138 r = obj; 139 peer = data; 140 r->state = &nxt_http_proxy_header_sent_state; 141 142 nxt_http_proto[peer->protocol].peer_header_send(task, peer); 143 } 144 145 146 static const nxt_http_request_state_t nxt_http_proxy_header_sent_state 147 nxt_aligned(64) = 148 { 149 .ready_handler = nxt_http_proxy_header_sent, 150 .error_handler = nxt_http_proxy_error, 151 }; 152 153 154 static void 155 nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) 156 { 157 nxt_http_peer_t *peer; 158 nxt_http_request_t *r; 159 160 r = obj; 161 peer = data; 162 r->state = &nxt_http_proxy_header_read_state; 163 164 nxt_http_proto[peer->protocol].peer_header_read(task, peer); 165 } 166 167 168 static const nxt_http_request_state_t nxt_http_proxy_header_read_state 169 nxt_aligned(64) = 170 { 171 .ready_handler = nxt_http_proxy_header_read, 172 .error_handler = nxt_http_proxy_error, 173 }; 174 175 176 static void 177 nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) 178 { 179 nxt_http_peer_t *peer; 180 nxt_http_field_t *f, *field; 181 nxt_http_request_t *r; 182 183 r = obj; 184 peer = data; 185 186 r->status = peer->status; 187 188 nxt_debug(task, "http proxy status: %d", peer->status); 189 190 if (r->resp.content_length_n > 0) { 191 peer->remainder = r->resp.content_length_n; 192 } 193 194 nxt_list_each(field, peer->fields) { 195 196 nxt_debug(task, "http proxy header: \"%*s: %*s\"", 197 (size_t) field->name_length, field->name, 198 (size_t) field->value_length, field->value); 199 200 if (!field->skip) { 201 f = nxt_list_add(r->resp.fields); 202 if (nxt_slow_path(f == NULL)) { 203 nxt_http_proxy_error(task, r, peer); 204 return; 205 } 206 207 *f = *field; 208 } 209 210 } nxt_list_loop; 211 212 nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); 213 } 214 215 216 static void 217 nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) 218 { 219 nxt_buf_t *out; 220 nxt_http_peer_t *peer; 221 nxt_http_request_t *r; 222 223 r = obj; 224 peer = data; 225 out = peer->body; 226 227 if (out != NULL) { 228 peer->body = NULL; 229 nxt_http_proxy_request_send(task, r, out); 230 } 231 232 r->state = &nxt_http_proxy_read_state; 233 234 nxt_http_proto[peer->protocol].peer_read(task, peer); 235 } 236 237 238 static void 239 nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r, 240 nxt_buf_t *out) 241 { 242 size_t length; 243 244 if (r->peer->remainder > 0) { 245 length = nxt_buf_chain_length(out); 246 r->peer->remainder -= length; 247 } 248 249 nxt_http_request_send(task, r, out); 250 } 251 252 253 static const nxt_http_request_state_t nxt_http_proxy_read_state 254 nxt_aligned(64) = 255 { 256 .ready_handler = nxt_http_proxy_read, 257 .error_handler = nxt_http_proxy_error, 258 }; 259 260 261 static void 262 nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) 263 { 264 nxt_buf_t *out; 265 nxt_bool_t last; 266 nxt_http_peer_t *peer; 267 nxt_http_request_t *r; 268 269 r = obj; 270 peer = data; 271 out = peer->body; 272 peer->body = NULL; 273 last = nxt_buf_is_last(out); 274 275 nxt_http_proxy_request_send(task, r, out); 276 277 if (!last) { 278 nxt_http_proto[peer->protocol].peer_read(task, peer); 279 280 } else { 281 r->inconsistent = (peer->remainder != 0); 282 283 nxt_http_proto[peer->protocol].peer_close(task, peer); 284 285 nxt_mp_release(r->mem_pool); 286 } 287 } 288 289 290 nxt_buf_t * 291 nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, 292 size_t size) 293 { 294 nxt_buf_t *b; 295 296 b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); 297 if (nxt_fast_path(b != NULL)) { 298 b->completion_handler = nxt_http_proxy_buf_mem_completion; 299 b->parent = r; 300 nxt_mp_retain(r->mem_pool); 301 302 } else { 303 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 304 } 305 306 return b; 307 } 308 309 310 static void 311 nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) 312 { 313 nxt_buf_t *b, *next; 314 nxt_http_peer_t *peer; 315 nxt_http_request_t *r; 316 317 b = obj; 318 r = data; 319 320 peer = r->peer; 321 322 do { 323 next = b->next; 324 325 nxt_http_proxy_buf_mem_free(task, r, b); 326 327 b = next; 328 } while (b != NULL); 329 330 if (!peer->closed) { 331 nxt_http_proto[peer->protocol].peer_read(task, peer); 332 } 333 } 334 335 336 void 337 nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, 338 nxt_buf_t *b) 339 { 340 nxt_event_engine_buf_mem_free(task->thread->engine, b); 341 342 nxt_mp_release(r->mem_pool); 343 } 344 345 346 static void 347 nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) 348 { 349 nxt_http_peer_t *peer; 350 nxt_http_request_t *r; 351 352 r = obj; 353 peer = r->peer; 354 355 nxt_http_proto[peer->protocol].peer_close(task, peer); 356 357 nxt_mp_release(r->mem_pool); 358 359 nxt_http_request_error(task, r, peer->status); 360 } 361 362 363 nxt_int_t 364 nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) 365 { 366 nxt_http_request_t *r; 367 368 r = ctx; 369 370 r->resp.date = field; 371 372 return NXT_OK; 373 } 374 375 376 nxt_int_t 377 nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, 378 uintptr_t data) 379 { 380 nxt_off_t n; 381 nxt_http_request_t *r; 382 383 r = ctx; 384 385 r->resp.content_length = field; 386 387 n = nxt_off_t_parse(field->value, field->value_length); 388 389 if (nxt_fast_path(n >= 0)) { 390 r->resp.content_length_n = n; 391 } 392 393 return NXT_OK; 394 } 395 396 397 nxt_int_t 398 nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) 399 { 400 field->skip = 1; 401 402 return NXT_OK; 403 } 404