xref: /unit/src/nxt_cache.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * A cache time resolution is 10ms: the thread time is multiplied by 100
 * and converted to uint64_t.
 *
 * The whole expansion is parenthesized so that the macro behaves as a
 * single primary expression wherever it is used (e.g. as an operand of
 * sizeof or of a postfix operator).
 */
#define                                                                       \
nxt_cache_time(thr)                                                           \
    ((uint64_t) (nxt_thread_time(thr) * 100))
14 
15 
16 static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data);
17 static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache,
18     nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
19 static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache,
20     nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
21 static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache,
22     nxt_cache_query_t *q);
23 
24 static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data);
25 static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data);
26 static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data);
27 static ssize_t nxt_cache_release_locked(nxt_cache_t *cache,
28     nxt_cache_query_t *q, u_char *buf, size_t size);
29 
30 static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache);
31 static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node,
32     nxt_bool_t fast);
33 static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache,
34     nxt_bool_t *slow);
35 static void nxt_cache_query_wait_free(nxt_cache_t *cache,
36     nxt_cache_query_wait_t *qw);
37 
38 
39 /* STUB */
40 nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool);
41 static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc);
42 /**/
43 
44 
45 nxt_int_t
46 nxt_cache_shm_create(nxt_mem_zone_t *mz)
47 {
48     nxt_cache_t  *cache;
49 
50     static const nxt_lvlhsh_proto_t  proto  nxt_aligned(64) = {
51         NXT_LVLHSH_LARGE_SLAB,
52         0,
53         nxt_cache_lvlhsh_test,
54         (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc,
55         (nxt_lvlhsh_free_t) nxt_mem_zone_free,
56     };
57 
58     cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t));
59 
60     if (cache == NULL) {
61         return NXT_ERROR;
62     }
63 
64     cache->proto = &proto;
65     cache->pool = mz;
66 
67     cache->start_time = nxt_cache_time(nxt_thread());
68 
69     return NXT_OK;
70 }
71 
72 
73 static void *
74 nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc)
75 {
76     return nxt_mem_zone_align(data, size, size);
77 }
78 
79 
80 void
81 nxt_cache_init(nxt_cache_t *cache)
82 {
83     static const nxt_lvlhsh_proto_t  proto  nxt_aligned(64) = {
84         NXT_LVLHSH_LARGE_MEMALIGN,
85         0,
86         nxt_cache_lvlhsh_test,
87         nxt_lvlhsh_alloc,
88         nxt_lvlhsh_free,
89     };
90 
91     cache->proto = &proto;
92 
93     cache->start_time = nxt_cache_time(nxt_thread());
94 }
95 
96 
97 static nxt_int_t
98 nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data)
99 {
100     nxt_cache_node_t  *node;
101 
102     node = data;
103 
104     if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) {
105         return NXT_OK;
106     }
107 
108     return NXT_DECLINED;
109 }
110 
111 
112 nxt_inline void
113 nxt_cache_lock(nxt_cache_t *cache)
114 {
115     if (cache->shared) {
116         nxt_thread_spin_lock(&cache->lock);
117     }
118 }
119 
120 
121 nxt_inline void
122 nxt_cache_unlock(nxt_cache_t *cache)
123 {
124     if (cache->shared) {
125         nxt_thread_spin_unlock(&cache->lock);
126     }
127 }
128 
129 
130 void
131 nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q)
132 {
133     nxt_thread_t        *thr;
134     nxt_lvlhsh_query_t  lhq;
135     nxt_work_handler_t  handler;
136 
137     thr = nxt_thread();
138 
139     if (cache != NULL) {
140         lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len);
141         lhq.replace = 0;
142         lhq.key.len = q->key_len;
143         lhq.key.data = q->key_data;
144         lhq.proto = cache->proto;
145         lhq.pool = cache->pool;
146 
147         q->now = nxt_cache_time(thr);
148 
149         nxt_cache_lock(cache);
150 
151         handler = nxt_cache_query_locked(cache, q, &lhq);
152 
153         nxt_cache_unlock(cache);
154 
155     } else {
156         handler = q->state->nocache_handler;
157     }
158 
159     handler(thr, q, NULL);
160 }
161 
162 
163 static nxt_work_handler_t
164 nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
165     nxt_lvlhsh_query_t *lhq)
166 {
167     nxt_int_t                ret;
168     nxt_time_t               expiry;
169     nxt_cache_node_t         *node;
170     nxt_cache_query_state_t  *state;
171 
172     if (q->hold) {
173         return nxt_cache_node_hold(cache, q, lhq);
174     }
175 
176     ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq);
177 
178     state = q->state;
179 
180     if (ret != NXT_OK) {
181         /* NXT_DECLINED */
182         return state->nocache_handler;
183     }
184 
185     node = lhq->value;
186     node->count++;
187     q->node = node;
188 
189     expiry = cache->start_time + node->expiry;
190 
191     if (q->now < expiry) {
192         return state->ready_handler;
193     }
194 
195     q->stale = 1;
196 
197     return state->stale_handler;
198 }
199 
200 
201 static nxt_work_handler_t
202 nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q,
203     nxt_lvlhsh_query_t *lhq)
204 {
205     nxt_int_t                ret;
206     nxt_bool_t               slow;
207     nxt_cache_node_t         *node, *sentinel;
208     nxt_work_handler_t       handler;
209     nxt_cache_query_wait_t   *qw;
210     nxt_cache_query_state_t  *state;
211 
212     state = q->state;
213     sentinel = nxt_cache_node_alloc(cache);
214 
215     if (nxt_slow_path(sentinel == NULL)) {
216         return state->error_handler;
217     }
218 
219     sentinel->key_data = q->key_data;
220     sentinel->key_len = q->key_len;
221     lhq->value = sentinel;
222 
223     /*
224      * Try to insert an empty sentinel node to hold updating
225      * process if there is no existent cache node in cache.
226      */
227     ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq);
228 
229     if (ret == NXT_OK) {
230         /* The sentinel node was successully added. */
231 
232         q->node = sentinel;
233         sentinel->updating = 1;
234         return state->update_handler;
235     }
236 
237     nxt_cache_node_free(cache, sentinel, 1);
238 
239     if (ret == NXT_ERROR) {
240         return state->error_handler;
241     }
242 
243     /* NXT_DECLINED: a cache node exists. */
244 
245     node = lhq->value;
246     node->count++;
247     q->node = node;
248 
249     handler = nxt_cache_node_test(cache, q);
250     if (handler != NULL) {
251         return handler;
252     }
253 
254     /* Add the node to a wait queue. */
255 
256     qw = nxt_cache_query_wait_alloc(cache, &slow);
257     if (nxt_slow_path(qw == NULL)) {
258         return state->error_handler;
259     }
260 
261     if (slow) {
262         /* The node state may have been changed during slow allocation. */
263 
264         handler = nxt_cache_node_test(cache, q);
265         if (handler != NULL) {
266             nxt_cache_query_wait_free(cache, qw);
267             return handler;
268         }
269     }
270 
271     qw->query = q;
272     qw->next = node->waiting;
273     qw->busy = 0;
274     qw->deleted = 0;
275     qw->pid = nxt_pid;
276     qw->engine = nxt_thread_event_engine();
277     qw->handler = nxt_cache_wake_handler;
278     qw->cache = cache;
279 
280     node->waiting = qw;
281 
282     return nxt_cache_wait_handler;
283 }
284 
285 
286 static nxt_work_handler_t
287 nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q)
288 {
289     nxt_time_t               expiry;
290     nxt_cache_node_t         *node;
291     nxt_cache_query_state_t  *state;
292 
293     q->stale = 0;
294     state = q->state;
295     node = q->node;
296 
297     expiry = cache->start_time + node->expiry;
298 
299     if (q->now < expiry) {
300         return state->ready_handler;
301     }
302 
303     /*
304      * A valid stale or empty sentinel cache node.
305      * The sentinel node can be only in updating state.
306      */
307 
308     if (node->updating) {
309 
310         if (node->expiry != 0) {
311             /* A valid stale cache node. */
312 
313             q->stale = 1;
314 
315             if (q->use_stale) {
316                 return state->stale_handler;
317             }
318         }
319 
320         /* A sentinel node. */
321         return NULL;
322     }
323 
324     /* A valid stale cache node is not being updated now. */
325 
326     q->stale = 1;
327 
328     if (q->use_stale) {
329 
330         if (q->update_stale) {
331             node->updating = 1;
332             return state->update_stale_handler;
333         }
334 
335         return state->stale_handler;
336     }
337 
338     node->updating = 1;
339     return state->update_handler;
340 }
341 
342 
343 static void
344 nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data)
345 {
346     nxt_event_timer_t  *ev;
347     nxt_cache_query_t  *cq;
348 
349     cq = obj;
350 
351     if (cq->timeout != 0) {
352 
353         ev = &cq->timer;
354 
355         if (ev->state == NXT_EVENT_TIMER_DISABLED) {
356             ev->handler = nxt_cache_timeout_handler;
357             nxt_event_timer_ident(ev, -1);
358 
359             nxt_event_timer_add(thr->engine, ev, cq->timeout);
360         }
361     }
362 }
363 
364 
365 static void
366 nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data)
367 {
368     nxt_cache_query_t  *cq;
369     nxt_event_timer_t  *ev;
370 
371     ev = obj;
372 
373     cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer);
374 
375     cq->state->timeout_handler(thr, cq, NULL);
376 }
377 
378 
379 static void
380 nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data)
381 {
382     nxt_cache_t             *cache;
383     nxt_work_handler_t      handler;
384     nxt_cache_query_t       *q;
385     nxt_cache_query_wait_t  *qw;
386 
387     qw = obj;
388     q = qw->query;
389     cache = qw->cache;
390 
391     nxt_cache_lock(cache);
392 
393     handler = nxt_cache_node_test(cache, q);
394 
395     if (handler != NULL) {
396         nxt_cache_query_wait_free(cache, qw);
397 
398     } else {
399         /* Wait again. */
400         qw->next = q->node->waiting;
401         q->node->waiting = qw;
402     }
403 
404     nxt_cache_unlock(cache);
405 
406     handler(thr, q, NULL);
407 }
408 
409 
410 nxt_int_t
411 nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q)
412 {
413     nxt_int_t           ret;
414     nxt_cache_node_t    *node;
415     nxt_lvlhsh_query_t  lhq;
416 
417     node = q->node;
418 
419     node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time;
420 
421     node->updating = 0;
422     node->count = 1;
423 
424     lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);
425     lhq.replace = 1;
426     lhq.key.len = node->key_len;
427     lhq.key.data = node->key_data;
428     lhq.value = node;
429     lhq.proto = cache->proto;
430     lhq.pool = cache->pool;
431 
432     nxt_cache_lock(cache);
433 
434     ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq);
435 
436     if (nxt_fast_path(ret != NXT_OK)) {
437 
438         nxt_queue_insert_head(&cache->expiry_queue, &node->link);
439 
440         node = lhq.value;
441 
442         if (node != NULL) {
443             /* A replaced node. */
444 
445             nxt_queue_remove(&node->link);
446 
447             if (node->count != 0) {
448                 node->deleted = 1;
449 
450             } else {
451                 // delete cache node
452             }
453         }
454     }
455 
456     nxt_cache_unlock(cache);
457 
458     return ret;
459 }
460 
461 
462 void
463 nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q)
464 {
465     u_char        *p, *data;
466     size_t        size;
467     ssize_t       ret;
468     nxt_thread_t  *thr;
469     u_char        buf[1024];
470 
471     thr = nxt_thread();
472     q->now = nxt_cache_time(thr);
473 
474     p = buf;
475     size = sizeof(buf);
476 
477     for ( ;; ) {
478         nxt_cache_lock(cache);
479 
480         ret = nxt_cache_release_locked(cache, q, p, size);
481 
482         nxt_cache_unlock(cache);
483 
484         if (ret == 0) {
485             return;
486         }
487 
488         size = nxt_abs(ret);
489 
490         data = nxt_malloc(size);
491 
492         if (data == NULL) {
493             /* TODO: retry */
494             return;
495         }
496 
497         if (ret < 0) {
498             p = data;
499             continue;
500         }
501 
502         if (p != data) {
503             nxt_memcpy(data, p, size);
504         }
505 
506         nxt_thread_work_queue_add(thr, &thr->work_queue.main,
507                                   cache->delete_handler, data, NULL, thr->log);
508     }
509 }
510 
511 
512 static ssize_t
513 nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
514     u_char *buf, size_t size)
515 {
516     ssize_t           ret;
517     nxt_cache_node_t  *node;
518 
519     node = q->node;
520     node->count--;
521 
522     if (node->count != 0) {
523         return 0;
524     }
525 
526     if (!node->deleted) {
527         /*
528          * A cache node is locked whilst its count is non zero.
529          * To minimize number of operations the node's place in expiry
530          * queue can be updated only if the node is not currently used.
531          */
532         node->accessed = q->now - cache->start_time;
533 
534         nxt_queue_remove(&node->link);
535         nxt_queue_insert_head(&cache->expiry_queue, &node->link);
536 
537         return 0;
538     }
539 
540     ret = 0;
541 #if 0
542 
543     ret = cache->delete_copy(cache, node, buf, size);
544 
545     if (ret < 0) {
546         return ret;
547     }
548 
549 #endif
550 
551     nxt_cache_node_free(cache, node, 0);
552 
553     return ret;
554 }
555 
556 
557 static nxt_cache_node_t *
558 nxt_cache_node_alloc(nxt_cache_t *cache)
559 {
560     nxt_queue_link_t  *link;
561     nxt_cache_node_t  *node;
562 
563     link = nxt_queue_first(&cache->free_nodes);
564 
565     if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) {
566         cache->nfree_nodes--;
567         nxt_queue_remove(link);
568 
569         node = nxt_queue_link_data(link, nxt_cache_node_t, link);
570         nxt_memzero(node, sizeof(nxt_cache_node_t));
571 
572         return node;
573     }
574 
575     nxt_cache_unlock(cache);
576 
577     node = cache->alloc(cache->data, sizeof(nxt_cache_node_t));
578 
579     nxt_cache_lock(cache);
580 
581     return node;
582 }
583 
584 
585 static void
586 nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast)
587 {
588     if (fast || cache->nfree_nodes < 32) {
589         nxt_queue_insert_head(&cache->free_nodes, &node->link);
590         cache->nfree_nodes++;
591         return;
592     }
593 
594     nxt_cache_unlock(cache);
595 
596     cache->free(cache->data, node);
597 
598     nxt_cache_lock(cache);
599 }
600 
601 
602 static nxt_cache_query_wait_t *
603 nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow)
604 {
605     nxt_cache_query_wait_t  *qw;
606 
607     qw = cache->free_query_wait;
608 
609     if (nxt_fast_path(qw != NULL)) {
610         cache->free_query_wait = qw->next;
611         cache->nfree_query_wait--;
612 
613         *slow = 0;
614         return qw;
615     }
616 
617     nxt_cache_unlock(cache);
618 
619     qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t));
620     *slow = 1;
621 
622     nxt_cache_lock(cache);
623 
624     return qw;
625 }
626 
627 
628 static void
629 nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw)
630 {
631     if (cache->nfree_query_wait < 32) {
632         qw->next = cache->free_query_wait;
633         cache->free_query_wait = qw;
634         cache->nfree_query_wait++;
635         return;
636     }
637 
638     nxt_cache_unlock(cache);
639 
640     cache->free(cache->data, qw);
641 
642     nxt_cache_lock(cache);
643 }
644