/* xref: /unit/src/nxt_cache.c (revision 2084:7d479274f334) */

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/* The cache time resolution is 10ms. */
#define nxt_cache_time(thr)                                                   \
    (uint64_t) (nxt_thread_time(thr) * 100)
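
/*
 * nxt_thread_time() presumably returns the thread's cached time in seconds
 * with sub-second precision, so multiplying by 100 yields 10ms ticks.
 * Node times are stored relative to cache->start_time, which is captured
 * in the same units.
 */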


static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache,
    nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache,
    nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache,
    nxt_cache_query_t *q);

static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data);
static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data);
static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data);
static ssize_t nxt_cache_release_locked(nxt_cache_t *cache,
    nxt_cache_query_t *q, u_char *buf, size_t size);

static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache);
static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node,
    nxt_bool_t fast);
static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache,
    nxt_bool_t *slow);
static void nxt_cache_query_wait_free(nxt_cache_t *cache,
    nxt_cache_query_wait_t *qw);


/* STUB */
nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool);
static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc);
/**/


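/*
 * Creates a cache in the shared memory zone "mz": the nxt_cache_t itself
 * and, through the lvlhsh proto below, the hash buckets are allocated
 * from the zone, so the cache can presumably be shared between processes.
 */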
nxt_int_t
nxt_cache_shm_create(nxt_mem_zone_t *mz)
{
    nxt_cache_t  *cache;

    static const nxt_lvlhsh_proto_t  proto  nxt_aligned(64) = {
        NXT_LVLHSH_LARGE_SLAB,
        0,
        nxt_cache_lvlhsh_test,
        (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc,
        (nxt_lvlhsh_free_t) nxt_mem_zone_free,
    };

    cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t));

    if (cache == NULL) {
        return NXT_ERROR;
    }

    cache->proto = &proto;
    cache->pool = mz;

    cache->start_time = nxt_cache_time(nxt_thread());

    return NXT_OK;
}


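/*
 * lvlhsh allocator for the shared cache: "data" is the memory zone set up
 * as lhq->pool.  Buckets are allocated with an alignment equal to their
 * size; the nalloc argument of the nxt_lvlhsh_alloc_t interface is unused.
 */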
static void *
nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc)
{
    return nxt_mem_zone_align(data, size, size);
}


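/*
 * Initializes a process-local (non-shared) cache: the hash buckets come
 * from the generic lvlhsh memalign allocator instead of a shared memory
 * zone.
 */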
void
nxt_cache_init(nxt_cache_t *cache)
{
    static const nxt_lvlhsh_proto_t  proto  nxt_aligned(64) = {
        NXT_LVLHSH_LARGE_MEMALIGN,
        0,
        nxt_cache_lvlhsh_test,
        nxt_lvlhsh_alloc,
        nxt_lvlhsh_free,
    };

    cache->proto = &proto;

    cache->start_time = nxt_cache_time(nxt_thread());
}


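/*
 * lvlhsh key comparison callback: a node matches when its stored key
 * equals the queried key.
 */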
static nxt_int_t
nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_cache_node_t  *node;

    node = data;

    if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


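/*
 * The spin lock is taken only for a shared cache; a process-local cache
 * is presumably used by a single thread and needs no locking.
 */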
nxt_inline void
nxt_cache_lock(nxt_cache_t *cache)
{
    if (cache->shared) {
        nxt_thread_spin_lock(&cache->lock);
    }
}


nxt_inline void
nxt_cache_unlock(nxt_cache_t *cache)
{
    if (cache->shared) {
        nxt_thread_spin_unlock(&cache->lock);
    }
}


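/*
 * The cache query entry point.  The query is resolved under the cache
 * lock to one of the q->state handlers (ready, stale, update, nocache,
 * or error), which is then invoked outside the lock.  If there is no
 * cache at all, the nocache handler is called directly.
 */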
void
nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q)
{
    nxt_thread_t        *thr;
    nxt_lvlhsh_query_t  lhq;
    nxt_work_handler_t  handler;

    thr = nxt_thread();

    if (cache != NULL) {
        lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len);
        lhq.replace = 0;
        lhq.key.len = q->key_len;
        lhq.key.data = q->key_data;
        lhq.proto = cache->proto;
        lhq.pool = cache->pool;

        q->now = nxt_cache_time(thr);

        nxt_cache_lock(cache);

        handler = nxt_cache_query_locked(cache, q, &lhq);

        nxt_cache_unlock(cache);

    } else {
        handler = q->state->nocache_handler;
    }

    handler(thr, q, NULL);
}


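#if 0

/*
 * A minimal usage sketch, not part of the original file.  The handler
 * body and the example_* names are hypothetical; only field and type
 * names that appear elsewhere in this file are relied upon.
 */

static void
example_ready_handler(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_cache_query_t  *q;

    q = obj;

    /*
     * q->node references a cache node held on behalf of this query;
     * it is released with nxt_cache_release() once the data is used.
     */
}


static void
example_lookup(nxt_cache_t *cache, u_char *key, size_t len)
{
    static nxt_cache_query_state_t  state;
    static nxt_cache_query_t        query;

    /* For brevity all outcomes are routed to the same handler. */
    state.ready_handler = example_ready_handler;
    state.stale_handler = example_ready_handler;
    state.nocache_handler = example_ready_handler;
    state.error_handler = example_ready_handler;

    query.key_data = key;
    query.key_len = len;
    query.hold = 0;
    query.state = &state;

    nxt_cache_query(cache, &query);
}

#endif


/*
 * Looks up a cache node under the cache lock and maps the result to a
 * state handler: hold queries are delegated to nxt_cache_node_hold(),
 * a missing node yields the nocache handler, and a found node gets its
 * reference count bumped and yields the ready or stale handler
 * depending on its expiry time.
 */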
static nxt_work_handler_t
nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
    nxt_lvlhsh_query_t *lhq)
{
    nxt_int_t                ret;
    nxt_time_t               expiry;
    nxt_cache_node_t         *node;
    nxt_cache_query_state_t  *state;

    if (q->hold) {
        return nxt_cache_node_hold(cache, q, lhq);
    }

    ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq);

    state = q->state;

    if (ret != NXT_OK) {
        /* NXT_DECLINED */
        return state->nocache_handler;
    }

    node = lhq->value;
    node->count++;
    q->node = node;

    expiry = cache->start_time + node->expiry;

    if (q->now < expiry) {
        return state->ready_handler;
    }

    q->stale = 1;

    return state->stale_handler;
}


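/*
 * Handles a "hold" query.  An empty sentinel node is inserted first so
 * that exactly one query becomes the updater; if a node already exists,
 * the query either completes via nxt_cache_node_test() or is queued on
 * the node's wait list until the updater wakes it up.
 */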
static nxt_work_handler_t
nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q,
    nxt_lvlhsh_query_t *lhq)
{
    nxt_int_t                ret;
    nxt_bool_t               slow;
    nxt_cache_node_t         *node, *sentinel;
    nxt_work_handler_t       handler;
    nxt_cache_query_wait_t   *qw;
    nxt_cache_query_state_t  *state;

    state = q->state;
    sentinel = nxt_cache_node_alloc(cache);

    if (nxt_slow_path(sentinel == NULL)) {
        return state->error_handler;
    }

    sentinel->key_data = q->key_data;
    sentinel->key_len = q->key_len;
    lhq->value = sentinel;

    /*
     * Try to insert an empty sentinel node to hold the updating
     * process if there is no existing node in the cache.
     */
    ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq);

    if (ret == NXT_OK) {
        /* The sentinel node was successfully added. */

        q->node = sentinel;
        sentinel->updating = 1;
        return state->update_handler;
    }

    nxt_cache_node_free(cache, sentinel, 1);

    if (ret == NXT_ERROR) {
        return state->error_handler;
    }

    /* NXT_DECLINED: a cache node exists. */

    node = lhq->value;
    node->count++;
    q->node = node;

    handler = nxt_cache_node_test(cache, q);
    if (handler != NULL) {
        return handler;
    }

    /* Add the query to the node's wait queue. */

    qw = nxt_cache_query_wait_alloc(cache, &slow);
    if (nxt_slow_path(qw == NULL)) {
        return state->error_handler;
    }

    if (slow) {
        /* The node state may have been changed during slow allocation. */

        handler = nxt_cache_node_test(cache, q);
        if (handler != NULL) {
            nxt_cache_query_wait_free(cache, qw);
            return handler;
        }
    }

    qw->query = q;
    qw->next = node->waiting;
    qw->busy = 0;
    qw->deleted = 0;
    qw->pid = nxt_pid;
    qw->engine = nxt_thread_event_engine();
    qw->handler = nxt_cache_wake_handler;
    qw->cache = cache;

    node->waiting = qw;

    return nxt_cache_wait_handler;
}


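/*
 * Decides what can be done with the node right now.  Returns the ready
 * handler for a fresh node, the stale or update handlers for an expired
 * one depending on q->use_stale and q->update_stale, or NULL if the
 * query has to wait for another updater to finish.
 */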
static nxt_work_handler_t
nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q)
{
    nxt_time_t               expiry;
    nxt_cache_node_t         *node;
    nxt_cache_query_state_t  *state;

    q->stale = 0;
    state = q->state;
    node = q->node;

    expiry = cache->start_time + node->expiry;

    if (q->now < expiry) {
        return state->ready_handler;
    }

    /*
     * A valid stale or empty sentinel cache node.
     * A sentinel node can only be in the updating state.
     */

    if (node->updating) {

        if (node->expiry != 0) {
            /* A valid stale cache node. */

            q->stale = 1;

            if (q->use_stale) {
                return state->stale_handler;
            }
        }

        /* A sentinel node. */
        return NULL;
    }

    /* A valid stale cache node that is not being updated now. */

    q->stale = 1;

    if (q->use_stale) {

        if (q->update_stale) {
            node->updating = 1;
            return state->update_stale_handler;
        }

        return state->stale_handler;
    }

    node->updating = 1;
    return state->update_handler;
}


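/*
 * Invoked when a query has been parked on a node's wait list: arms the
 * query timeout timer if one is configured and not already armed; when
 * it fires, nxt_cache_timeout_handler() invokes the query's timeout
 * handler.
 */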
static void
nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_event_timer_t  *ev;
    nxt_cache_query_t  *cq;

    cq = obj;

    if (cq->timeout != 0) {

        ev = &cq->timer;

        if (ev->state == NXT_EVENT_TIMER_DISABLED) {
            ev->handler = nxt_cache_timeout_handler;
            nxt_event_timer_ident(ev, -1);

            nxt_event_timer_add(thr->engine, ev, cq->timeout);
        }
    }
}


static void
nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_cache_query_t  *cq;
    nxt_event_timer_t  *ev;

    ev = obj;

    cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer);

    cq->state->timeout_handler(thr, cq, NULL);
}


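/*
 * Runs when a waiting query is woken up by an updater: the node state
 * is retested under the lock, and the query either proceeds with the
 * resulting handler or is put back on the node's wait list.
 */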
static void
nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_cache_t             *cache;
    nxt_work_handler_t      handler;
    nxt_cache_query_t       *q;
    nxt_cache_query_wait_t  *qw;

    qw = obj;
    q = qw->query;
    cache = qw->cache;

    nxt_cache_lock(cache);

    handler = nxt_cache_node_test(cache, q);

    if (handler != NULL) {
        nxt_cache_query_wait_free(cache, qw);

    } else {
        /* Wait again. */
        qw->next = q->node->waiting;
        q->node->waiting = qw;
    }

    nxt_cache_unlock(cache);

    if (handler != NULL) {
        handler(thr, q, NULL);
    }
}


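/*
 * Publishes the node prepared by an update query: the node is inserted
 * into the hash with lhq.replace set and linked into the expiry queue;
 * a replaced node, if any, is taken off the expiry queue and, while it
 * is still referenced, is only marked as deleted.
 */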
nxt_int_t
nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q)
{
    nxt_int_t           ret;
    nxt_cache_node_t    *node;
    nxt_lvlhsh_query_t  lhq;

    node = q->node;

    node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time;

    node->updating = 0;
    node->count = 1;

    lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);
    lhq.replace = 1;
    lhq.key.len = node->key_len;
    lhq.key.data = node->key_data;
    lhq.value = node;
    lhq.proto = cache->proto;
    lhq.pool = cache->pool;

    nxt_cache_lock(cache);

    ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq);

    if (nxt_fast_path(ret != NXT_OK)) {

        nxt_queue_insert_head(&cache->expiry_queue, &node->link);

        node = lhq.value;

        if (node != NULL) {
            /* A replaced node. */

            nxt_queue_remove(&node->link);

            if (node->count != 0) {
                node->deleted = 1;

            } else {
                /* TODO: delete the cache node. */
            }
        }
    }

    nxt_cache_unlock(cache);

    return ret;
}


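/*
 * Drops a query's reference to its cache node.  If the node has to be
 * destroyed, nxt_cache_release_locked() may ask for a buffer to copy
 * the data needed for deletion: it is taken first from the stack, then
 * from the heap, and the actual deletion is deferred by scheduling
 * cache->delete_handler on the thread's main work queue.
 */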
void
nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q)
{
    u_char        *p, *data;
    size_t        size;
    ssize_t       ret;
    nxt_thread_t  *thr;
    u_char        buf[1024];

    thr = nxt_thread();
    q->now = nxt_cache_time(thr);

    p = buf;
    size = sizeof(buf);

    for ( ;; ) {
        nxt_cache_lock(cache);

        ret = nxt_cache_release_locked(cache, q, p, size);

        nxt_cache_unlock(cache);

        if (ret == 0) {
            return;
        }

        size = nxt_abs(ret);

        data = nxt_malloc(size);

        if (data == NULL) {
            /* TODO: retry */
            return;
        }

        if (ret < 0) {
            p = data;
            continue;
        }

        if (p != data) {
            nxt_memcpy(data, p, size);
        }

        nxt_thread_work_queue_add(thr, &thr->work_queue.main,
                                  cache->delete_handler, data, NULL, thr->log);
    }
}


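/*
 * Releases one reference under the cache lock.  While the node stays
 * cached, only its position in the expiry queue is refreshed; a node
 * already marked as deleted is freed once its reference count drops to
 * zero.  A non-zero return value comes from the (currently disabled)
 * cache->delete_copy() step and is interpreted by nxt_cache_release()
 * as a buffer size.
 */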
static ssize_t
nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
    u_char *buf, size_t size)
{
    ssize_t           ret;
    nxt_cache_node_t  *node;

    node = q->node;
    node->count--;

    if (node->count != 0) {
        return 0;
    }

    if (!node->deleted) {
        /*
         * A cache node is locked whilst its count is non-zero.
         * To minimize the number of operations, the node's place in the
         * expiry queue is updated only when the node is not currently
         * in use.
         */
        node->accessed = q->now - cache->start_time;

        nxt_queue_remove(&node->link);
        nxt_queue_insert_head(&cache->expiry_queue, &node->link);

        return 0;
    }

    ret = 0;
#if 0

    ret = cache->delete_copy(cache, node, buf, size);

    if (ret < 0) {
        return ret;
    }

#endif

    nxt_cache_node_free(cache, node, 0);

    return ret;
}


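/*
 * Takes a node from the free list while holding the lock; if the list
 * is empty, the lock is dropped around the potentially slow
 * cache->alloc() call and reacquired afterwards.
 */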
static nxt_cache_node_t *
nxt_cache_node_alloc(nxt_cache_t *cache)
{
    nxt_queue_link_t  *link;
    nxt_cache_node_t  *node;

    link = nxt_queue_first(&cache->free_nodes);

    if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) {
        cache->nfree_nodes--;
        nxt_queue_remove(link);

        node = nxt_queue_link_data(link, nxt_cache_node_t, link);
        nxt_memzero(node, sizeof(nxt_cache_node_t));

        return node;
    }

    nxt_cache_unlock(cache);

    node = cache->alloc(cache->data, sizeof(nxt_cache_node_t));

    nxt_cache_lock(cache);

    return node;
}


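/*
 * Returns a node to the free list.  The "fast" flag forces the free
 * list to be used without dropping the lock; otherwise the node is kept
 * there only while the list holds fewer than 32 nodes, and is handed
 * back to cache->free() with the lock released around the call.
 */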
static void
nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast)
{
    if (fast || cache->nfree_nodes < 32) {
        nxt_queue_insert_head(&cache->free_nodes, &node->link);
        cache->nfree_nodes++;
        return;
    }

    nxt_cache_unlock(cache);

    cache->free(cache->data, node);

    nxt_cache_lock(cache);
}


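/*
 * Allocates a wait queue entry.  When the free list is empty, the lock
 * is dropped around cache->alloc() and *slow is set so that the caller
 * knows the node state may have changed meanwhile and must be retested.
 */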
static nxt_cache_query_wait_t *
nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow)
{
    nxt_cache_query_wait_t  *qw;

    qw = cache->free_query_wait;

    if (nxt_fast_path(qw != NULL)) {
        cache->free_query_wait = qw->next;
        cache->nfree_query_wait--;

        *slow = 0;
        return qw;
    }

    nxt_cache_unlock(cache);

    qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t));
    *slow = 1;

    nxt_cache_lock(cache);

    return qw;
}


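/*
 * Returns a wait queue entry to the free list, or frees it through
 * cache->free() with the lock released once the list holds 32 entries.
 */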
static void
nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw)
{
    if (cache->nfree_query_wait < 32) {
        qw->next = cache->free_query_wait;
        cache->free_query_wait = qw;
        cache->nfree_query_wait++;
        return;
    }

    nxt_cache_unlock(cache);

    cache->free(cache->data, qw);

    nxt_cache_lock(cache);
}
643