1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <nxt_main.h> 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 #include <nxt_router_request.h> 10 #include <nxt_port_memory_int.h> 11 #include <nxt_websocket.h> 12 #include <nxt_websocket_header.h> 13 14 15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); 16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, 17 void *data); 18 19 20 const nxt_http_request_state_t nxt_http_websocket 21 nxt_aligned(64) = 22 { 23 .ready_handler = nxt_http_websocket_client, 24 .error_handler = nxt_http_websocket_error_handler, 25 }; 26 27 28 static void 29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) 30 { 31 size_t frame_size, used_size, copy_size, buf_free_size; 32 size_t chunk_copy_size; 33 nxt_buf_t *out, *buf, **out_tail, *b, *next; 34 nxt_int_t res; 35 nxt_http_request_t *r; 36 nxt_request_app_link_t *req_app_link; 37 nxt_request_rpc_data_t *req_rpc_data; 38 nxt_websocket_header_t *wsh; 39 40 r = obj; 41 42 if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL 43 || (req_app_link = req_rpc_data->req_app_link) == NULL)) 44 { 45 nxt_debug(task, "websocket client frame for destroyed request"); 46 47 return; 48 } 49 50 nxt_debug(task, "http websocket client frame"); 51 52 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; 53 54 frame_size = nxt_websocket_frame_header_size(wsh) 55 + nxt_websocket_frame_payload_len(wsh); 56 57 buf = NULL; 58 buf_free_size = 0; 59 out = NULL; 60 out_tail = &out; 61 62 b = r->ws_frame; 63 64 while (b != NULL && frame_size > 0) { 65 used_size = nxt_buf_mem_used_size(&b->mem); 66 copy_size = nxt_min(used_size, frame_size); 67 68 while (copy_size > 0) { 69 if (buf == NULL || buf_free_size == 0) { 70 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); 71 72 buf = nxt_port_mmap_get_buf(task, req_app_link->app_port, 73 buf_free_size); 74 75 *out_tail = buf; 76 out_tail = &buf->next; 77 } 78 79 chunk_copy_size = nxt_min(buf_free_size, copy_size); 80 81 buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, 82 chunk_copy_size); 83 84 copy_size -= chunk_copy_size; 85 b->mem.pos += chunk_copy_size; 86 buf_free_size -= chunk_copy_size; 87 } 88 89 frame_size -= copy_size; 90 next = b->next; 91 b->next = NULL; 92 93 if (nxt_buf_mem_used_size(&b->mem) == 0) { 94 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 95 b->completion_handler, task, b, b->parent); 96 97 r->ws_frame = next; 98 } 99 100 b = next; 101 } 102 103 res = nxt_port_socket_twrite(task, req_app_link->app_port, 104 NXT_PORT_MSG_WEBSOCKET, -1, 105 req_app_link->stream, 106 req_app_link->reply_port->id, out, NULL); 107 if (nxt_slow_path(res != NXT_OK)) { 108 // TODO: handle 109 } 110 111 b = r->ws_frame; 112 113 if (b != NULL) { 114 used_size = nxt_buf_mem_used_size(&b->mem); 115 116 if (used_size > 0) { 117 nxt_memmove(b->mem.start, b->mem.pos, used_size); 118 119 b->mem.pos = b->mem.start; 120 b->mem.free = b->mem.start + used_size; 121 } 122 } 123 124 nxt_http_request_ws_frame_start(task, r, r->ws_frame); 125 } 126 127 128 static void 129 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) 130 { 131 nxt_http_request_t *r; 132 nxt_request_app_link_t *req_app_link; 133 nxt_request_rpc_data_t *req_rpc_data; 134 135 nxt_debug(task, "http websocket error handler"); 136 137 r = obj; 138 139 if ((req_rpc_data = r->req_rpc_data) == NULL) { 140 nxt_debug(task, " req_rpc_data is NULL"); 141 goto close_handler; 142 } 143 144 if ((req_app_link = req_rpc_data->req_app_link) == NULL) { 145 nxt_debug(task, " req_app_link is NULL"); 146 goto close_handler; 147 } 148 149 if (req_app_link->app_port == NULL) { 150 nxt_debug(task, " app_port is NULL"); 151 goto close_handler; 152 } 153 154 (void) nxt_port_socket_twrite(task, req_app_link->app_port, 155 NXT_PORT_MSG_WEBSOCKET_LAST, 156 -1, req_app_link->stream, 157 req_app_link->reply_port->id, NULL, NULL); 158 159 close_handler: 160 161 nxt_http_request_close_handler(task, obj, data); 162 } 163