xref: /unit/src/nxt_http_websocket.c (revision 1547)
11131Smax.romanov@nginx.com 
21131Smax.romanov@nginx.com /*
31131Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
41131Smax.romanov@nginx.com  */
51131Smax.romanov@nginx.com 
61131Smax.romanov@nginx.com #include <nxt_main.h>
71131Smax.romanov@nginx.com #include <nxt_router.h>
81131Smax.romanov@nginx.com #include <nxt_http.h>
91131Smax.romanov@nginx.com #include <nxt_router_request.h>
101131Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
111131Smax.romanov@nginx.com #include <nxt_websocket.h>
121131Smax.romanov@nginx.com #include <nxt_websocket_header.h>
131131Smax.romanov@nginx.com 
141131Smax.romanov@nginx.com 
151131Smax.romanov@nginx.com static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
161131Smax.romanov@nginx.com static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
171131Smax.romanov@nginx.com     void *data);
181131Smax.romanov@nginx.com 
191131Smax.romanov@nginx.com 
201131Smax.romanov@nginx.com const nxt_http_request_state_t  nxt_http_websocket
211131Smax.romanov@nginx.com     nxt_aligned(64) =
221131Smax.romanov@nginx.com {
231131Smax.romanov@nginx.com     .ready_handler = nxt_http_websocket_client,
241131Smax.romanov@nginx.com     .error_handler = nxt_http_websocket_error_handler,
251131Smax.romanov@nginx.com };
261131Smax.romanov@nginx.com 
271131Smax.romanov@nginx.com 
281131Smax.romanov@nginx.com static void
291131Smax.romanov@nginx.com nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
301131Smax.romanov@nginx.com {
311131Smax.romanov@nginx.com     size_t                  frame_size, used_size, copy_size, buf_free_size;
321131Smax.romanov@nginx.com     size_t                  chunk_copy_size;
331131Smax.romanov@nginx.com     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
341131Smax.romanov@nginx.com     nxt_int_t               res;
351131Smax.romanov@nginx.com     nxt_http_request_t      *r;
361131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
371131Smax.romanov@nginx.com     nxt_websocket_header_t  *wsh;
381131Smax.romanov@nginx.com 
391131Smax.romanov@nginx.com     r = obj;
40*1547Smax.romanov@nginx.com     req_rpc_data = r->req_rpc_data;
411131Smax.romanov@nginx.com 
42*1547Smax.romanov@nginx.com     if (nxt_slow_path(req_rpc_data == NULL)) {
431131Smax.romanov@nginx.com         nxt_debug(task, "websocket client frame for destroyed request");
441131Smax.romanov@nginx.com 
451131Smax.romanov@nginx.com         return;
461131Smax.romanov@nginx.com     }
471131Smax.romanov@nginx.com 
481131Smax.romanov@nginx.com     nxt_debug(task, "http websocket client frame");
491131Smax.romanov@nginx.com 
501131Smax.romanov@nginx.com     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
511131Smax.romanov@nginx.com 
521131Smax.romanov@nginx.com     frame_size = nxt_websocket_frame_header_size(wsh)
531131Smax.romanov@nginx.com                   + nxt_websocket_frame_payload_len(wsh);
541131Smax.romanov@nginx.com 
551131Smax.romanov@nginx.com     buf = NULL;
561131Smax.romanov@nginx.com     buf_free_size = 0;
571131Smax.romanov@nginx.com     out = NULL;
581131Smax.romanov@nginx.com     out_tail = &out;
591131Smax.romanov@nginx.com 
601131Smax.romanov@nginx.com     b = r->ws_frame;
611131Smax.romanov@nginx.com 
621131Smax.romanov@nginx.com     while (b != NULL && frame_size > 0) {
631131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
641131Smax.romanov@nginx.com         copy_size = nxt_min(used_size, frame_size);
651131Smax.romanov@nginx.com 
661131Smax.romanov@nginx.com         while (copy_size > 0) {
671131Smax.romanov@nginx.com             if (buf == NULL || buf_free_size == 0) {
681131Smax.romanov@nginx.com                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
691131Smax.romanov@nginx.com 
70*1547Smax.romanov@nginx.com                 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
711131Smax.romanov@nginx.com                                             buf_free_size);
721131Smax.romanov@nginx.com 
731131Smax.romanov@nginx.com                 *out_tail = buf;
741131Smax.romanov@nginx.com                 out_tail = &buf->next;
751131Smax.romanov@nginx.com             }
761131Smax.romanov@nginx.com 
771131Smax.romanov@nginx.com             chunk_copy_size = nxt_min(buf_free_size, copy_size);
781131Smax.romanov@nginx.com 
791131Smax.romanov@nginx.com             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
801131Smax.romanov@nginx.com                                        chunk_copy_size);
811131Smax.romanov@nginx.com 
821131Smax.romanov@nginx.com             copy_size -= chunk_copy_size;
831131Smax.romanov@nginx.com             b->mem.pos += chunk_copy_size;
841131Smax.romanov@nginx.com             buf_free_size -= chunk_copy_size;
851131Smax.romanov@nginx.com         }
861131Smax.romanov@nginx.com 
871131Smax.romanov@nginx.com         frame_size -= copy_size;
881131Smax.romanov@nginx.com         next = b->next;
891269Sigor@sysoev.ru         b->next = NULL;
901131Smax.romanov@nginx.com 
911131Smax.romanov@nginx.com         if (nxt_buf_mem_used_size(&b->mem) == 0) {
921131Smax.romanov@nginx.com             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
931131Smax.romanov@nginx.com                                b->completion_handler, task, b, b->parent);
941131Smax.romanov@nginx.com 
951131Smax.romanov@nginx.com             r->ws_frame = next;
961131Smax.romanov@nginx.com         }
971131Smax.romanov@nginx.com 
981131Smax.romanov@nginx.com         b = next;
991131Smax.romanov@nginx.com     }
1001131Smax.romanov@nginx.com 
101*1547Smax.romanov@nginx.com     res = nxt_port_socket_twrite(task, req_rpc_data->app_port,
1021131Smax.romanov@nginx.com                                  NXT_PORT_MSG_WEBSOCKET, -1,
103*1547Smax.romanov@nginx.com                                  req_rpc_data->stream,
104*1547Smax.romanov@nginx.com                                  task->thread->engine->port->id, out, NULL);
1051131Smax.romanov@nginx.com     if (nxt_slow_path(res != NXT_OK)) {
1061131Smax.romanov@nginx.com         // TODO: handle
1071131Smax.romanov@nginx.com     }
1081131Smax.romanov@nginx.com 
1091131Smax.romanov@nginx.com     b = r->ws_frame;
1101131Smax.romanov@nginx.com 
1111131Smax.romanov@nginx.com     if (b != NULL) {
1121131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
1131131Smax.romanov@nginx.com 
1141131Smax.romanov@nginx.com         if (used_size > 0) {
1151131Smax.romanov@nginx.com             nxt_memmove(b->mem.start, b->mem.pos, used_size);
1161131Smax.romanov@nginx.com 
1171131Smax.romanov@nginx.com             b->mem.pos = b->mem.start;
1181131Smax.romanov@nginx.com             b->mem.free = b->mem.start + used_size;
1191131Smax.romanov@nginx.com         }
1201131Smax.romanov@nginx.com     }
1211131Smax.romanov@nginx.com 
1221131Smax.romanov@nginx.com     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
1231131Smax.romanov@nginx.com }
1241131Smax.romanov@nginx.com 
1251131Smax.romanov@nginx.com 
1261131Smax.romanov@nginx.com static void
1271131Smax.romanov@nginx.com nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
1281131Smax.romanov@nginx.com {
1291131Smax.romanov@nginx.com     nxt_http_request_t      *r;
1301131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
1311131Smax.romanov@nginx.com 
1321131Smax.romanov@nginx.com     nxt_debug(task, "http websocket error handler");
1331131Smax.romanov@nginx.com 
1341131Smax.romanov@nginx.com     r = obj;
135*1547Smax.romanov@nginx.com     req_rpc_data = r->req_rpc_data;
1361131Smax.romanov@nginx.com 
137*1547Smax.romanov@nginx.com     if (req_rpc_data == NULL) {
1381131Smax.romanov@nginx.com         nxt_debug(task, "  req_rpc_data is NULL");
1391131Smax.romanov@nginx.com         goto close_handler;
1401131Smax.romanov@nginx.com     }
1411131Smax.romanov@nginx.com 
142*1547Smax.romanov@nginx.com     if (req_rpc_data->app_port == NULL) {
1431131Smax.romanov@nginx.com         nxt_debug(task, "  app_port is NULL");
1441131Smax.romanov@nginx.com         goto close_handler;
1451131Smax.romanov@nginx.com     }
1461131Smax.romanov@nginx.com 
147*1547Smax.romanov@nginx.com     (void) nxt_port_socket_twrite(task, req_rpc_data->app_port,
1481131Smax.romanov@nginx.com                                   NXT_PORT_MSG_WEBSOCKET_LAST,
149*1547Smax.romanov@nginx.com                                   -1, req_rpc_data->stream,
150*1547Smax.romanov@nginx.com                                   task->thread->engine->port->id, NULL, NULL);
1511131Smax.romanov@nginx.com 
1521131Smax.romanov@nginx.com close_handler:
1531131Smax.romanov@nginx.com 
1541131Smax.romanov@nginx.com     nxt_http_request_close_handler(task, obj, data);
1551131Smax.romanov@nginx.com }
156