142 #include <sys/socket.h>
143 #include <netinet/in.h>
144 #include <arpa/inet.h>
152 #include <postgres_ext.h>
153 #include <libpq-fe.h>
176 #define STATACTIVE (1<<0)
177 #define STATFAIL (1<<1)
178 #define STATUNTRIED (1<<2)
180 #define TYPEUNIX (1<<0)
181 #define TYPEINET (1<<1)
182 #define TYPECONNSTRING (1<<2)
184 #define RETRY_CONN_MAX 100
185 #define RETRY_CONN_INTV 60
186 #define IDLE_CONN_INTV 60
221 #define PGSQL_RES PGresult
224 static PLPGSQL *plpgsql_init(
ARGV *);
225 static PGSQL_RES *plpgsql_query(DICT_PGSQL *,
const char *,
VSTRING *,
char *,
227 static void plpgsql_dealloc(PLPGSQL *);
228 static void plpgsql_close_host(HOST *);
229 static void plpgsql_down_host(HOST *);
230 static void plpgsql_connect_single(HOST *,
char *,
char *,
char *);
231 static const char *dict_pgsql_lookup(
DICT *,
const char *);
233 static void dict_pgsql_close(
DICT *);
234 static HOST *host_init(
const char *);
238 static void dict_pgsql_quote(
DICT *dict,
const char *name,
VSTRING *result)
240 DICT_PGSQL *dict_pgsql = (DICT_PGSQL *) dict;
241 HOST *active_host = dict_pgsql->active_host;
242 char *myname =
"dict_pgsql_quote";
243 size_t len = strlen(name);
247 if (active_host == 0)
248 msg_panic(
"%s: bogus dict_pgsql->active_host", myname);
255 msg_panic(
"%s: arithmetic overflow in %lu+2*%lu+1",
257 (
unsigned long) len);
258 buflen = 2 * len + 1;
265 if (active_host->stat == STATFAIL)
300 PQescapeStringConn(active_host->db,
vstring_end(result), name, len, &err);
310 msg_warn(
"dict pgsql: (host %s) cannot escape input string: %s",
311 active_host->hostname, PQerrorMessage(active_host->db));
312 active_host->stat = STATFAIL;
319 static const char *dict_pgsql_lookup(
DICT *dict,
const char *name)
321 const char *myname =
"dict_pgsql_lookup";
322 PGSQL_RES *query_res;
323 DICT_PGSQL *dict_pgsql;
334 dict_pgsql = (DICT_PGSQL *) dict;
336 #define INIT_VSTR(buf, len) do { \
338 buf = vstring_alloc(len); \
339 VSTRING_RESET(buf); \
340 VSTRING_TERMINATE(buf); \
343 INIT_VSTR(query, 10);
344 INIT_VSTR(result, 10);
365 msg_info(
"%s: Skipping lookup of '%s'", myname, name);
384 if ((query_res = plpgsql_query(dict_pgsql, name, query,
386 dict_pgsql->username,
387 dict_pgsql->password)) == 0) {
391 numrows = PQntuples(query_res);
393 msg_info(
"%s: retrieved %d rows", myname, numrows);
398 numcols = PQnfields(query_res);
400 for (expansion = i = 0; i < numrows && dict->
error == 0; i++) {
401 for (j = 0; j < numcols; j++) {
402 r = PQgetvalue(query_res, i, j);
405 && dict_pgsql->expansion_limit > 0
406 && ++expansion > dict_pgsql->expansion_limit) {
407 msg_warn(
"%s: %s: Expansion limit exceeded for key: '%s'",
408 myname, dict_pgsql->parser->name, name);
416 return ((dict->
error == 0 && *r) ? r : 0);
421 static int dict_pgsql_check_stat(HOST *host,
unsigned stat,
unsigned type,
424 if ((host->stat & stat) && (!type || host->type & type)) {
426 if (host->stat == STATFAIL && host->ts > 0 && host->ts >= t)
435 static HOST *dict_pgsql_find_host(PLPGSQL *PLDB,
unsigned stat,
unsigned type)
442 t = time((time_t *) 0);
443 for (i = 0; i < PLDB->len_hosts; i++) {
444 if (dict_pgsql_check_stat(PLDB->db_hosts[i], stat, type, t))
452 for (i = 0; i < PLDB->len_hosts; i++) {
453 if (dict_pgsql_check_stat(PLDB->db_hosts[i], stat, type, t) &&
455 return PLDB->db_hosts[i];
463 static HOST *dict_pgsql_get_active(PLPGSQL *PLDB,
char *dbname,
466 const char *myname =
"dict_pgsql_get_active";
468 int count = RETRY_CONN_MAX;
471 if ((host = dict_pgsql_find_host(PLDB, STATACTIVE, TYPEUNIX)) != NULL ||
472 (host = dict_pgsql_find_host(PLDB, STATACTIVE, TYPEINET)) != NULL ||
473 (host = dict_pgsql_find_host(PLDB, STATACTIVE, TYPECONNSTRING)) != NULL) {
475 msg_info(
"%s: found active connection to host %s", myname,
485 while (--count > 0 &&
486 ((host = dict_pgsql_find_host(PLDB, STATUNTRIED | STATFAIL,
487 TYPEUNIX)) != NULL ||
488 (host = dict_pgsql_find_host(PLDB, STATUNTRIED | STATFAIL,
489 TYPEINET)) != NULL ||
490 (host = dict_pgsql_find_host(PLDB, STATUNTRIED | STATFAIL,
491 TYPECONNSTRING)) != NULL)) {
493 msg_info(
"%s: attempting to connect to host %s", myname,
495 plpgsql_connect_single(host, dbname, username, password);
496 if (host->stat == STATACTIVE)
506 static void dict_pgsql_event(
int unused_event,
void *context)
508 HOST *host = (HOST *) context;
511 plpgsql_close_host(host);
521 static PGSQL_RES *plpgsql_query(DICT_PGSQL *dict_pgsql,
528 PLPGSQL *PLDB = dict_pgsql->pldb;
531 ExecStatusType status;
533 while ((host = dict_pgsql_get_active(PLDB, dbname, username, password)) != NULL) {
539 dict_pgsql->active_host = host;
543 name, 0, query, dict_pgsql_quote);
544 dict_pgsql->active_host = 0;
547 if (host->stat == STATFAIL) {
548 plpgsql_down_host(host);
562 if ((res = PQexec(host->db,
vstring_str(query))) != 0) {
578 switch ((status = PQresultStatus(res))) {
579 case PGRES_TUPLES_OK:
580 case PGRES_COMMAND_OK:
583 msg_info(
"dict_pgsql: successful query from host %s",
588 case PGRES_FATAL_ERROR:
589 msg_warn(
"pgsql query failed: fatal error from host %s: %s",
590 host->hostname, PQresultErrorMessage(res));
592 case PGRES_BAD_RESPONSE:
593 msg_warn(
"pgsql query failed: protocol error, host %s",
597 msg_warn(
"pgsql query failed: unknown code 0x%lx from host %s",
598 (
unsigned long) status, host->hostname);
608 msg_warn(
"pgsql query failed: fatal error from host %s: %s",
609 host->hostname, PQerrorMessage(host->db));
617 plpgsql_down_host(host);
628 static void plpgsql_connect_single(HOST *host,
char *dbname,
char *username,
char *password)
630 if (host->type == TYPECONNSTRING) {
631 host->db = PQconnectdb(host->name);
633 host->db = PQsetdbLogin(host->name, host->port, NULL, NULL,
634 dbname, username, password);
636 if (host->db == NULL || PQstatus(host->db) != CONNECTION_OK) {
637 msg_warn(
"connect to pgsql server %s: %s",
638 host->hostname, PQerrorMessage(host->db));
639 plpgsql_down_host(host);
643 msg_info(
"dict_pgsql: successful connection to host %s",
651 if (PQsetClientEncoding(host->db,
"LATIN1") != 0) {
652 msg_warn(
"dict_pgsql: cannot set the encoding to LATIN1, skipping %s",
654 plpgsql_down_host(host);
658 host->stat = STATACTIVE;
663 static void plpgsql_close_host(HOST *host)
668 host->stat = STATUNTRIED;
675 static void plpgsql_down_host(HOST *host)
680 host->ts = time((time_t *) 0) + RETRY_CONN_INTV;
681 host->stat = STATFAIL;
687 static void pgsql_parse_config(DICT_PGSQL *dict_pgsql,
const char *pgsqlcf)
689 const char *myname =
"pgsql_parse_config";
693 char *select_function;
695 dict_pgsql->username =
cfg_get_str(p,
"user",
"", 0, 0);
696 dict_pgsql->password =
cfg_get_str(p,
"password",
"", 0, 0);
697 dict_pgsql->dbname =
cfg_get_str(p,
"dbname",
"", 1, 0);
698 dict_pgsql->result_format =
cfg_get_str(p,
"result_format",
"%s", 1, 0);
704 dict_pgsql->expansion_limit =
cfg_get_int(dict_pgsql->parser,
705 "expansion_limit", 0, 0, 0);
707 if ((dict_pgsql->query =
cfg_get_str(p,
"query", 0, 0, 0)) == 0) {
714 select_function =
cfg_get_str(p,
"select_function", 0, 0, 0);
715 if (select_function != 0) {
728 dict_pgsql->query, 1);
729 (void)
db_common_parse(0, &dict_pgsql->ctx, dict_pgsql->result_format, 0);
746 if (dict_pgsql->hosts->argc == 0) {
750 msg_info(
"%s: %s: no hostnames specified, defaulting to '%s'",
751 myname, pgsqlcf, dict_pgsql->hosts->argv[0]);
760 DICT_PGSQL *dict_pgsql;
766 if (open_flags != O_RDONLY)
768 "%s:%s map requires O_RDONLY access mode",
776 "open %s: %m", name));
780 dict_pgsql->dict.
lookup = dict_pgsql_lookup;
781 dict_pgsql->dict.close = dict_pgsql_close;
782 dict_pgsql->dict.flags = dict_flags;
783 dict_pgsql->parser = parser;
784 pgsql_parse_config(dict_pgsql, name);
785 dict_pgsql->active_host = 0;
786 dict_pgsql->pldb = plpgsql_init(dict_pgsql->hosts);
787 if (dict_pgsql->pldb == NULL)
788 msg_fatal(
"couldn't initialize pldb!\n");
795 static PLPGSQL *plpgsql_init(
ARGV *hosts)
800 PLDB = (PLPGSQL *)
mymalloc(
sizeof(PLPGSQL));
801 PLDB->len_hosts = hosts->
argc;
802 PLDB->db_hosts = (HOST **)
mymalloc(
sizeof(HOST *) * hosts->
argc);
803 for (i = 0; i < hosts->
argc; i++)
804 PLDB->db_hosts[i] = host_init(hosts->
argv[i]);
812 static HOST *host_init(
const char *hostname)
814 const char *myname =
"pgsql host_init";
815 HOST *host = (HOST *)
mymalloc(
sizeof(HOST));
816 const char *d = hostname;
819 host->hostname =
mystrdup(hostname);
820 host->stat = STATUNTRIED;
826 if (strncmp(d,
"postgresql:", 11) == 0) {
827 host->type = TYPECONNSTRING;
838 if (strncmp(d,
"unix:", 5) == 0 || strncmp(d,
"inet:", 5) == 0)
841 if (host->name[0] && host->name[0] !=
'/') {
842 host->type = TYPEINET;
845 host->type = TYPEUNIX;
850 msg_info(
"%s: host=%s, port=%s, type=%s", myname, host->name,
851 host->port ? host->port :
"",
852 host->type == TYPEUNIX ?
"unix" :
853 host->type == TYPEINET ?
"inet" :
860 static void dict_pgsql_close(
DICT *dict)
862 DICT_PGSQL *dict_pgsql = (DICT_PGSQL *) dict;
864 plpgsql_dealloc(dict_pgsql->pldb);
866 myfree(dict_pgsql->username);
867 myfree(dict_pgsql->password);
868 myfree(dict_pgsql->dbname);
869 myfree(dict_pgsql->query);
870 myfree(dict_pgsql->result_format);
871 if (dict_pgsql->hosts)
882 static void plpgsql_dealloc(PLPGSQL *PLDB)
886 for (i = 0; i < PLDB->len_hosts; i++) {
888 if (PLDB->db_hosts[i]->db)
889 PQfinish(PLDB->db_hosts[i]->db);
890 myfree(PLDB->db_hosts[i]->hostname);
891 myfree(PLDB->db_hosts[i]->name);
892 myfree((
void *) PLDB->db_hosts[i]);
894 myfree((
void *) PLDB->db_hosts);
char * mystrdup(const char *str)
ARGV * argv_free(ARGV *argvp)
NORETURN msg_panic(const char *fmt,...)
void db_common_sql_build_query(VSTRING *query, CFG_PARSER *parser)
void argv_add(ARGV *argvp,...)
int cfg_get_int(const CFG_PARSER *parser, const char *name, int defval, int min, int max)
#define DICT_FLAG_FOLD_FIX
void db_common_free_ctx(void *ctxPtr)
VSTRING * vstring_strcpy(VSTRING *vp, const char *src)
#define VSTRING_TERMINATE(vp)
int db_common_dict_partial(void *ctxPtr)
void db_common_parse_domain(CFG_PARSER *parser, void *ctxPtr)
int db_common_expand(void *ctxArg, const char *format, const char *value, const char *key, VSTRING *result, db_quote_callback_t quote_func)
#define VSTRING_RESET(vp)
void msg_warn(const char *fmt,...)
VSTRING * vstring_alloc(ssize_t len)
const char * username(void)
DICT * dict_pgsql_open(const char *name, int unused_flags, int dict_flags)
VSTRING * vstring_sprintf(VSTRING *vp, const char *format,...)
int db_common_check_domain(void *ctxPtr, const char *addr)
char * lowercase(char *string)
const char *(* lookup)(struct DICT *, const char *)
NORETURN msg_fatal(const char *fmt,...)
#define DICT_ERR_VAL_RETURN(dict, err, val)
char * cfg_get_str(const CFG_PARSER *parser, const char *name, const char *defval, int min, int max)
#define DICT_FLAG_PATTERN
ARGV * argv_split(const char *, const char *)
CFG_PARSER * cfg_parser_free(CFG_PARSER *parser)
#define VSTRING_SPACE(vp, len)
CFG_PARSER * cfg_parser_alloc(const char *pname)
VSTRING * vstring_free(VSTRING *vp)
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay)
int db_common_parse(DICT *dict, void **ctxPtr, const char *format, int query)
DICT * dict_alloc(const char *, const char *, ssize_t)
int event_cancel_timer(EVENT_NOTIFY_TIME_FN callback, void *context)
char * split_at_right(char *string, int delimiter)
#define cfg_get_owner(cfg)
char * vstring_export(VSTRING *vp)
DICT * dict_surrogate(const char *dict_type, const char *dict_name, int open_flags, int dict_flags, const char *fmt,...)
void * mymalloc(ssize_t len)
void argv_terminate(ARGV *argvp)
void msg_info(const char *fmt,...)