239 #define DC_FLAG_DEL_SAVED_CURRENT_KEY (1<<0)
244 #define DC_DEF_LOG_DELAY 1
249 #define DC_SCHEDULE_FOR_DELETE_BEHIND(cp) \
250 ((cp)->cache_flags |= DC_FLAG_DEL_SAVED_CURRENT_KEY)
252 #define DC_MATCH_SAVED_CURRENT_KEY(cp, cache_key) \
253 ((cp)->saved_curr_key && strcmp((cp)->saved_curr_key, (cache_key)) == 0)
255 #define DC_IS_SCHEDULED_FOR_DELETE_BEHIND(cp) \
257 ((cp)->cache_flags & DC_FLAG_DEL_SAVED_CURRENT_KEY) != 0)
259 #define DC_CANCEL_DELETE_BEHIND(cp) \
260 ((cp)->cache_flags &= ~DC_FLAG_DEL_SAVED_CURRENT_KEY)
265 #define DC_LAST_CACHE_CLEANUP_COMPLETED "_LAST_CACHE_CLEANUP_COMPLETED_"
271 const char *myname =
"dict_cache_lookup";
272 const char *cache_val;
282 msg_info(
"%s: key=%s (pretend not found - scheduled for deletion)",
286 cache_val =
dict_get(db, cache_key);
287 if (cache_val == 0 && db->
error != 0)
289 "%s: cache lookup for '%s' failed due to error",
290 cp->
name, cache_key);
292 msg_info(
"%s: key=%s value=%s", myname, cache_key,
293 cache_val ? cache_val : db->
error ?
294 "error" :
"(not found)");
302 const char *cache_val)
304 const char *myname =
"dict_cache_update";
314 msg_info(
"%s: cancel delete-behind for key=%s", myname, cache_key);
318 msg_info(
"%s: key=%s value=%s", myname, cache_key, cache_val);
319 put_res =
dict_put(db, cache_key, cache_val);
322 "%s: could not update entry for %s", cp->
name, cache_key);
330 const char *myname =
"dict_cache_delete";
342 msg_info(
"%s: key=%s (current entry - schedule for delete-behind)",
349 "%s: could not delete entry for %s", cp->
name, cache_key);
351 msg_info(
"%s: key=%s (%s)", myname, cache_key,
352 del_res == 0 ?
"found" :
353 db->
error ?
"error" :
"not found");
361 const char **cache_key,
362 const char **cache_val)
364 const char *myname =
"dict_cache_sequence";
366 const char *raw_cache_key;
367 const char *raw_cache_val;
368 char *previous_curr_key;
369 char *previous_curr_val;
376 seq_res =
dict_seq(db, first_next, &raw_cache_key, &raw_cache_val);
382 msg_info(
"%s: key=%s value=%s", myname,
383 seq_res == 0 ? raw_cache_key : db->
error ?
384 "(error)" :
"(not found)",
385 seq_res == 0 ? raw_cache_val : db->
error ?
386 "(error)" :
"(not found)");
389 "%s: sequence error", cp->
name);
415 msg_info(
"%s: delete-behind key=%s value=%s",
416 myname, previous_curr_key, previous_curr_val);
417 if (
dict_del(db, previous_curr_key) != 0)
419 "%s: could not delete entry for %s",
420 cp->
name, previous_curr_key);
426 if (previous_curr_key)
427 myfree(previous_curr_key);
428 if (previous_curr_val)
429 myfree(previous_curr_val);
434 *cache_key = (cp)->saved_curr_key;
435 *cache_val = (cp)->saved_curr_val;
441 static void dict_cache_delete_behind_reset(
DICT_CACHE *cp)
443 #define FREE_AND_WIPE(s) do { if (s) { myfree(s); (s) = 0; } } while (0)
452 static void dict_cache_clean_stat_log_reset(
DICT_CACHE *cp,
453 const char *full_partial)
456 msg_info(
"cache %s %s cleanup: retained=%d dropped=%d entries",
463 static void dict_cache_clean_event(
int unused_event,
void *cache_context)
465 const char *myname =
"dict_cache_clean_event";
467 const char *cache_key;
468 const char *cache_val;
486 msg_info(
"%s: start %s cache cleanup", myname, cp->
name);
504 msg_info(
"%s: drop %s cache entry for %s",
505 myname, cp->
name, cache_key);
509 msg_info(
"%s: keep %s cache entry for %s",
510 myname, cp->
name, cache_key);
518 else if (cp->
error != 0) {
519 msg_warn(
"%s: cache cleanup scan terminated due to error", cp->
name);
520 dict_cache_clean_stat_log_reset(cp,
"partial");
524 msg_info(
"%s: done %s cache cleanup scan", myname, cp->
name);
525 dict_cache_clean_stat_log_reset(cp,
"full");
540 const char *myname =
"dict_cache_control";
541 const char *last_done;
542 time_t next_interval;
551 while ((name = va_arg(ap,
int)) > 0) {
563 msg_panic(
"%s: bad %s cache cleanup interval %d",
573 msg_panic(
"%s: bad command: %d", myname, name);
586 if (cache_cleanup_is_active)
587 msg_panic(
"%s: %s cache cleanup is already scheduled",
593 #define NEXT_START(last, delta) ((delta) + (unsigned long) atol(last))
594 #define NOW (time((time_t *) 0))
602 msg_info(
"%s cache cleanup will start after %ds",
603 cp->
name, (
int) next_interval);
605 (
int) next_interval);
611 else if (cache_cleanup_is_active) {
613 dict_cache_clean_stat_log_reset(cp,
"partial");
614 dict_cache_delete_behind_reset(cp);
630 dict =
dict_open(dbname, open_flags, dict_flags);
698 #define USAGE "\n\tTo manage settings:" \
699 "\n\tverbose <level> (verbosity level)" \
700 "\n\telapsed <level> (0=don't show elapsed time)" \
701 "\n\tlmdb_map_size <limit> (initial LMDB size limit)" \
702 "\n\tcache <type>:<name> (switch to named database)" \
703 "\n\tstatus (show map size, cache, pending requests)" \
704 "\n\n\tTo manage pending requests:" \
705 "\n\treset (discard pending requests)" \
706 "\n\trun (execute pending requests in interleaved order)" \
707 "\n\n\tTo add a pending request:" \
708 "\n\tquery <key-suffix> <count> (negative to reverse order)" \
709 "\n\tupdate <key-suffix> <count> (negative to reverse order)" \
710 "\n\tdelete <key-suffix> <count> (negative to reverse order)" \
711 "\n\tpurge <key-suffix>" \
712 "\n\tcount <key-suffix>"
718 #define DICT_CACHE_OPEN_FLAGS (DICT_FLAG_DUP_REPLACE | DICT_FLAG_SYNC_UPDATE | \
724 typedef struct DICT_CACHE_SREQ {
734 #define DICT_CACHE_SREQ_FLAG_PURGE (1<<1)
735 #define DICT_CACHE_SREQ_FLAG_REVERSE (1<<2)
737 #define DICT_CACHE_SREQ_LIMIT 10
742 typedef struct DICT_CACHE_TEST {
746 DICT_CACHE_SREQ job_list[1];
749 #define DICT_CACHE_TEST_FLAG_ITER (1<<0)
751 #define STR(x) vstring_str(x)
753 int show_elapsed = 1;
762 static NORETURN usage(
const char *progname)
764 msg_fatal(
"usage: %s (no argument)", progname);
769 static void make_tagged_key(
VSTRING *bp, DICT_CACHE_SREQ *cp)
772 msg_panic(
"make_tagged_key: bad done count: %d", cp->done);
774 msg_panic(
"make_tagged_key: bad todo count: %d", cp->todo);
776 (cp->flags & DICT_CACHE_SREQ_FLAG_REVERSE) ?
777 cp->todo - cp->done - 1 : cp->done, cp->suffix);
782 static DICT_CACHE_TEST *create_requests(
int count)
787 tp = (DICT_CACHE_TEST *)
mymalloc(
sizeof(DICT_CACHE_TEST) +
788 (count - 1) *
sizeof(DICT_CACHE_SREQ));
792 for (cp = tp->job_list; cp < tp->job_list + count; cp++) {
805 static void reset_requests(DICT_CACHE_TEST *tp)
811 for (cp = tp->job_list; cp < tp->job_list + tp->size; cp++) {
829 static void free_requests(DICT_CACHE_TEST *tp)
841 struct timeval start;
842 struct timeval finish;
843 struct timeval elapsed;
849 GETTIMEOFDAY(&start);
852 for (cp = tp->job_list; cp < tp->job_list + tp->used; cp++) {
853 if (cp->done < cp->todo) {
855 cp->action(cp, dp, bp);
859 GETTIMEOFDAY(&finish);
860 timersub(&finish, &start, &elapsed);
863 elapsed.tv_sec + elapsed.tv_usec / 1000000.0);
870 static void show_status(DICT_CACHE_TEST *tp,
DICT_CACHE *dp)
875 vstream_printf(
"lmdb_map_size\t%ld\n", (
long) dict_lmdb_map_size);
883 "cmd",
"dir",
"suffix",
"count",
"done",
"first/next");
885 for (cp = tp->job_list; cp < tp->job_list + tp->used; cp++)
889 (cp->flags & DICT_CACHE_SREQ_FLAG_REVERSE) ?
890 "reverse" :
"forward",
891 cp->suffix ? cp->suffix :
"(null)", cp->todo,
892 cp->done, cp->first_next);
901 make_tagged_key(bp, cp);
904 msg_warn(
"query_action: query failed: %s: %m",
STR(bp));
906 msg_warn(
"query_action: query failed: %s",
STR(bp));
907 }
else if (strcmp(
STR(bp), lookup) != 0) {
908 msg_warn(
"lookup result \"%s\" differs from key \"%s\"",
918 make_tagged_key(bp, cp);
921 msg_warn(
"update_action: update failed: %s: %m",
STR(bp));
923 msg_warn(
"update_action: update failed: %s",
STR(bp));
932 make_tagged_key(bp, cp);
935 msg_warn(
"delete_action: delete failed: %s: %m",
STR(bp));
937 msg_warn(
"delete_action: delete failed: %s",
STR(bp));
946 const char *cache_key;
947 const char *cache_val;
952 if (strcmp(cache_key, cache_val) != 0)
953 msg_warn(
"value \"%s\" differs from key \"%s\"",
954 cache_val, cache_key);
955 suffix = cache_key + strspn(cache_key,
"0123456789");
956 if (suffix[0] ==
'-' && strcmp(suffix + 1, cp->suffix) == 0) {
958 cp->todo = cp->done + 1;
959 if ((cp->flags & DICT_CACHE_SREQ_FLAG_PURGE)
962 msg_warn(
"purge_action: delete failed: %s: %m",
STR(bp));
964 msg_warn(
"purge_action: delete failed: %s",
STR(bp));
969 what = (cp->flags & DICT_CACHE_SREQ_FLAG_PURGE) ?
"purge" :
"count";
971 msg_warn(
"%s error after %d: %m", what, cp->done);
981 typedef struct DICT_CACHE_SREQ_INFO {
987 } DICT_CACHE_SREQ_INFO;
989 static DICT_CACHE_SREQ_INFO req_info[] = {
990 {
"query", 3, query_action},
991 {
"update", 3, update_action},
992 {
"delete", 3, delete_action},
993 {
"count", 2, iter_action, DICT_CACHE_TEST_FLAG_ITER},
994 {
"purge", 2, iter_action, DICT_CACHE_TEST_FLAG_ITER, DICT_CACHE_SREQ_FLAG_PURGE},
1000 static void add_request(DICT_CACHE_TEST *tp,
ARGV *argv)
1002 DICT_CACHE_SREQ_INFO *rp;
1003 DICT_CACHE_SREQ *cp;
1006 char *cmd = argv->
argv[0];
1007 char *suffix = (argv->
argc > 1 ? argv->
argv[1] : 0);
1008 char *todo = (argv->
argc > 2 ? argv->
argv[2] :
"1");
1010 if (tp->used >= tp->size) {
1011 msg_warn(
"%s: request list is full", cmd);
1014 for (rp = req_info; ; rp++) {
1015 if (rp->name == 0) {
1019 if (strcmp(rp->name, argv->
argv[0]) == 0
1020 && rp->argc == argv->
argc)
1023 req_flags = rp->req_flags;
1024 if (todo[0] ==
'-') {
1025 req_flags |= DICT_CACHE_SREQ_FLAG_REVERSE;
1028 if (!
alldig(todo) || (count = atoi(todo)) == 0) {
1029 msg_warn(
"%s: bad count: %s", cmd, todo);
1032 if (tp->flags & rp->test_flags) {
1033 msg_warn(
"%s: command conflicts with other command", cmd);
1036 tp->flags |= rp->test_flags;
1037 cp = tp->job_list + tp->used;
1039 cp->action = rp->action;
1043 cp->flags = req_flags;
1050 int main(
int argc,
char **argv)
1052 DICT_CACHE_TEST *test_job;
1064 test_job = create_requests(DICT_CACHE_SREQ_LIMIT);
1066 stdin_is_tty = isatty(0);
1076 if (!stdin_is_tty) {
1088 if (strcmp(args->
argv[0],
"verbose") == 0 && args->
argc == 2) {
1090 }
else if (strcmp(args->
argv[0],
"elapsed") == 0 && args->
argc == 2) {
1091 show_elapsed = atoi(args->
argv[1]);
1093 }
else if (strcmp(args->
argv[0],
"lmdb_map_size") == 0 && args->
argc == 2) {
1094 dict_lmdb_map_size = atol(args->
argv[1]);
1096 }
else if (strcmp(args->
argv[0],
"cache") == 0 && args->
argc == 2) {
1100 DICT_CACHE_OPEN_FLAGS);
1101 }
else if (strcmp(args->
argv[0],
"reset") == 0 && args->
argc == 1) {
1102 reset_requests(test_job);
1103 }
else if (strcmp(args->
argv[0],
"run") == 0 && args->
argc == 1) {
1104 run_requests(test_job, cache, inbuf);
1105 }
else if (strcmp(args->
argv[0],
"status") == 0 && args->
argc == 1) {
1106 show_status(test_job, cache);
1108 add_request(test_job, args);
1115 free_requests(test_job);
#define vstring_fgets_nonl(s, p)
#define DC_IS_SCHEDULED_FOR_DELETE_BEHIND(cp)
char * mystrdup(const char *str)
#define dict_put(dp, key, val)
#define DC_MATCH_SAVED_CURRENT_KEY(cp, cache_key)
ARGV * argv_free(ARGV *argvp)
#define DC_SCHEDULE_FOR_DELETE_BEHIND(cp)
#define DICT_SEQ_FUN_FIRST
NORETURN msg_panic(const char *fmt,...)
const char * dict_cache_lookup(DICT_CACHE *cp, const char *cache_key)
#define DICT_SEQ_FUN_NEXT
int main(int argc, char **argv)
void msg_rate_delay(time_t *stamp, int delay, void(*log_fn)(const char *,...), const char *fmt,...)
DICT * dict_open(const char *, int, int)
size_t dict_lmdb_map_size
int alldig(const char *string)
int(* DICT_CACHE_VALIDATOR_FN)(const char *, const char *, void *)
#define DICT_CACHE_CTL_VALIDATOR
#define DICT_STAT_SUCCESS
#define DICT_CACHE_FLAG_VERBOSE
#define dict_get(dp, key)
int dict_cache_sequence(DICT_CACHE *cp, int first_next, const char **cache_key, const char **cache_val)
#define DICT_CACHE_CTL_END
#define dict_seq(dp, f, key, val)
VSTREAM * vstream_printf(const char *fmt,...)
void msg_warn(const char *fmt,...)
int dict_cache_delete(DICT_CACHE *cp, const char *cache_key)
VSTRING * vstring_alloc(ssize_t len)
#define DICT_CACHE_CTL_CONTEXT
void dict_cache_close(DICT_CACHE *cp)
VSTRING * vstring_sprintf(VSTRING *vp, const char *format,...)
const char * dict_cache_name(DICT_CACHE *cp)
#define DC_LAST_CACHE_CLEANUP_COMPLETED
#define DC_CANCEL_DELETE_BEHIND(cp)
NORETURN msg_fatal(const char *fmt,...)
#define DICT_ERR_VAL_RETURN(dict, err, val)
int vstream_fflush(VSTREAM *stream)
DICT_CACHE_VALIDATOR_FN exp_validator
#define timersub(a, b, res)
ARGV * argv_split(const char *, const char *)
#define NEXT_START(last, delta)
#define DICT_CACHE_FLAG_STATISTICS
VSTRING * vstring_free(VSTRING *vp)
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay)
void msg_vstream_init(const char *name, VSTREAM *vp)
int dict_cache_update(DICT_CACHE *cp, const char *cache_key, const char *cache_val)
DICT_CACHE * dict_cache_open(const char *dbname, int open_flags, int dict_flags)
#define DICT_CACHE_CTL_INTERVAL
#define dict_del(dp, key)
#define DICT_CACHE_CTL_FLAGS
int event_cancel_timer(EVENT_NOTIFY_TIME_FN callback, void *context)
void * mymalloc(ssize_t len)
void msg_info(const char *fmt,...)
void dict_cache_control(DICT_CACHE *cp,...)