11131Smax.romanov@nginx.com 21131Smax.romanov@nginx.com /* 31131Smax.romanov@nginx.com * Copyright (C) NGINX, Inc. 41131Smax.romanov@nginx.com */ 51131Smax.romanov@nginx.com 61131Smax.romanov@nginx.com #include <nxt_main.h> 71131Smax.romanov@nginx.com #include <nxt_router.h> 81131Smax.romanov@nginx.com #include <nxt_http.h> 91131Smax.romanov@nginx.com #include <nxt_router_request.h> 101131Smax.romanov@nginx.com #include <nxt_port_memory_int.h> 111131Smax.romanov@nginx.com #include <nxt_websocket.h> 121131Smax.romanov@nginx.com #include <nxt_websocket_header.h> 131131Smax.romanov@nginx.com 141131Smax.romanov@nginx.com 151131Smax.romanov@nginx.com static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); 161131Smax.romanov@nginx.com static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, 171131Smax.romanov@nginx.com void *data); 181131Smax.romanov@nginx.com 191131Smax.romanov@nginx.com 201131Smax.romanov@nginx.com const nxt_http_request_state_t nxt_http_websocket 211131Smax.romanov@nginx.com nxt_aligned(64) = 221131Smax.romanov@nginx.com { 231131Smax.romanov@nginx.com .ready_handler = nxt_http_websocket_client, 241131Smax.romanov@nginx.com .error_handler = nxt_http_websocket_error_handler, 251131Smax.romanov@nginx.com }; 261131Smax.romanov@nginx.com 271131Smax.romanov@nginx.com 281131Smax.romanov@nginx.com static void 291131Smax.romanov@nginx.com nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) 301131Smax.romanov@nginx.com { 311131Smax.romanov@nginx.com size_t frame_size, used_size, copy_size, buf_free_size; 321131Smax.romanov@nginx.com size_t chunk_copy_size; 331131Smax.romanov@nginx.com nxt_buf_t *out, *buf, **out_tail, *b, *next; 341131Smax.romanov@nginx.com nxt_int_t res; 351131Smax.romanov@nginx.com nxt_http_request_t *r; 361131Smax.romanov@nginx.com nxt_request_rpc_data_t *req_rpc_data; 371131Smax.romanov@nginx.com nxt_websocket_header_t *wsh; 381131Smax.romanov@nginx.com 391131Smax.romanov@nginx.com r = obj; 401547Smax.romanov@nginx.com req_rpc_data = r->req_rpc_data; 411131Smax.romanov@nginx.com 421547Smax.romanov@nginx.com if (nxt_slow_path(req_rpc_data == NULL)) { 431131Smax.romanov@nginx.com nxt_debug(task, "websocket client frame for destroyed request"); 441131Smax.romanov@nginx.com 451131Smax.romanov@nginx.com return; 461131Smax.romanov@nginx.com } 471131Smax.romanov@nginx.com 481131Smax.romanov@nginx.com nxt_debug(task, "http websocket client frame"); 491131Smax.romanov@nginx.com 501131Smax.romanov@nginx.com wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; 511131Smax.romanov@nginx.com 521131Smax.romanov@nginx.com frame_size = nxt_websocket_frame_header_size(wsh) 531131Smax.romanov@nginx.com + nxt_websocket_frame_payload_len(wsh); 541131Smax.romanov@nginx.com 551131Smax.romanov@nginx.com buf = NULL; 561131Smax.romanov@nginx.com buf_free_size = 0; 571131Smax.romanov@nginx.com out = NULL; 581131Smax.romanov@nginx.com out_tail = &out; 591131Smax.romanov@nginx.com 601131Smax.romanov@nginx.com b = r->ws_frame; 611131Smax.romanov@nginx.com 621131Smax.romanov@nginx.com while (b != NULL && frame_size > 0) { 631131Smax.romanov@nginx.com used_size = nxt_buf_mem_used_size(&b->mem); 641131Smax.romanov@nginx.com copy_size = nxt_min(used_size, frame_size); 651131Smax.romanov@nginx.com 661131Smax.romanov@nginx.com while (copy_size > 0) { 671131Smax.romanov@nginx.com if (buf == NULL || buf_free_size == 0) { 681131Smax.romanov@nginx.com buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); 691131Smax.romanov@nginx.com 701547Smax.romanov@nginx.com buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing, 711131Smax.romanov@nginx.com buf_free_size); 721131Smax.romanov@nginx.com 731131Smax.romanov@nginx.com *out_tail = buf; 741131Smax.romanov@nginx.com out_tail = &buf->next; 751131Smax.romanov@nginx.com } 761131Smax.romanov@nginx.com 771131Smax.romanov@nginx.com chunk_copy_size = nxt_min(buf_free_size, copy_size); 781131Smax.romanov@nginx.com 791131Smax.romanov@nginx.com buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, 801131Smax.romanov@nginx.com chunk_copy_size); 811131Smax.romanov@nginx.com 821131Smax.romanov@nginx.com copy_size -= chunk_copy_size; 831131Smax.romanov@nginx.com b->mem.pos += chunk_copy_size; 841131Smax.romanov@nginx.com buf_free_size -= chunk_copy_size; 851131Smax.romanov@nginx.com } 861131Smax.romanov@nginx.com 871131Smax.romanov@nginx.com frame_size -= copy_size; 881131Smax.romanov@nginx.com next = b->next; 891269Sigor@sysoev.ru b->next = NULL; 901131Smax.romanov@nginx.com 911131Smax.romanov@nginx.com if (nxt_buf_mem_used_size(&b->mem) == 0) { 921131Smax.romanov@nginx.com nxt_work_queue_add(&task->thread->engine->fast_work_queue, 931131Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 941131Smax.romanov@nginx.com 951131Smax.romanov@nginx.com r->ws_frame = next; 961131Smax.romanov@nginx.com } 971131Smax.romanov@nginx.com 981131Smax.romanov@nginx.com b = next; 991131Smax.romanov@nginx.com } 1001131Smax.romanov@nginx.com 101*1555Smax.romanov@nginx.com res = nxt_port_socket_write(task, req_rpc_data->app_port, 102*1555Smax.romanov@nginx.com NXT_PORT_MSG_WEBSOCKET, -1, 103*1555Smax.romanov@nginx.com req_rpc_data->stream, 104*1555Smax.romanov@nginx.com task->thread->engine->port->id, out); 1051131Smax.romanov@nginx.com if (nxt_slow_path(res != NXT_OK)) { 1061131Smax.romanov@nginx.com // TODO: handle 1071131Smax.romanov@nginx.com } 1081131Smax.romanov@nginx.com 1091131Smax.romanov@nginx.com b = r->ws_frame; 1101131Smax.romanov@nginx.com 1111131Smax.romanov@nginx.com if (b != NULL) { 1121131Smax.romanov@nginx.com used_size = nxt_buf_mem_used_size(&b->mem); 1131131Smax.romanov@nginx.com 1141131Smax.romanov@nginx.com if (used_size > 0) { 1151131Smax.romanov@nginx.com nxt_memmove(b->mem.start, b->mem.pos, used_size); 1161131Smax.romanov@nginx.com 1171131Smax.romanov@nginx.com b->mem.pos = b->mem.start; 1181131Smax.romanov@nginx.com b->mem.free = b->mem.start + used_size; 1191131Smax.romanov@nginx.com } 1201131Smax.romanov@nginx.com } 1211131Smax.romanov@nginx.com 1221131Smax.romanov@nginx.com nxt_http_request_ws_frame_start(task, r, r->ws_frame); 1231131Smax.romanov@nginx.com } 1241131Smax.romanov@nginx.com 1251131Smax.romanov@nginx.com 1261131Smax.romanov@nginx.com static void 1271131Smax.romanov@nginx.com nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) 1281131Smax.romanov@nginx.com { 1291131Smax.romanov@nginx.com nxt_http_request_t *r; 1301131Smax.romanov@nginx.com nxt_request_rpc_data_t *req_rpc_data; 1311131Smax.romanov@nginx.com 1321131Smax.romanov@nginx.com nxt_debug(task, "http websocket error handler"); 1331131Smax.romanov@nginx.com 1341131Smax.romanov@nginx.com r = obj; 1351547Smax.romanov@nginx.com req_rpc_data = r->req_rpc_data; 1361131Smax.romanov@nginx.com 1371547Smax.romanov@nginx.com if (req_rpc_data == NULL) { 1381131Smax.romanov@nginx.com nxt_debug(task, " req_rpc_data is NULL"); 1391131Smax.romanov@nginx.com goto close_handler; 1401131Smax.romanov@nginx.com } 1411131Smax.romanov@nginx.com 1421547Smax.romanov@nginx.com if (req_rpc_data->app_port == NULL) { 1431131Smax.romanov@nginx.com nxt_debug(task, " app_port is NULL"); 1441131Smax.romanov@nginx.com goto close_handler; 1451131Smax.romanov@nginx.com } 1461131Smax.romanov@nginx.com 147*1555Smax.romanov@nginx.com (void) nxt_port_socket_write(task, req_rpc_data->app_port, 148*1555Smax.romanov@nginx.com NXT_PORT_MSG_WEBSOCKET_LAST, 149*1555Smax.romanov@nginx.com -1, req_rpc_data->stream, 150*1555Smax.romanov@nginx.com task->thread->engine->port->id, NULL); 1511131Smax.romanov@nginx.com 1521131Smax.romanov@nginx.com close_handler: 1531131Smax.romanov@nginx.com 1541131Smax.romanov@nginx.com nxt_http_request_close_handler(task, obj, data); 1551131Smax.romanov@nginx.com } 156