xref: /unit/src/nxt_http_websocket.c (revision 1269)
11131Smax.romanov@nginx.com 
21131Smax.romanov@nginx.com /*
31131Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
41131Smax.romanov@nginx.com  */
51131Smax.romanov@nginx.com 
61131Smax.romanov@nginx.com #include <nxt_main.h>
71131Smax.romanov@nginx.com #include <nxt_router.h>
81131Smax.romanov@nginx.com #include <nxt_http.h>
91131Smax.romanov@nginx.com #include <nxt_router_request.h>
101131Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
111131Smax.romanov@nginx.com #include <nxt_websocket.h>
121131Smax.romanov@nginx.com #include <nxt_websocket_header.h>
131131Smax.romanov@nginx.com 
141131Smax.romanov@nginx.com 
151131Smax.romanov@nginx.com static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
161131Smax.romanov@nginx.com static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
171131Smax.romanov@nginx.com     void *data);
181131Smax.romanov@nginx.com 
191131Smax.romanov@nginx.com 
201131Smax.romanov@nginx.com const nxt_http_request_state_t  nxt_http_websocket
211131Smax.romanov@nginx.com     nxt_aligned(64) =
221131Smax.romanov@nginx.com {
231131Smax.romanov@nginx.com     .ready_handler = nxt_http_websocket_client,
241131Smax.romanov@nginx.com     .error_handler = nxt_http_websocket_error_handler,
251131Smax.romanov@nginx.com };
261131Smax.romanov@nginx.com 
271131Smax.romanov@nginx.com 
281131Smax.romanov@nginx.com static void
291131Smax.romanov@nginx.com nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
301131Smax.romanov@nginx.com {
311131Smax.romanov@nginx.com     size_t                  frame_size, used_size, copy_size, buf_free_size;
321131Smax.romanov@nginx.com     size_t                  chunk_copy_size;
331131Smax.romanov@nginx.com     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
341131Smax.romanov@nginx.com     nxt_int_t               res;
351131Smax.romanov@nginx.com     nxt_http_request_t      *r;
361131Smax.romanov@nginx.com     nxt_request_app_link_t  *req_app_link;
371131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
381131Smax.romanov@nginx.com     nxt_websocket_header_t  *wsh;
391131Smax.romanov@nginx.com 
401131Smax.romanov@nginx.com     r = obj;
411131Smax.romanov@nginx.com 
421131Smax.romanov@nginx.com     if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
431131Smax.romanov@nginx.com          || (req_app_link = req_rpc_data->req_app_link) == NULL))
441131Smax.romanov@nginx.com     {
451131Smax.romanov@nginx.com         nxt_debug(task, "websocket client frame for destroyed request");
461131Smax.romanov@nginx.com 
471131Smax.romanov@nginx.com         return;
481131Smax.romanov@nginx.com     }
491131Smax.romanov@nginx.com 
501131Smax.romanov@nginx.com     nxt_debug(task, "http websocket client frame");
511131Smax.romanov@nginx.com 
521131Smax.romanov@nginx.com     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
531131Smax.romanov@nginx.com 
541131Smax.romanov@nginx.com     frame_size = nxt_websocket_frame_header_size(wsh)
551131Smax.romanov@nginx.com                   + nxt_websocket_frame_payload_len(wsh);
561131Smax.romanov@nginx.com 
571131Smax.romanov@nginx.com     buf = NULL;
581131Smax.romanov@nginx.com     buf_free_size = 0;
591131Smax.romanov@nginx.com     out = NULL;
601131Smax.romanov@nginx.com     out_tail = &out;
611131Smax.romanov@nginx.com 
621131Smax.romanov@nginx.com     b = r->ws_frame;
631131Smax.romanov@nginx.com 
641131Smax.romanov@nginx.com     while (b != NULL && frame_size > 0) {
651131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
661131Smax.romanov@nginx.com         copy_size = nxt_min(used_size, frame_size);
671131Smax.romanov@nginx.com 
681131Smax.romanov@nginx.com         while (copy_size > 0) {
691131Smax.romanov@nginx.com             if (buf == NULL || buf_free_size == 0) {
701131Smax.romanov@nginx.com                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
711131Smax.romanov@nginx.com 
721131Smax.romanov@nginx.com                 buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
731131Smax.romanov@nginx.com                                             buf_free_size);
741131Smax.romanov@nginx.com 
751131Smax.romanov@nginx.com                 *out_tail = buf;
761131Smax.romanov@nginx.com                 out_tail = &buf->next;
771131Smax.romanov@nginx.com             }
781131Smax.romanov@nginx.com 
791131Smax.romanov@nginx.com             chunk_copy_size = nxt_min(buf_free_size, copy_size);
801131Smax.romanov@nginx.com 
811131Smax.romanov@nginx.com             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
821131Smax.romanov@nginx.com                                        chunk_copy_size);
831131Smax.romanov@nginx.com 
841131Smax.romanov@nginx.com             copy_size -= chunk_copy_size;
851131Smax.romanov@nginx.com             b->mem.pos += chunk_copy_size;
861131Smax.romanov@nginx.com             buf_free_size -= chunk_copy_size;
871131Smax.romanov@nginx.com         }
881131Smax.romanov@nginx.com 
891131Smax.romanov@nginx.com         frame_size -= copy_size;
901131Smax.romanov@nginx.com         next = b->next;
91*1269Sigor@sysoev.ru         b->next = NULL;
921131Smax.romanov@nginx.com 
931131Smax.romanov@nginx.com         if (nxt_buf_mem_used_size(&b->mem) == 0) {
941131Smax.romanov@nginx.com             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
951131Smax.romanov@nginx.com                                b->completion_handler, task, b, b->parent);
961131Smax.romanov@nginx.com 
971131Smax.romanov@nginx.com             r->ws_frame = next;
981131Smax.romanov@nginx.com         }
991131Smax.romanov@nginx.com 
1001131Smax.romanov@nginx.com         b = next;
1011131Smax.romanov@nginx.com     }
1021131Smax.romanov@nginx.com 
1031131Smax.romanov@nginx.com     res = nxt_port_socket_twrite(task, req_app_link->app_port,
1041131Smax.romanov@nginx.com                                  NXT_PORT_MSG_WEBSOCKET, -1,
1051131Smax.romanov@nginx.com                                  req_app_link->stream,
1061131Smax.romanov@nginx.com                                  req_app_link->reply_port->id, out, NULL);
1071131Smax.romanov@nginx.com     if (nxt_slow_path(res != NXT_OK)) {
1081131Smax.romanov@nginx.com         // TODO: handle
1091131Smax.romanov@nginx.com     }
1101131Smax.romanov@nginx.com 
1111131Smax.romanov@nginx.com     b = r->ws_frame;
1121131Smax.romanov@nginx.com 
1131131Smax.romanov@nginx.com     if (b != NULL) {
1141131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
1151131Smax.romanov@nginx.com 
1161131Smax.romanov@nginx.com         if (used_size > 0) {
1171131Smax.romanov@nginx.com             nxt_memmove(b->mem.start, b->mem.pos, used_size);
1181131Smax.romanov@nginx.com 
1191131Smax.romanov@nginx.com             b->mem.pos = b->mem.start;
1201131Smax.romanov@nginx.com             b->mem.free = b->mem.start + used_size;
1211131Smax.romanov@nginx.com         }
1221131Smax.romanov@nginx.com     }
1231131Smax.romanov@nginx.com 
1241131Smax.romanov@nginx.com     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
1251131Smax.romanov@nginx.com }
1261131Smax.romanov@nginx.com 
1271131Smax.romanov@nginx.com 
1281131Smax.romanov@nginx.com static void
1291131Smax.romanov@nginx.com nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
1301131Smax.romanov@nginx.com {
1311131Smax.romanov@nginx.com     nxt_http_request_t      *r;
1321131Smax.romanov@nginx.com     nxt_request_app_link_t  *req_app_link;
1331131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
1341131Smax.romanov@nginx.com 
1351131Smax.romanov@nginx.com     nxt_debug(task, "http websocket error handler");
1361131Smax.romanov@nginx.com 
1371131Smax.romanov@nginx.com     r = obj;
1381131Smax.romanov@nginx.com 
1391131Smax.romanov@nginx.com     if ((req_rpc_data = r->req_rpc_data) == NULL) {
1401131Smax.romanov@nginx.com         nxt_debug(task, "  req_rpc_data is NULL");
1411131Smax.romanov@nginx.com         goto close_handler;
1421131Smax.romanov@nginx.com     }
1431131Smax.romanov@nginx.com 
1441131Smax.romanov@nginx.com     if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
1451131Smax.romanov@nginx.com         nxt_debug(task, "  req_app_link is NULL");
1461131Smax.romanov@nginx.com         goto close_handler;
1471131Smax.romanov@nginx.com     }
1481131Smax.romanov@nginx.com 
1491131Smax.romanov@nginx.com     if (req_app_link->app_port == NULL) {
1501131Smax.romanov@nginx.com         nxt_debug(task, "  app_port is NULL");
1511131Smax.romanov@nginx.com         goto close_handler;
1521131Smax.romanov@nginx.com     }
1531131Smax.romanov@nginx.com 
1541131Smax.romanov@nginx.com     (void) nxt_port_socket_twrite(task, req_app_link->app_port,
1551131Smax.romanov@nginx.com                                   NXT_PORT_MSG_WEBSOCKET_LAST,
1561131Smax.romanov@nginx.com                                   -1, req_app_link->stream,
1571131Smax.romanov@nginx.com                                   req_app_link->reply_port->id, NULL, NULL);
1581131Smax.romanov@nginx.com 
1591131Smax.romanov@nginx.com close_handler:
1601131Smax.romanov@nginx.com 
1611131Smax.romanov@nginx.com     nxt_http_request_close_handler(task, obj, data);
1621131Smax.romanov@nginx.com }
163