1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
/*
 * A cache time resolution is 10ms.
 *
 * The thread time is scaled by 100 and converted to uint64_t.  The whole
 * expansion is parenthesized so that the cast cannot bind to an operator
 * adjacent to the macro invocation.
 *
 * NOTE(review): the 10ms figure presumes nxt_thread_time() yields seconds
 * with a fractional part - confirm against its definition.
 */
#define nxt_cache_time(thr)                                                   \
    ((uint64_t) (nxt_thread_time(thr) * 100))
13
14
/* Key comparison callback installed in both lvlhsh prototypes of this file. */
static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data);
/*
 * Lookup helpers.  They are reached from nxt_cache_query() and
 * nxt_cache_wake_handler() between nxt_cache_lock() and nxt_cache_unlock(),
 * and return the work handler to run once the lock has been released.
 */
static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache,
    nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache,
    nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache,
    nxt_cache_query_t *q);

/* Work handlers used while a query waits for a node being updated. */
static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data);
static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data);
static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data);
/* Drops one node reference; called by nxt_cache_release() under the lock. */
static ssize_t nxt_cache_release_locked(nxt_cache_t *cache,
    nxt_cache_query_t *q, u_char *buf, size_t size);

/*
 * Node and wait-entry allocators backed by per-cache free lists.  Their
 * slow paths release and re-acquire the cache lock around cache->alloc()
 * and cache->free().
 */
static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache);
static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node,
    nxt_bool_t fast);
static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache,
    nxt_bool_t *slow);
static void nxt_cache_query_wait_free(nxt_cache_t *cache,
    nxt_cache_query_wait_t *qw);
36
37
/* STUB */
/*
 * Prototypes for the memory zone backed cache.  nxt_cache_shm_create() has
 * external linkage and is declared here rather than relying on a header.
 * nxt_cache_shm_alloc() is the bucket allocator used by its lvlhsh
 * prototype.
 */
nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool);
static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc);
/**/
42
43
44 nxt_int_t
nxt_cache_shm_create(nxt_mem_zone_t * mz)45 nxt_cache_shm_create(nxt_mem_zone_t *mz)
46 {
47 nxt_cache_t *cache;
48
49 static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = {
50 NXT_LVLHSH_LARGE_SLAB,
51 0,
52 nxt_cache_lvlhsh_test,
53 (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc,
54 (nxt_lvlhsh_free_t) nxt_mem_zone_free,
55 };
56
57 cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t));
58
59 if (cache == NULL) {
60 return NXT_ERROR;
61 }
62
63 cache->proto = &proto;
64 cache->pool = mz;
65
66 cache->start_time = nxt_cache_time(nxt_thread());
67
68 return NXT_OK;
69 }
70
71
72 static void *
nxt_cache_shm_alloc(void * data,size_t size,nxt_uint_t nalloc)73 nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc)
74 {
75 return nxt_mem_zone_align(data, size, size);
76 }
77
78
79 void
nxt_cache_init(nxt_cache_t * cache)80 nxt_cache_init(nxt_cache_t *cache)
81 {
82 static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = {
83 NXT_LVLHSH_LARGE_MEMALIGN,
84 0,
85 nxt_cache_lvlhsh_test,
86 nxt_lvlhsh_alloc,
87 nxt_lvlhsh_free,
88 };
89
90 cache->proto = &proto;
91
92 cache->start_time = nxt_cache_time(nxt_thread());
93 }
94
95
96 static nxt_int_t
nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t * lhq,void * data)97 nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data)
98 {
99 nxt_cache_node_t *node;
100
101 node = data;
102
103 if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) {
104 return NXT_OK;
105 }
106
107 return NXT_DECLINED;
108 }
109
110
111 nxt_inline void
nxt_cache_lock(nxt_cache_t * cache)112 nxt_cache_lock(nxt_cache_t *cache)
113 {
114 if (cache->shared) {
115 nxt_thread_spin_lock(&cache->lock);
116 }
117 }
118
119
120 nxt_inline void
nxt_cache_unlock(nxt_cache_t * cache)121 nxt_cache_unlock(nxt_cache_t *cache)
122 {
123 if (cache->shared) {
124 nxt_thread_spin_unlock(&cache->lock);
125 }
126 }
127
128
129 void
nxt_cache_query(nxt_cache_t * cache,nxt_cache_query_t * q)130 nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q)
131 {
132 nxt_thread_t *thr;
133 nxt_lvlhsh_query_t lhq;
134 nxt_work_handler_t handler;
135
136 thr = nxt_thread();
137
138 if (cache != NULL) {
139 lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len);
140 lhq.replace = 0;
141 lhq.key.len = q->key_len;
142 lhq.key.data = q->key_data;
143 lhq.proto = cache->proto;
144 lhq.pool = cache->pool;
145
146 q->now = nxt_cache_time(thr);
147
148 nxt_cache_lock(cache);
149
150 handler = nxt_cache_query_locked(cache, q, &lhq);
151
152 nxt_cache_unlock(cache);
153
154 } else {
155 handler = q->state->nocache_handler;
156 }
157
158 handler(thr, q, NULL);
159 }
160
161
162 static nxt_work_handler_t
nxt_cache_query_locked(nxt_cache_t * cache,nxt_cache_query_t * q,nxt_lvlhsh_query_t * lhq)163 nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
164 nxt_lvlhsh_query_t *lhq)
165 {
166 nxt_int_t ret;
167 nxt_time_t expiry;
168 nxt_cache_node_t *node;
169 nxt_cache_query_state_t *state;
170
171 if (q->hold) {
172 return nxt_cache_node_hold(cache, q, lhq);
173 }
174
175 ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq);
176
177 state = q->state;
178
179 if (ret != NXT_OK) {
180 /* NXT_DECLINED */
181 return state->nocache_handler;
182 }
183
184 node = lhq->value;
185 node->count++;
186 q->node = node;
187
188 expiry = cache->start_time + node->expiry;
189
190 if (q->now < expiry) {
191 return state->ready_handler;
192 }
193
194 q->stale = 1;
195
196 return state->stale_handler;
197 }
198
199
200 static nxt_work_handler_t
nxt_cache_node_hold(nxt_cache_t * cache,nxt_cache_query_t * q,nxt_lvlhsh_query_t * lhq)201 nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q,
202 nxt_lvlhsh_query_t *lhq)
203 {
204 nxt_int_t ret;
205 nxt_bool_t slow;
206 nxt_cache_node_t *node, *sentinel;
207 nxt_work_handler_t handler;
208 nxt_cache_query_wait_t *qw;
209 nxt_cache_query_state_t *state;
210
211 state = q->state;
212 sentinel = nxt_cache_node_alloc(cache);
213
214 if (nxt_slow_path(sentinel == NULL)) {
215 return state->error_handler;
216 }
217
218 sentinel->key_data = q->key_data;
219 sentinel->key_len = q->key_len;
220 lhq->value = sentinel;
221
222 /*
223 * Try to insert an empty sentinel node to hold updating
224 * process if there is no existent cache node in cache.
225 */
226 ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq);
227
228 if (ret == NXT_OK) {
229 /* The sentinel node was successully added. */
230
231 q->node = sentinel;
232 sentinel->updating = 1;
233 return state->update_handler;
234 }
235
236 nxt_cache_node_free(cache, sentinel, 1);
237
238 if (ret == NXT_ERROR) {
239 return state->error_handler;
240 }
241
242 /* NXT_DECLINED: a cache node exists. */
243
244 node = lhq->value;
245 node->count++;
246 q->node = node;
247
248 handler = nxt_cache_node_test(cache, q);
249 if (handler != NULL) {
250 return handler;
251 }
252
253 /* Add the node to a wait queue. */
254
255 qw = nxt_cache_query_wait_alloc(cache, &slow);
256 if (nxt_slow_path(qw == NULL)) {
257 return state->error_handler;
258 }
259
260 if (slow) {
261 /* The node state may have been changed during slow allocation. */
262
263 handler = nxt_cache_node_test(cache, q);
264 if (handler != NULL) {
265 nxt_cache_query_wait_free(cache, qw);
266 return handler;
267 }
268 }
269
270 qw->query = q;
271 qw->next = node->waiting;
272 qw->busy = 0;
273 qw->deleted = 0;
274 qw->pid = nxt_pid;
275 qw->engine = nxt_thread_event_engine();
276 qw->handler = nxt_cache_wake_handler;
277 qw->cache = cache;
278
279 node->waiting = qw;
280
281 return nxt_cache_wait_handler;
282 }
283
284
285 static nxt_work_handler_t
nxt_cache_node_test(nxt_cache_t * cache,nxt_cache_query_t * q)286 nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q)
287 {
288 nxt_time_t expiry;
289 nxt_cache_node_t *node;
290 nxt_cache_query_state_t *state;
291
292 q->stale = 0;
293 state = q->state;
294 node = q->node;
295
296 expiry = cache->start_time + node->expiry;
297
298 if (q->now < expiry) {
299 return state->ready_handler;
300 }
301
302 /*
303 * A valid stale or empty sentinel cache node.
304 * The sentinel node can be only in updating state.
305 */
306
307 if (node->updating) {
308
309 if (node->expiry != 0) {
310 /* A valid stale cache node. */
311
312 q->stale = 1;
313
314 if (q->use_stale) {
315 return state->stale_handler;
316 }
317 }
318
319 /* A sentinel node. */
320 return NULL;
321 }
322
323 /* A valid stale cache node is not being updated now. */
324
325 q->stale = 1;
326
327 if (q->use_stale) {
328
329 if (q->update_stale) {
330 node->updating = 1;
331 return state->update_stale_handler;
332 }
333
334 return state->stale_handler;
335 }
336
337 node->updating = 1;
338 return state->update_handler;
339 }
340
341
342 static void
nxt_cache_wait_handler(nxt_thread_t * thr,void * obj,void * data)343 nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data)
344 {
345 nxt_event_timer_t *ev;
346 nxt_cache_query_t *cq;
347
348 cq = obj;
349
350 if (cq->timeout != 0) {
351
352 ev = &cq->timer;
353
354 if (ev->state == NXT_EVENT_TIMER_DISABLED) {
355 ev->handler = nxt_cache_timeout_handler;
356 nxt_event_timer_ident(ev, -1);
357
358 nxt_event_timer_add(thr->engine, ev, cq->timeout);
359 }
360 }
361 }
362
363
364 static void
nxt_cache_timeout_handler(nxt_thread_t * thr,void * obj,void * data)365 nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data)
366 {
367 nxt_cache_query_t *cq;
368 nxt_event_timer_t *ev;
369
370 ev = obj;
371
372 cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer);
373
374 cq->state->timeout_handler(thr, cq, NULL);
375 }
376
377
378 static void
nxt_cache_wake_handler(nxt_thread_t * thr,void * obj,void * data)379 nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data)
380 {
381 nxt_cache_t *cache;
382 nxt_work_handler_t handler;
383 nxt_cache_query_t *q;
384 nxt_cache_query_wait_t *qw;
385
386 qw = obj;
387 q = qw->query;
388 cache = qw->cache;
389
390 nxt_cache_lock(cache);
391
392 handler = nxt_cache_node_test(cache, q);
393
394 if (handler != NULL) {
395 nxt_cache_query_wait_free(cache, qw);
396
397 } else {
398 /* Wait again. */
399 qw->next = q->node->waiting;
400 q->node->waiting = qw;
401 }
402
403 nxt_cache_unlock(cache);
404
405 handler(thr, q, NULL);
406 }
407
408
409 nxt_int_t
nxt_cache_update(nxt_cache_t * cache,nxt_cache_query_t * q)410 nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q)
411 {
412 nxt_int_t ret;
413 nxt_cache_node_t *node;
414 nxt_lvlhsh_query_t lhq;
415
416 node = q->node;
417
418 node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time;
419
420 node->updating = 0;
421 node->count = 1;
422
423 lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);
424 lhq.replace = 1;
425 lhq.key.len = node->key_len;
426 lhq.key.data = node->key_data;
427 lhq.value = node;
428 lhq.proto = cache->proto;
429 lhq.pool = cache->pool;
430
431 nxt_cache_lock(cache);
432
433 ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq);
434
435 if (nxt_fast_path(ret != NXT_OK)) {
436
437 nxt_queue_insert_head(&cache->expiry_queue, &node->link);
438
439 node = lhq.value;
440
441 if (node != NULL) {
442 /* A replaced node. */
443
444 nxt_queue_remove(&node->link);
445
446 if (node->count != 0) {
447 node->deleted = 1;
448
449 } else {
450 // delete cache node
451 }
452 }
453 }
454
455 nxt_cache_unlock(cache);
456
457 return ret;
458 }
459
460
461 void
nxt_cache_release(nxt_cache_t * cache,nxt_cache_query_t * q)462 nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q)
463 {
464 u_char *p, *data;
465 size_t size;
466 ssize_t ret;
467 nxt_thread_t *thr;
468 u_char buf[1024];
469
470 thr = nxt_thread();
471 q->now = nxt_cache_time(thr);
472
473 p = buf;
474 size = sizeof(buf);
475
476 for ( ;; ) {
477 nxt_cache_lock(cache);
478
479 ret = nxt_cache_release_locked(cache, q, p, size);
480
481 nxt_cache_unlock(cache);
482
483 if (ret == 0) {
484 return;
485 }
486
487 size = nxt_abs(ret);
488
489 data = nxt_malloc(size);
490
491 if (data == NULL) {
492 /* TODO: retry */
493 return;
494 }
495
496 if (ret < 0) {
497 p = data;
498 continue;
499 }
500
501 if (p != data) {
502 nxt_memcpy(data, p, size);
503 }
504
505 nxt_thread_work_queue_add(thr, &thr->work_queue.main,
506 cache->delete_handler, data, NULL, thr->log);
507 }
508 }
509
510
511 static ssize_t
nxt_cache_release_locked(nxt_cache_t * cache,nxt_cache_query_t * q,u_char * buf,size_t size)512 nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
513 u_char *buf, size_t size)
514 {
515 ssize_t ret;
516 nxt_cache_node_t *node;
517
518 node = q->node;
519 node->count--;
520
521 if (node->count != 0) {
522 return 0;
523 }
524
525 if (!node->deleted) {
526 /*
527 * A cache node is locked whilst its count is non zero.
528 * To minimize number of operations the node's place in expiry
529 * queue can be updated only if the node is not currently used.
530 */
531 node->accessed = q->now - cache->start_time;
532
533 nxt_queue_remove(&node->link);
534 nxt_queue_insert_head(&cache->expiry_queue, &node->link);
535
536 return 0;
537 }
538
539 ret = 0;
540 #if 0
541
542 ret = cache->delete_copy(cache, node, buf, size);
543
544 if (ret < 0) {
545 return ret;
546 }
547
548 #endif
549
550 nxt_cache_node_free(cache, node, 0);
551
552 return ret;
553 }
554
555
556 static nxt_cache_node_t *
nxt_cache_node_alloc(nxt_cache_t * cache)557 nxt_cache_node_alloc(nxt_cache_t *cache)
558 {
559 nxt_queue_link_t *link;
560 nxt_cache_node_t *node;
561
562 link = nxt_queue_first(&cache->free_nodes);
563
564 if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) {
565 cache->nfree_nodes--;
566 nxt_queue_remove(link);
567
568 node = nxt_queue_link_data(link, nxt_cache_node_t, link);
569 nxt_memzero(node, sizeof(nxt_cache_node_t));
570
571 return node;
572 }
573
574 nxt_cache_unlock(cache);
575
576 node = cache->alloc(cache->data, sizeof(nxt_cache_node_t));
577
578 nxt_cache_lock(cache);
579
580 return node;
581 }
582
583
584 static void
nxt_cache_node_free(nxt_cache_t * cache,nxt_cache_node_t * node,nxt_bool_t fast)585 nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast)
586 {
587 if (fast || cache->nfree_nodes < 32) {
588 nxt_queue_insert_head(&cache->free_nodes, &node->link);
589 cache->nfree_nodes++;
590 return;
591 }
592
593 nxt_cache_unlock(cache);
594
595 cache->free(cache->data, node);
596
597 nxt_cache_lock(cache);
598 }
599
600
601 static nxt_cache_query_wait_t *
nxt_cache_query_wait_alloc(nxt_cache_t * cache,nxt_bool_t * slow)602 nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow)
603 {
604 nxt_cache_query_wait_t *qw;
605
606 qw = cache->free_query_wait;
607
608 if (nxt_fast_path(qw != NULL)) {
609 cache->free_query_wait = qw->next;
610 cache->nfree_query_wait--;
611
612 *slow = 0;
613 return qw;
614 }
615
616 nxt_cache_unlock(cache);
617
618 qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t));
619 *slow = 1;
620
621 nxt_cache_lock(cache);
622
623 return qw;
624 }
625
626
627 static void
nxt_cache_query_wait_free(nxt_cache_t * cache,nxt_cache_query_wait_t * qw)628 nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw)
629 {
630 if (cache->nfree_query_wait < 32) {
631 qw->next = cache->free_query_wait;
632 cache->free_query_wait = qw;
633 cache->nfree_query_wait++;
634 return;
635 }
636
637 nxt_cache_unlock(cache);
638
639 cache->free(cache->data, qw);
640
641 nxt_cache_lock(cache);
642 }
643