Deleted
Added
nxt_router.c (113:b0148ec28c4d) | nxt_router.c (115:bef7c075837b) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> |
9#include <nxt_conf.h> |
|
9#include <nxt_application.h> 10 11 | 10#include <nxt_application.h> 11 12 |
13typedef struct { 14 nxt_str_t application_type; 15 uint32_t application_workers; 16} nxt_router_listener_conf_t; 17 18 |
|
12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, 13 nxt_router_t *router); 14static void nxt_router_listen_sockets_sort(nxt_router_t *router, 15 nxt_router_temp_conf_t *tmcf); 16 | 19static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, 20 nxt_router_t *router); 21static void nxt_router_listen_sockets_sort(nxt_router_t *router, 22 nxt_router_temp_conf_t *tmcf); 23 |
17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, 18 nxt_router_temp_conf_t *tmcf); | 24static nxt_int_t nxt_router_conf_create(nxt_task_t *task, 25 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); |
19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, 20 nxt_router_temp_conf_t *tmcf); 21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, 22 nxt_sockaddr_t *sa); | 26static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, 27 nxt_router_temp_conf_t *tmcf); 28static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, 29 nxt_sockaddr_t *sa); |
23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, 24 nxt_mp_t *mp, uint32_t port); | |
25 26static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 28 const nxt_event_interface_t *interface); | 30 31static nxt_int_t nxt_router_engines_create(nxt_task_t *task, 32 nxt_router_t *router, nxt_router_temp_conf_t *tmcf, 33 const nxt_event_interface_t *interface); |
29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); 35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, | 34static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 35 nxt_router_engine_conf_t *recf); 36static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 37 nxt_router_engine_conf_t *recf); 38static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 39 nxt_router_engine_conf_t *recf); 40static void nxt_router_engine_socket_count(nxt_queue_t *sockets); 41static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp, |
36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 37 nxt_work_handler_t handler); | 42 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, 43 nxt_work_handler_t handler); |
38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, 39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); | 44static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, 45 nxt_queue_t *sockets); |
40 41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 42 nxt_router_temp_conf_t *tmcf); 43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 44 nxt_event_engine_t *engine); 45 46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); 47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); --- 25 unchanged lines hidden (view full) --- 73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 76 77 78nxt_int_t 79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) 80{ | 46 47static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, 48 nxt_router_temp_conf_t *tmcf); 49static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 50 nxt_event_engine_t *engine); 51 52static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); 53static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); --- 25 unchanged lines hidden (view full) --- 79static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 80static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); 81static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 82 83 84nxt_int_t 85nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) 86{ |
81 nxt_int_t ret; 82 nxt_router_t *router; 83 nxt_router_temp_conf_t *tmcf; 84 const nxt_event_interface_t *interface; | 87 nxt_int_t ret; 88 nxt_router_t *router; |
85 86 ret = nxt_app_http_init(task, rt); 87 if (nxt_slow_path(ret != NXT_OK)) { 88 return ret; 89 } 90 91 router = nxt_zalloc(sizeof(nxt_router_t)); 92 if (nxt_slow_path(router == NULL)) { 93 return NXT_ERROR; 94 } 95 96 nxt_queue_init(&router->engines); 97 nxt_queue_init(&router->sockets); 98 | 89 90 ret = nxt_app_http_init(task, rt); 91 if (nxt_slow_path(ret != NXT_OK)) { 92 return ret; 93 } 94 95 router = nxt_zalloc(sizeof(nxt_router_t)); 96 if (nxt_slow_path(router == NULL)) { 97 return NXT_ERROR; 98 } 99 100 nxt_queue_init(&router->engines); 101 nxt_queue_init(&router->sockets); 102 |
99 /**/ | 103 return NXT_OK; 104} |
100 | 105 |
106 107nxt_int_t 108nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router, 109 u_char *start, u_char *end) 110{ 111 nxt_int_t ret; 112 nxt_router_temp_conf_t *tmcf; 113 const nxt_event_interface_t *interface; 114 |
|
101 tmcf = nxt_router_temp_conf(task, router); 102 if (nxt_slow_path(tmcf == NULL)) { 103 return NXT_ERROR; 104 } 105 | 115 tmcf = nxt_router_temp_conf(task, router); 116 if (nxt_slow_path(tmcf == NULL)) { 117 return NXT_ERROR; 118 } 119 |
106 ret = nxt_router_stub_conf(task, tmcf); | 120 ret = nxt_router_conf_create(task, tmcf, start, end); |
107 if (nxt_slow_path(ret != NXT_OK)) { 108 return ret; 109 } 110 111 nxt_router_listen_sockets_sort(router, tmcf); 112 113 ret = nxt_router_listen_sockets_stub_create(task, tmcf); 114 if (nxt_slow_path(ret != NXT_OK)) { --- 12 unchanged lines hidden (view full) --- 127 return ret; 128 } 129 130 nxt_router_engines_post(tmcf); 131 132 nxt_queue_add(&router->sockets, &tmcf->updating); 133 nxt_queue_add(&router->sockets, &tmcf->creating); 134 | 121 if (nxt_slow_path(ret != NXT_OK)) { 122 return ret; 123 } 124 125 nxt_router_listen_sockets_sort(router, tmcf); 126 127 ret = nxt_router_listen_sockets_stub_create(task, tmcf); 128 if (nxt_slow_path(ret != NXT_OK)) { --- 12 unchanged lines hidden (view full) --- 141 return ret; 142 } 143 144 nxt_router_engines_post(tmcf); 145 146 nxt_queue_add(&router->sockets, &tmcf->updating); 147 nxt_queue_add(&router->sockets, &tmcf->creating); 148 |
149// nxt_mp_destroy(tmcf->mem_pool); 150 |
|
135 return NXT_OK; 136} 137 138 139static nxt_router_temp_conf_t * 140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) 141{ 142 nxt_mp_t *mp, *tmp; --- 48 unchanged lines hidden (view full) --- 191fail: 192 193 nxt_mp_destroy(mp); 194 195 return NULL; 196} 197 198 | 151 return NXT_OK; 152} 153 154 155static nxt_router_temp_conf_t * 156nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) 157{ 158 nxt_mp_t *mp, *tmp; --- 48 unchanged lines hidden (view full) --- 207fail: 208 209 nxt_mp_destroy(mp); 210 211 return NULL; 212} 213 214 |
215static nxt_conf_map_t nxt_router_conf[] = { 216 { 217 nxt_string("threads"), 218 NXT_CONF_MAP_INT32, 219 offsetof(nxt_router_conf_t, threads), 220 }, 221 222 { 223 nxt_null_string, 0, 0, 224 }, 225}; 226 227 228static nxt_conf_map_t nxt_router_listener_conf[] = { 229 { 230 nxt_string("_application_type"), 231 NXT_CONF_MAP_STR, 232 offsetof(nxt_router_listener_conf_t, application_type), 233 }, 234 235 { 236 nxt_string("_application_workers"), 237 NXT_CONF_MAP_INT32, 238 offsetof(nxt_router_listener_conf_t, application_workers), 239 }, 240 241 { 242 nxt_null_string, 0, 0, 243 }, 244}; 245 246 247static nxt_conf_map_t nxt_router_http_conf[] = { 248 { 249 nxt_string("header_buffer_size"), 250 NXT_CONF_MAP_SIZE, 251 offsetof(nxt_socket_conf_t, header_buffer_size), 252 }, 253 254 { 255 nxt_string("large_header_buffer_size"), 256 NXT_CONF_MAP_SIZE, 257 offsetof(nxt_socket_conf_t, large_header_buffer_size), 258 }, 259 260 { 261 nxt_string("header_read_timeout"), 262 NXT_CONF_MAP_MSEC, 263 offsetof(nxt_socket_conf_t, header_read_timeout), 264 }, 265 266 { 267 nxt_null_string, 0, 0, 268 }, 269}; 270 271 |
|
199static nxt_int_t | 272static nxt_int_t |
200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) | 273nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, 274 u_char *start, u_char *end) |
201{ | 275{ |
202 nxt_mp_t *mp; 203 nxt_sockaddr_t *sa; 204 nxt_socket_conf_t *skcf; | 276 nxt_mp_t *mp; 277 uint32_t next; 278 nxt_int_t ret; 279 nxt_str_t name; 280 nxt_sockaddr_t *sa; 281 nxt_conf_value_t *conf, *listeners, *router, *http, *listener; 282 nxt_socket_conf_t *skcf; 283 nxt_router_listener_conf_t lscf; |
205 | 284 |
206 tmcf->conf->threads = 1; | 285 static nxt_str_t router_path = nxt_string("/router"); 286 static nxt_str_t http_path = nxt_string("/http"); 287 static nxt_str_t listeners_path = nxt_string("/listeners"); |
207 | 288 |
289 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end); 290 if (conf == NULL) { 291 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); 292 return NXT_ERROR; 293 } 294 295 router = nxt_conf_get_path(conf, &router_path); 296 297 if (router == NULL) { 298 nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block"); 299 return NXT_ERROR; 300 } 301 302 ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf); 303 if (ret != NXT_OK) { 304 nxt_log(task, NXT_LOG_CRIT, "router map error"); 305 return NXT_ERROR; 306 } 307 308 http = nxt_conf_get_path(conf, &http_path); 309 310 if (http == NULL) { 311 nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block"); 312 return NXT_ERROR; 313 } 314 315 listeners = nxt_conf_get_path(conf, &listeners_path); 316 317 if (listeners == NULL) { 318 nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block"); 319 return NXT_ERROR; 320 } 321 |
|
208 mp = tmcf->conf->mem_pool; 209 | 322 mp = tmcf->conf->mem_pool; 323 |
210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); 211 skcf = nxt_router_socket_conf(task, mp, sa); | 324 next = 0; |
212 | 325 |
213 skcf->listen.handler = nxt_router_conn_init; 214 skcf->header_buffer_size = 2048; 215 skcf->large_header_buffer_size = 8192; 216 skcf->header_read_timeout = 5000; | 326 for ( ;; ) { 327 listener = nxt_conf_next_object_member(listeners, &name, &next); 328 if (listener == NULL) { 329 break; 330 } |
217 | 331 |
218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); | 332 sa = nxt_sockaddr_parse(mp, &name); 333 if (sa == NULL) { 334 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name); 335 return NXT_ERROR; 336 } |
219 | 337 |
220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); 221 skcf = nxt_router_socket_conf(task, mp, sa); | 338 sa->type = SOCK_STREAM; |
222 | 339 |
223 skcf->listen.handler = nxt_stream_connection_init; 224 skcf->header_read_timeout = 5000; | 340 nxt_debug(task, "router listener: \"%*s\"", 341 sa->length, nxt_sockaddr_start(sa)); |
225 | 342 |
226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); | 343 skcf = nxt_router_socket_conf(task, mp, sa); 344 if (skcf == NULL) { 345 return NXT_ERROR; 346 } |
227 | 347 |
348 ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf); 349 if (ret != NXT_OK) { 350 nxt_log(task, NXT_LOG_CRIT, "listener map error"); 351 return NXT_ERROR; 352 } 353 354 nxt_debug(task, "router type: %V", &lscf.application_type); 355 nxt_debug(task, "router workers: %D", lscf.application_workers); 356 357 ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf); 358 if (ret != NXT_OK) { 359 nxt_log(task, NXT_LOG_CRIT, "http map error"); 360 return NXT_ERROR; 361 } 362 363 skcf->listen.handler = nxt_router_conn_init; 364 skcf->router_conf = tmcf->conf; 365 366 nxt_queue_insert_tail(&tmcf->pending, &skcf->link); 367 } 368 |
|
228 return NXT_OK; 229} 230 231 232static nxt_socket_conf_t * 233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 234{ 235 nxt_socket_conf_t *conf; 236 237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 238 if (nxt_slow_path(conf == NULL)) { 239 return NULL; 240 } 241 | 369 return NXT_OK; 370} 371 372 373static nxt_socket_conf_t * 374nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 375{ 376 nxt_socket_conf_t *conf; 377 378 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 379 if (nxt_slow_path(conf == NULL)) { 380 return NULL; 381 } 382 |
383 conf->sockaddr = sa; 384 |
|
242 conf->listen.sockaddr = sa; 243 conf->listen.socklen = sa->socklen; 244 conf->listen.address_length = sa->length; 245 246 conf->listen.socket = -1; 247 conf->listen.backlog = NXT_LISTEN_BACKLOG; 248 conf->listen.flags = NXT_NONBLOCK; 249 conf->listen.read_after_accept = 1; 250 251 return conf; 252} 253 254 | 385 conf->listen.sockaddr = sa; 386 conf->listen.socklen = sa->socklen; 387 conf->listen.address_length = sa->length; 388 389 conf->listen.socket = -1; 390 conf->listen.backlog = NXT_LISTEN_BACKLOG; 391 conf->listen.flags = NXT_NONBLOCK; 392 conf->listen.read_after_accept = 1; 393 394 return conf; 395} 396 397 |
255static nxt_sockaddr_t * 256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port) 257{ 258 nxt_sockaddr_t *sa; 259 struct sockaddr_in sin; 260 261 nxt_memzero(&sin, sizeof(struct sockaddr_in)); 262 263 sin.sin_family = AF_INET; 264 sin.sin_port = htons(port); 265 266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, 267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); 268 if (nxt_slow_path(sa == NULL)) { 269 return NULL; 270 } 271 272 sa->type = SOCK_STREAM; 273 274 nxt_sockaddr_text(sa); 275 276 return sa; 277} 278 279 | |
280static void 281nxt_router_listen_sockets_sort(nxt_router_t *router, 282 nxt_router_temp_conf_t *tmcf) 283{ 284 nxt_queue_link_t *nqlk, *oqlk, *next; 285 nxt_socket_conf_t *nskcf, *oskcf; 286 287 for (nqlk = nxt_queue_first(&tmcf->pending); --- 4 unchanged lines hidden (view full) --- 292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); 293 294 for (oqlk = nxt_queue_first(&router->sockets); 295 oqlk != nxt_queue_tail(&router->sockets); 296 oqlk = nxt_queue_next(oqlk)) 297 { 298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); 299 | 398static void 399nxt_router_listen_sockets_sort(nxt_router_t *router, 400 nxt_router_temp_conf_t *tmcf) 401{ 402 nxt_queue_link_t *nqlk, *oqlk, *next; 403 nxt_socket_conf_t *nskcf, *oskcf; 404 405 for (nqlk = nxt_queue_first(&tmcf->pending); --- 4 unchanged lines hidden (view full) --- 410 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); 411 412 for (oqlk = nxt_queue_first(&router->sockets); 413 oqlk != nxt_queue_tail(&router->sockets); 414 oqlk = nxt_queue_next(oqlk)) 415 { 416 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); 417 |
300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, 301 oskcf->listen.sockaddr)) 302 { | 418 if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) { 419 nskcf->socket = oskcf->socket; 420 nskcf->listen.socket = oskcf->listen.socket; 421 |
303 nxt_queue_remove(oqlk); 304 nxt_queue_insert_tail(&tmcf->keeping, oqlk); 305 306 nxt_queue_remove(nqlk); 307 nxt_queue_insert_tail(&tmcf->updating, nqlk); 308 309 break; 310 } 311 } 312 } 313 314 nxt_queue_add(&tmcf->deleting, &router->sockets); | 422 nxt_queue_remove(oqlk); 423 nxt_queue_insert_tail(&tmcf->keeping, oqlk); 424 425 nxt_queue_remove(nqlk); 426 nxt_queue_insert_tail(&tmcf->updating, nqlk); 427 428 break; 429 } 430 } 431 } 432 433 nxt_queue_add(&tmcf->deleting, &router->sockets); |
434 nxt_queue_init(&router->sockets); |
|
315} 316 317 318static nxt_int_t 319nxt_router_listen_sockets_stub_create(nxt_task_t *task, 320 nxt_router_temp_conf_t *tmcf) 321{ | 435} 436 437 438static nxt_int_t 439nxt_router_listen_sockets_stub_create(nxt_task_t *task, 440 nxt_router_temp_conf_t *tmcf) 441{ |
322 nxt_queue_link_t *qlk, *nqlk; 323 nxt_socket_conf_t *skcf; | 442 nxt_int_t ret; 443 nxt_socket_t s; 444 nxt_queue_link_t *qlk, *nqlk; 445 nxt_socket_conf_t *skcf; 446 nxt_router_socket_t *rtsk; |
324 325 for (qlk = nxt_queue_first(&tmcf->pending); 326 qlk != nxt_queue_tail(&tmcf->pending); 327 qlk = nqlk) 328 { | 447 448 for (qlk = nxt_queue_first(&tmcf->pending); 449 qlk != nxt_queue_tail(&tmcf->pending); 450 qlk = nqlk) 451 { |
452 rtsk = nxt_malloc(sizeof(nxt_router_socket_t)); 453 if (nxt_slow_path(rtsk == NULL)) { 454 return NXT_ERROR; 455 } 456 457 rtsk->count = 0; 458 |
|
329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); | 459 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); |
460 skcf->socket = rtsk; |
|
330 | 461 |
331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { | 462 s = nxt_listen_socket_create0(task, skcf->sockaddr, NXT_NONBLOCK); 463 if (nxt_slow_path(s == -1)) { |
332 return NXT_ERROR; 333 } 334 | 464 return NXT_ERROR; 465 } 466 |
467 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); 468 if (nxt_slow_path(ret != NXT_OK)) { 469 return NXT_ERROR; 470 } 471 472 skcf->listen.socket = s; 473 474 rtsk->fd = s; 475 |
|
335 nqlk = nxt_queue_next(qlk); 336 nxt_queue_remove(qlk); 337 nxt_queue_insert_tail(&tmcf->creating, qlk); 338 } 339 340 return NXT_OK; 341} 342 343 344static nxt_int_t 345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 347{ | 476 nqlk = nxt_queue_next(qlk); 477 nxt_queue_remove(qlk); 478 nxt_queue_insert_tail(&tmcf->creating, qlk); 479 } 480 481 return NXT_OK; 482} 483 484 485static nxt_int_t 486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, 487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) 488{ |
348 nxt_mp_t *mp; | |
349 nxt_int_t ret; 350 nxt_uint_t n, threads; 351 nxt_queue_link_t *qlk; 352 nxt_router_engine_conf_t *recf; 353 | 489 nxt_int_t ret; 490 nxt_uint_t n, threads; 491 nxt_queue_link_t *qlk; 492 nxt_router_engine_conf_t *recf; 493 |
354 mp = tmcf->conf->mem_pool; | |
355 threads = tmcf->conf->threads; 356 357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 358 sizeof(nxt_router_engine_conf_t)); 359 if (nxt_slow_path(tmcf->engines == NULL)) { 360 return NXT_ERROR; 361 } 362 363 n = 0; 364 365 for (qlk = nxt_queue_first(&router->engines); 366 qlk != nxt_queue_tail(&router->engines); 367 qlk = nxt_queue_next(qlk)) 368 { 369 recf = nxt_array_zero_add(tmcf->engines); 370 if (nxt_slow_path(recf == NULL)) { 371 return NXT_ERROR; 372 } 373 | 494 threads = tmcf->conf->threads; 495 496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, 497 sizeof(nxt_router_engine_conf_t)); 498 if (nxt_slow_path(tmcf->engines == NULL)) { 499 return NXT_ERROR; 500 } 501 502 n = 0; 503 504 for (qlk = nxt_queue_first(&router->engines); 505 qlk != nxt_queue_tail(&router->engines); 506 qlk = nxt_queue_next(qlk)) 507 { 508 recf = nxt_array_zero_add(tmcf->engines); 509 if (nxt_slow_path(recf == NULL)) { 510 return NXT_ERROR; 511 } 512 |
374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); | 513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); |
375 // STUB 376 recf->task = recf->engine->task; 377 378 if (n < threads) { | 514 // STUB 515 recf->task = recf->engine->task; 516 517 if (n < threads) { |
379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); | 518 ret = nxt_router_engine_conf_update(tmcf, recf); |
380 381 } else { | 519 520 } else { |
382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); | 521 ret = nxt_router_engine_conf_delete(tmcf, recf); |
383 } 384 385 if (nxt_slow_path(ret != NXT_OK)) { 386 return ret; 387 } 388 389 n++; 390 } --- 8 unchanged lines hidden (view full) --- 399 400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 401 if (nxt_slow_path(recf->engine == NULL)) { 402 return NXT_ERROR; 403 } 404 // STUB 405 recf->task = recf->engine->task; 406 | 522 } 523 524 if (nxt_slow_path(ret != NXT_OK)) { 525 return ret; 526 } 527 528 n++; 529 } --- 8 unchanged lines hidden (view full) --- 538 539 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); 540 if (nxt_slow_path(recf->engine == NULL)) { 541 return NXT_ERROR; 542 } 543 // STUB 544 recf->task = recf->engine->task; 545 |
407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); | 546 ret = nxt_router_engine_conf_create(tmcf, recf); |
408 if (nxt_slow_path(ret != NXT_OK)) { 409 return ret; 410 } 411 | 547 if (nxt_slow_path(ret != NXT_OK)) { 548 return ret; 549 } 550 |
551 nxt_queue_insert_tail(&router->engines, &recf->engine->link0); 552 |
|
412 n++; 413 } 414 415 return NXT_OK; 416} 417 418 419static nxt_int_t | 553 n++; 554 } 555 556 return NXT_OK; 557} 558 559 560static nxt_int_t |
420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, 421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) | 561nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, 562 nxt_router_engine_conf_t *recf) |
422{ | 563{ |
423 nxt_int_t ret; | 564 nxt_mp_t *mp; 565 nxt_int_t ret; 566 nxt_thread_spinlock_t *lock; |
424 425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 426 if (nxt_slow_path(recf->creating == NULL)) { 427 return NXT_ERROR; 428 } 429 | 567 568 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 569 if (nxt_slow_path(recf->creating == NULL)) { 570 return NXT_ERROR; 571 } 572 |
430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, | 573 mp = tmcf->conf->mem_pool; 574 575 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, |
431 recf->creating, nxt_router_listen_socket_create); 432 if (nxt_slow_path(ret != NXT_OK)) { 433 return ret; 434 } 435 | 576 recf->creating, nxt_router_listen_socket_create); 577 if (nxt_slow_path(ret != NXT_OK)) { 578 return ret; 579 } 580 |
436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, | 581 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, |
437 recf->creating, nxt_router_listen_socket_create); | 582 recf->creating, nxt_router_listen_socket_create); |
583 if (nxt_slow_path(ret != NXT_OK)) { 584 return ret; 585 } 586 587 lock = &tmcf->conf->router->lock; 588 589 nxt_thread_spin_lock(lock); 590 591 nxt_router_engine_socket_count(&tmcf->creating); 592 nxt_router_engine_socket_count(&tmcf->updating); 593 594 nxt_thread_spin_unlock(lock); 595 596 return ret; |
|
438} 439 440 441static nxt_int_t | 597} 598 599 600static nxt_int_t |
442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, 443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) | 601nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, 602 nxt_router_engine_conf_t *recf) |
444{ | 603{ |
445 nxt_int_t ret; | 604 nxt_mp_t *mp; 605 nxt_int_t ret; 606 nxt_thread_spinlock_t *lock; |
446 447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 448 if (nxt_slow_path(recf->creating == NULL)) { 449 return NXT_ERROR; 450 } 451 | 607 608 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 609 if (nxt_slow_path(recf->creating == NULL)) { 610 return NXT_ERROR; 611 } 612 |
452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, | 613 mp = tmcf->conf->mem_pool; 614 615 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, |
453 recf->creating, nxt_router_listen_socket_create); 454 if (nxt_slow_path(ret != NXT_OK)) { 455 return ret; 456 } 457 458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 459 if (nxt_slow_path(recf->updating == NULL)) { 460 return NXT_ERROR; 461 } 462 | 616 recf->creating, nxt_router_listen_socket_create); 617 if (nxt_slow_path(ret != NXT_OK)) { 618 return ret; 619 } 620 621 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 622 if (nxt_slow_path(recf->updating == NULL)) { 623 return NXT_ERROR; 624 } 625 |
463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, | 626 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, |
464 recf->updating, nxt_router_listen_socket_update); 465 if (nxt_slow_path(ret != NXT_OK)) { 466 return ret; 467 } 468 469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 470 if (nxt_slow_path(recf->deleting == NULL)) { 471 return NXT_ERROR; 472 } 473 | 627 recf->updating, nxt_router_listen_socket_update); 628 if (nxt_slow_path(ret != NXT_OK)) { 629 return ret; 630 } 631 632 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 633 if (nxt_slow_path(recf->deleting == NULL)) { 634 return NXT_ERROR; 635 } 636 |
474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 475 recf->deleting); | 637 ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting); 638 if (nxt_slow_path(ret != NXT_OK)) { 639 return ret; 640 } 641 642 lock = &tmcf->conf->router->lock; 643 644 nxt_thread_spin_lock(lock); 645 646 nxt_router_engine_socket_count(&tmcf->creating); 647 648 nxt_thread_spin_unlock(lock); 649 650 return ret; |
476} 477 478 479static nxt_int_t | 651} 652 653 654static nxt_int_t |
480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, 481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) | 655nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, 656 nxt_router_engine_conf_t *recf) |
482{ 483 nxt_int_t ret; 484 485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 486 if (nxt_slow_path(recf->deleting == NULL)) { 487 return NXT_ERROR; 488 } 489 | 657{ 658 nxt_int_t ret; 659 660 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); 661 if (nxt_slow_path(recf->deleting == NULL)) { 662 return NXT_ERROR; 663 } 664 |
490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, 491 recf->deleting); | 665 ret = nxt_router_engine_joints_delete(recf, &tmcf->updating); |
492 if (nxt_slow_path(ret != NXT_OK)) { 493 return ret; 494 } 495 | 666 if (nxt_slow_path(ret != NXT_OK)) { 667 return ret; 668 } 669 |
496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, 497 recf->deleting); | 670 return nxt_router_engine_joints_delete(recf, &tmcf->deleting); |
498} 499 500 501static nxt_int_t | 671} 672 673 674static nxt_int_t |
502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, 503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, | 675nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, 676 nxt_queue_t *sockets, nxt_array_t *array, |
504 nxt_work_handler_t handler) 505{ 506 nxt_work_t *work; 507 nxt_queue_link_t *qlk; 508 nxt_socket_conf_joint_t *joint; 509 510 for (qlk = nxt_queue_first(sockets); 511 qlk != nxt_queue_tail(sockets); --- 14 unchanged lines hidden (view full) --- 526 return NXT_ERROR; 527 } 528 529 work->data = joint; 530 531 joint->count = 1; 532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 533 joint->engine = recf->engine; | 677 nxt_work_handler_t handler) 678{ 679 nxt_work_t *work; 680 nxt_queue_link_t *qlk; 681 nxt_socket_conf_joint_t *joint; 682 683 for (qlk = nxt_queue_first(sockets); 684 qlk != nxt_queue_tail(sockets); --- 14 unchanged lines hidden (view full) --- 699 return NXT_ERROR; 700 } 701 702 work->data = joint; 703 704 joint->count = 1; 705 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 706 joint->engine = recf->engine; |
707 708 nxt_queue_insert_tail(&joint->engine->joints, &joint->link); |
|
534 } 535 536 return NXT_OK; 537} 538 539 | 709 } 710 711 return NXT_OK; 712} 713 714 |
715static void 716nxt_router_engine_socket_count(nxt_queue_t *sockets) 717{ 718 nxt_queue_link_t *qlk; 719 nxt_socket_conf_t *skcf; 720 721 for (qlk = nxt_queue_first(sockets); 722 qlk != nxt_queue_tail(sockets); 723 qlk = nxt_queue_next(qlk)) 724 { 725 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); 726 skcf->socket->count++; 727 } 728} 729 730 |
|
540static nxt_int_t | 731static nxt_int_t |
541nxt_router_engine_joints_delete(nxt_task_t *task, 542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) | 732nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, 733 nxt_queue_t *sockets) |
543{ 544 nxt_work_t *work; 545 nxt_queue_link_t *qlk; 546 547 for (qlk = nxt_queue_first(sockets); 548 qlk != nxt_queue_tail(sockets); 549 qlk = nxt_queue_next(qlk)) 550 { | 734{ 735 nxt_work_t *work; 736 nxt_queue_link_t *qlk; 737 738 for (qlk = nxt_queue_first(sockets); 739 qlk != nxt_queue_tail(sockets); 740 qlk = nxt_queue_next(qlk)) 741 { |
551 work = nxt_array_add(array); | 742 work = nxt_array_add(recf->deleting); |
552 if (nxt_slow_path(work == NULL)) { 553 return NXT_ERROR; 554 } 555 556 work->next = NULL; 557 work->handler = nxt_router_listen_socket_delete; 558 work->task = &recf->task; 559 work->obj = recf->engine; --- 46 unchanged lines hidden (view full) --- 606 link->start = nxt_router_thread_start; 607 link->engine = engine; 608 link->work.handler = nxt_router_thread_exit_handler; 609 link->work.task = task; 610 link->work.data = link; 611 612 nxt_queue_insert_tail(&rt->engines, &engine->link); 613 | 743 if (nxt_slow_path(work == NULL)) { 744 return NXT_ERROR; 745 } 746 747 work->next = NULL; 748 work->handler = nxt_router_listen_socket_delete; 749 work->task = &recf->task; 750 work->obj = recf->engine; --- 46 unchanged lines hidden (view full) --- 797 link->start = nxt_router_thread_start; 798 link->engine = engine; 799 link->work.handler = nxt_router_thread_exit_handler; 800 link->work.task = task; 801 link->work.data = link; 802 803 nxt_queue_insert_tail(&rt->engines, &engine->link); 804 |
614 | |
615 process = nxt_runtime_process_find(rt, nxt_pid); 616 if (nxt_slow_path(process == NULL)) { 617 return NXT_ERROR; 618 } 619 620 port = nxt_process_port_new(process); 621 if (nxt_slow_path(port == NULL)) { 622 return NXT_ERROR; --- 12 unchanged lines hidden (view full) --- 635 port->mem_pool = mp; 636 port->engine = 0; 637 port->type = NXT_PROCESS_ROUTER; 638 639 engine->port = port; 640 641 nxt_runtime_port_add(rt, port); 642 | 805 process = nxt_runtime_process_find(rt, nxt_pid); 806 if (nxt_slow_path(process == NULL)) { 807 return NXT_ERROR; 808 } 809 810 port = nxt_process_port_new(process); 811 if (nxt_slow_path(port == NULL)) { 812 return NXT_ERROR; --- 12 unchanged lines hidden (view full) --- 825 port->mem_pool = mp; 826 port->engine = 0; 827 port->type = NXT_PROCESS_ROUTER; 828 829 engine->port = port; 830 831 nxt_runtime_port_add(rt, port); 832 |
643 | |
644 ret = nxt_thread_create(&handle, link); 645 646 if (nxt_slow_path(ret != NXT_OK)) { 647 nxt_queue_remove(&engine->link); 648 } 649 650 return ret; 651} --- 15 unchanged lines hidden (view full) --- 667 668 669static void 670nxt_router_engine_post(nxt_router_engine_conf_t *recf) 671{ 672 nxt_uint_t n; 673 nxt_work_t *work; 674 | 833 ret = nxt_thread_create(&handle, link); 834 835 if (nxt_slow_path(ret != NXT_OK)) { 836 nxt_queue_remove(&engine->link); 837 } 838 839 return ret; 840} --- 15 unchanged lines hidden (view full) --- 856 857 858static void 859nxt_router_engine_post(nxt_router_engine_conf_t *recf) 860{ 861 nxt_uint_t n; 862 nxt_work_t *work; 863 |
675 work = recf->creating->elts; | 864 if (recf->creating != NULL) { 865 work = recf->creating->elts; |
676 | 866 |
677 for (n = recf->creating->nelts; n != 0; n--) { 678 nxt_event_engine_post(recf->engine, work); 679 work++; | 867 for (n = recf->creating->nelts; n != 0; n--) { 868 nxt_event_engine_post(recf->engine, work); 869 work++; 870 } |
680 } | 871 } |
872 873 if (recf->updating != NULL) { 874 work = recf->updating->elts; 875 876 for (n = recf->updating->nelts; n != 0; n--) { 877 nxt_event_engine_post(recf->engine, work); 878 work++; 879 } 880 } 881 882 if (recf->deleting != NULL) { 883 work = recf->deleting->elts; 884 885 for (n = recf->deleting->nelts; n != 0; n--) { 886 nxt_event_engine_post(recf->engine, work); 887 work++; 888 } 889 } |
|
681} 682 683 684static void 685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 686 687nxt_port_handler_t nxt_router_process_port_handlers[] = { 688 NULL, --- 56 unchanged lines hidden (view full) --- 745 listen->socket.data = joint; 746} 747 748 749nxt_inline nxt_listen_event_t * 750nxt_router_listen_event(nxt_queue_t *listen_connections, 751 nxt_socket_conf_t *skcf) 752{ | 890} 891 892 893static void 894nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 895 896nxt_port_handler_t nxt_router_process_port_handlers[] = { 897 NULL, --- 56 unchanged lines hidden (view full) --- 954 listen->socket.data = joint; 955} 956 957 958nxt_inline nxt_listen_event_t * 959nxt_router_listen_event(nxt_queue_t *listen_connections, 960 nxt_socket_conf_t *skcf) 961{ |
753 nxt_socket_t socket; 754 nxt_queue_link_t *link; | 962 nxt_socket_t fd; 963 nxt_queue_link_t *qlk; |
755 nxt_listen_event_t *listen; 756 | 964 nxt_listen_event_t *listen; 965 |
757 socket = skcf->listen.socket; | 966 fd = skcf->socket->fd; |
758 | 967 |
759 for (link = nxt_queue_first(listen_connections); 760 link != nxt_queue_tail(listen_connections); 761 link = nxt_queue_next(link)) | 968 for (qlk = nxt_queue_first(listen_connections); 969 qlk != nxt_queue_tail(listen_connections); 970 qlk = nxt_queue_next(qlk)) |
762 { | 971 { |
763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link); | 972 listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link); |
764 | 973 |
765 if (socket == listen->socket.fd) { | 974 if (fd == listen->socket.fd) { |
766 return listen; 767 } 768 } 769 770 return NULL; 771} 772 773 --- 55 unchanged lines hidden (view full) --- 829 nxt_router_listen_socket_release(task, joint); 830} 831 832 833static void 834nxt_router_listen_socket_release(nxt_task_t *task, 835 nxt_socket_conf_joint_t *joint) 836{ | 975 return listen; 976 } 977 } 978 979 return NULL; 980} 981 982 --- 55 unchanged lines hidden (view full) --- 1038 nxt_router_listen_socket_release(task, joint); 1039} 1040 1041 1042static void 1043nxt_router_listen_socket_release(nxt_task_t *task, 1044 nxt_socket_conf_joint_t *joint) 1045{ |
837 nxt_socket_t s; 838 nxt_listen_socket_t *ls; | 1046 nxt_router_socket_t *rtsk; |
839 nxt_thread_spinlock_t *lock; 840 | 1047 nxt_thread_spinlock_t *lock; 1048 |
841 s = -1; 842 ls = &joint->socket_conf->listen; | 1049 rtsk = joint->socket_conf->socket; |
843 lock = &joint->socket_conf->router_conf->router->lock; 844 845 nxt_thread_spin_lock(lock); 846 | 1050 lock = &joint->socket_conf->router_conf->router->lock; 1051 1052 nxt_thread_spin_lock(lock); 1053 |
847 if (--ls->count == 0) { 848 s = ls->socket; 849 ls->socket = -1; | 1054 if (--rtsk->count != 0) { 1055 rtsk = NULL; |
850 } 851 852 nxt_thread_spin_unlock(lock); 853 | 1056 } 1057 1058 nxt_thread_spin_unlock(lock); 1059 |
854 if (s != -1) { 855 nxt_socket_close(task, s); | 1060 if (rtsk != NULL) { 1061 nxt_socket_close(task, rtsk->fd); 1062 nxt_free(rtsk); |
856 } 857 858 nxt_router_conf_release(task, joint); 859} 860 861 862static void 863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) --- 25 unchanged lines hidden (view full) --- 889 if (--rtcf->count != 0) { 890 rtcf = NULL; 891 } 892 } 893 894 nxt_thread_spin_unlock(lock); 895 896 if (rtcf != NULL) { | 1063 } 1064 1065 nxt_router_conf_release(task, joint); 1066} 1067 1068 1069static void 1070nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) --- 25 unchanged lines hidden (view full) --- 1096 if (--rtcf->count != 0) { 1097 rtcf = NULL; 1098 } 1099 } 1100 1101 nxt_thread_spin_unlock(lock); 1102 1103 if (rtcf != NULL) { |
1104 nxt_debug(task, "old router conf is destroyed"); |
|
897 nxt_mp_destroy(rtcf->mem_pool); 898 } 899 900 if (nxt_queue_is_empty(&joint->engine->joints)) { 901 nxt_thread_exit(task->thread); 902 } 903} 904 --- 494 unchanged lines hidden --- | 1105 nxt_mp_destroy(rtcf->mem_pool); 1106 } 1107 1108 if (nxt_queue_is_empty(&joint->engine->joints)) { 1109 nxt_thread_exit(task->thread); 1110 } 1111} 1112 --- 494 unchanged lines hidden --- |