161 #include <sys/stat.h>
162 #include <sys/time.h>
224 #define FLUSH_DUP_FILTER_SIZE 10000
229 #define STR(x) vstring_str(x)
230 #define STREQ(x,y) ((x) == (y) || strcmp(x,y) == 0)
236 static int flush_add_path(
const char *,
const char *);
237 static int flush_send_path(
const char *,
int);
258 #define REFRESH_ONLY 0
259 #define UNTHROTTLE_BEFORE (1<<0)
260 #define UNTHROTTLE_AFTER (1<<1)
264 static VSTRING *flush_site_to_path(
VSTRING *path,
const char *site)
287 for (ptr = site; (ch = *(
unsigned const char *) ptr) != 0; ptr++)
302 static int flush_add_service(
const char *site,
const char *queue_id)
304 const char *myname =
"flush_add_service";
309 msg_info(
"%s: site %s queue_id %s", myname, site, queue_id);
320 if ((site_path = flush_site_to_path((
VSTRING *) 0, site)) == 0)
321 return (FLUSH_STAT_DENY);
322 status = flush_add_path(
STR(site_path), queue_id);
330 static int flush_add_path(
const char *path,
const char *queue_id)
332 const char *myname =
"flush_add_path";
345 O_CREAT | O_APPEND | O_WRONLY, 0600)) == 0)
346 msg_fatal(
"%s: open fast flush logfile %s: %m", myname, path);
354 msg_fatal(
"%s: lock fast flush logfile %s: %m", myname, path);
364 msg_warn(
"write fast flush logfile %s: %m", path);
370 msg_fatal(
"%s: unlock fast flush logfile %s: %m", myname, path);
372 msg_warn(
"write fast flush logfile %s: %m", path);
379 static int flush_send_service(
const char *site,
int how)
381 const char *myname =
"flush_send_service";
386 msg_info(
"%s: site %s", myname, site);
397 if ((site_path = flush_site_to_path((
VSTRING *) 0, site)) == 0)
398 return (FLUSH_STAT_DENY);
399 status = flush_send_path(
STR(site_path), how);
407 static int flush_one_file(
const char *queue_id,
VSTRING *queue_file,
408 struct utimbuf * tbuf,
int how)
410 const char *myname =
"flush_one_file";
411 const char *queue_name;
421 if (utime(path, tbuf) == 0)
424 msg_warn(
"%s: update %s time stamps: %m", myname, path);
451 if ((fp =
safe_open(path, O_RDWR, 0, &st, -1, -1, why)) != 0)
454 msg_warn(
"%s: open %s: %s", myname, path,
STR(why));
466 msg_warn(
"%s: fchmod %s: %m", myname, path);
476 msg_warn(
"%s: rename from %s to %s: %m",
487 static int flush_send_path(
const char *path,
int how)
489 const char *myname =
"flush_send_path";
494 static char qmgr_flush_trigger[] = {
497 static char qmgr_scan_trigger[] = {
515 msg_fatal(
"%s: open fast flush logfile %s: %m", myname, path);
526 msg_fatal(
"%s: lock fast flush logfile %s: %m", myname, path);
543 qmgr_flush_trigger,
sizeof(qmgr_flush_trigger));
568 msg_warn(
"bad queue id \"%.30s...\" in fast flush logfile %s",
569 STR(queue_id), path);
575 msg_info(
"%s: logfile %s: update queue file %s time stamps",
576 myname, path,
STR(queue_id));
579 count += flush_one_file(
STR(queue_id), queue_file, &tbuf, how);
582 msg_info(
"%s: logfile %s: skip queue file %s as duplicate",
583 myname, path,
STR(queue_file));
594 msg_fatal(
"%s: truncate fast flush logfile %s: %m", myname, path);
605 msg_fatal(
"%s: unlock fast flush logfile %s: %m", myname, path);
607 msg_warn(
"%s: read fast flush logfile %s: %m", myname, path);
610 msg_info(
"%s: requesting delivery for logfile %s", myname, path);
612 qmgr_scan_trigger,
sizeof(qmgr_scan_trigger));
619 static int flush_send_file_service(
const char *queue_id)
621 const char *myname =
"flush_send_file_service";
624 static char qmgr_scan_trigger[] = {
635 msg_info(
"%s: requesting delivery for queue_id %s", myname, queue_id);
639 if (flush_one_file(queue_id, queue_file, &tbuf, UNTHROTTLE_AFTER) > 0)
641 qmgr_scan_trigger,
sizeof(qmgr_scan_trigger));
649 static int flush_refresh_service(
int max_age)
651 const char *myname =
"flush_refresh_service";
662 if (
stat(
STR(path), &st) < 0) {
669 if (st.st_size == 0) {
671 if (unlink(
STR(path)) < 0)
674 msg_info(
"%s: unlink %s, empty and unchanged for %d days",
677 msg_info(
"%s: skip logfile %s - empty log", myname, site_path);
678 }
else if (st.st_atime + max_age <
event_time()) {
680 msg_info(
"%s: flush logfile %s", myname, site_path);
684 msg_info(
"%s: skip logfile %s, unread for <%d hours(s) ",
685 myname, site_path, max_age / 3600);
696 static int flush_request_receive(
VSTREAM *client_stream,
VSTRING *request)
704 msg_warn(
"timeout while waiting for data from %s",
709 msg_warn(
"cannot examine read buffer of %s: %m",
719 msg_warn(
"end-of-input while reading request from %s: %m",
741 static void flush_service(
VSTREAM *client_stream,
char *unused_service,
747 static char wakeup[] = {
757 msg_fatal(
"unexpected command-line argument: %s", argv[0]);
768 if (flush_request_receive(client_stream, request) == 0) {
777 status = flush_add_service(
STR(site),
STR(queue_id));
786 status = flush_send_service(
STR(site), UNTHROTTLE_BEFORE);
795 status = flush_send_file_service(
STR(queue_id));
811 (void) flush_refresh_service(0);
826 static void pre_jail_init(
char *unused_name,
char **unused_argv)
837 int main(
int argc,
char **argv)
void htable_free(HTABLE *table, void(*free_fn)(void *))
#define CA_MAIL_SERVER_UNLIMITED
#define MATCH_FLAG_RETURN
char * var_fflush_domains
int match_parent_style(const char *name)
int vstring_get_null(VSTRING *vp, VSTREAM *fp)
int vstring_get_nonl(VSTRING *vp, VSTREAM *fp)
int mail_queue_id_ok(const char *queue_id)
#define DEF_FFLUSH_REFRESH
#define domain_list_match
#define MYFLOCK_OP_EXCLUSIVE
#define MAIL_QUEUE_STAT_READY
int mail_trigger(const char *, const char *, const char *, ssize_t)
#define MAIL_QUEUE_STAT_UNTHROTTLE
#define VSTRING_TERMINATE(vp)
const char * mail_queue_path(VSTRING *buf, const char *queue_name, const char *queue_id)
SCAN_DIR * scan_dir_open(const char *path)
HTABLE * htable_create(ssize_t size)
#define MAIL_CLASS_PUBLIC
#define VAR_FFLUSH_REFRESH
#define VSTRING_ADDCH(vp, ch)
VSTREAM * vstream_fprintf(VSTREAM *stream, const char *fmt,...)
int main(int argc, char **argv)
int vstream_fclose(VSTREAM *stream)
SCAN_DIR * scan_dir_close(SCAN_DIR *scan)
VSTREAM * mail_queue_open(const char *queue_name, const char *queue_id, int flags, mode_t mode)
int mail_queue_rename(const char *queue_id, const char *old_queue, const char *new_queue)
#define MAIL_QUEUE_INCOMING
#define read_wait(fd, timeout)
void msg_warn(const char *fmt,...)
#define FLUSH_REQ_SEND_SITE
VSTRING * vstring_alloc(ssize_t len)
int myflock(int fd, int lock_style, int operation)
#define FLUSH_REQ_SEND_FILE
#define MAIL_VERSION_STAMP_ALLOCATE
void * htable_find(HTABLE *table, const char *key)
#define CA_MAIL_SERVER_TIME_TABLE(v)
#define SEND_ATTR_INT(name, val)
NORETURN msg_fatal(const char *fmt,...)
#define FLUSH_DUP_FILTER_SIZE
#define domain_list_init(o, f, p)
int vstream_fflush(VSTREAM *stream)
#define FLUSH_REQ_REFRESH
#define QMGR_REQ_SCAN_INCOMING
#define MAIL_QUEUE_DEFERRED
const char * midna_domain_to_ascii(const char *name)
VSTREAM * safe_open(const char *path, int flags, mode_t mode, struct stat *st, uid_t user, gid_t group, VSTRING *why)
#define TRIGGER_REQ_WAKEUP
VSTRING * vstring_free(VSTRING *vp)
NORETURN single_server_main(int, char **, SINGLE_SERVER_FN,...)
#define vstream_fileno(vp)
#define UNTHROTTLE_BEFORE
#define MAIL_ATTR_QUEUEID
#define QMGR_REQ_FLUSH_DEAD
MAIL_VERSION_STAMP_DECLARE
#define VAR_FFLUSH_DOMAINS
char * mail_scan_dir_next(SCAN_DIR *scan)
#define CA_MAIL_SERVER_PRE_INIT(v)
#define RECV_ATTR_STR(name, val)
HTABLE_INFO * htable_enter(HTABLE *table, const char *key, void *value)
void msg_info(const char *fmt,...)