1 /*++
2 /* NAME
/*	qmgr_queue 3
/* SUMMARY
/*	per-destination queues
/* SYNOPSIS
/*	#include "qmgr.h"
8 /*
9 /* int qmgr_queue_count;
10 /*
11 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
12 /* QMGR_TRANSPORT *transport;
13 /* const char *name;
14 /* const char *nexthop;
15 /*
16 /* void qmgr_queue_done(queue)
17 /* QMGR_QUEUE *queue;
18 /*
19 /* QMGR_QUEUE *qmgr_queue_find(transport, name)
20 /* QMGR_TRANSPORT *transport;
21 /* const char *name;
22 /*
23 /* void qmgr_queue_throttle(queue, dsn)
24 /* QMGR_QUEUE *queue;
25 /* DSN *dsn;
26 /*
27 /* void qmgr_queue_unthrottle(queue)
28 /* QMGR_QUEUE *queue;
29 /*
30 /* void qmgr_queue_suspend(queue, delay)
31 /* QMGR_QUEUE *queue;
/*	int delay;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-destination queues.
35 /* Each queue corresponds to a specific transport and destination.
36 /* Each queue has a `todo' list of delivery requests for that
37 /* destination, and a `busy' list of delivery requests in progress.
38 /*
39 /* qmgr_queue_count is a global counter for the total number
40 /* of in-core queue structures.
41 /*
42 /* qmgr_queue_create() creates an empty named queue for the named
43 /* transport and destination. The queue is given an initial
44 /* concurrency limit as specified with the
45 /* \fIinitial_destination_concurrency\fR configuration parameter,
46 /* provided that it does not exceed the transport-specific
47 /* concurrency limit.
48 /*
49 /* qmgr_queue_done() disposes of a per-destination queue after all
50 /* its entries have been taken care of. It is an error to dispose
51 /* of a dead queue.
52 /*
53 /* qmgr_queue_find() looks up the named queue for the named
54 /* transport. A null result means that the queue was not found.
55 /*
56 /* qmgr_queue_throttle() handles a delivery error, and decrements the
57 /* concurrency limit for the destination, with a lower bound of 1.
58 /* When the cohort failure bound is reached, qmgr_queue_throttle()
59 /* sets the concurrency limit to zero and starts a timer
60 /* to re-enable delivery to the destination after a configurable delay.
61 /*
62 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
63 /* The concurrency limit for the destination is incremented,
64 /* provided that it does not exceed the destination concurrency
65 /* limit specified for the transport. This routine implements
66 /* "slow open" mode, and eliminates the "thundering herd" problem.
67 /*
68 /* qmgr_queue_suspend() suspends delivery for this destination
69 /* briefly. This function invalidates any scheduling decisions
70 /* that are based on the present queue's concurrency window.
71 /* To compensate for work skipped by qmgr_entry_done(), the
72 /* status of blocker jobs is re-evaluated after the queue is
/*	resumed.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
80 /* AUTHOR(S)
81 /* Wietse Venema
82 /* IBM T.J. Watson Research
83 /* P.O. Box 704
84 /* Yorktown Heights, NY 10598, USA
85 /*
86 /* Pre-emptive scheduler enhancements:
87 /* Patrik Rak
88 /* Modra 6
89 /* 155 00, Prague, Czech Republic
90 /*
91 /* Concurrency scheduler enhancements with:
92 /* Victor Duchovni
93 /* Morgan Stanley
94 /*--*/
96 /* System library. */
98 #include <sys_defs.h>
99 #include <time.h>
101 /* Utility library. */
103 #include <msg.h>
104 #include <mymalloc.h>
105 #include <events.h>
106 #include <htable.h>
108 /* Global library. */
110 #include <mail_params.h>
111 #include <recipient_list.h>
112 #include <mail_proto.h> /* QMGR_LOG_WINDOW */
114 /* Application-specific. */
116 #include "qmgr.h"
120 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
121  (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
122  || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
124 #define QMGR_LOG_FEEDBACK(feedback) \
125  if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
126  msg_info("%s: feedback %g", myname, feedback);
128 #define QMGR_LOG_WINDOW(queue) \
129  if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
130  msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
131  myname, queue->name, queue->transport->dest_concurrency_limit, \
132  queue->window, queue->success, queue->failure, queue->fail_cohorts);
134 /* qmgr_queue_resume - resume delivery to destination */
136 static void qmgr_queue_resume(int event, void *context)
137 {
138  QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
139  const char *myname = "qmgr_queue_resume";
141  /*
142  * Sanity checks.
143  */
144  if (!QMGR_QUEUE_SUSPENDED(queue))
145  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
147  /*
148  * We can't simply force delivery on this queue: the transport's pending
149  * count may already be maxed out, and there may be other constraints
150  * that definitely should be none of our business. The best we can do is
151  * to play by the same rules as everyone else: let qmgr_active_drain()
152  * and round-robin selection take care of message selection.
153  */
154  queue->window = 1;
156  /*
157  * Every event handler that leaves a queue in the "ready" state should
158  * remove the queue when it is empty.
159  *
160  * XXX Do not omit the redundant test below. It is here to simplify code
161  * consistency checks. The check is trivially eliminated by the compiler
162  * optimizer. There is no need to sacrifice code clarity for the sake of
163  * performance.
164  *
165  * XXX Do not expose the blocker job logic here. Rate-limited queues are not
166  * a performance-critical feature. Here, too, there is no need to
167  * sacrifice code clarity for the sake of performance.
168  */
169  if (QMGR_QUEUE_READY(queue) && queue-> == 0 && queue-> == 0)
170  qmgr_queue_done(queue);
171  else
173 }
175 /* qmgr_queue_suspend - briefly suspend a destination */
177 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
178 {
179  const char *myname = "qmgr_queue_suspend";
181  /*
182  * Sanity checks.
183  */
184  if (!QMGR_QUEUE_READY(queue))
185  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
186  if (queue->busy_refcount > 0)
187  msg_panic("%s: queue is busy", myname);
189  /*
190  * Set the queue status to "suspended". No-one is supposed to remove a
191  * queue in suspended state.
192  */
194  event_request_timer(qmgr_queue_resume, (void *) queue, delay);
195 }
197 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
199 static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
200 {
201  QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
203  /*
204  * This routine runs when a wakeup timer goes off; it does not run in the
205  * context of some queue manipulation. Therefore, it is safe to discard
206  * this in-core queue when it is empty and when this site is not dead.
207  */
208  qmgr_queue_unthrottle(queue);
209  if (QMGR_QUEUE_READY(queue) && queue-> == 0 && queue-> == 0)
210  qmgr_queue_done(queue);
211 }
213 /* qmgr_queue_unthrottle - give this destination another chance */
216 {
217  const char *myname = "qmgr_queue_unthrottle";
218  QMGR_TRANSPORT *transport = queue->transport;
219  double feedback;
221  if (msg_verbose)
222  msg_info("%s: queue %s", myname, queue->name);
224  /*
225  * Sanity checks.
226  */
227  if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue))
228  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
230  /*
231  * Don't restart the negative feedback hysteresis cycle with every
232  * positive feedback. Restart it only when we make a positive concurrency
233  * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
234  * Otherwise negative feedback would be too aggressive: negative feedback
235  * takes effect immediately at the start of its hysteresis cycle.
236  */
237  queue->fail_cohorts = 0;
239  /*
240  * Special case when this site was dead.
241  */
242  if (QMGR_QUEUE_THROTTLED(queue)) {
243  event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
244  if (queue->dsn == 0)
245  msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
246  dsn_free(queue->dsn);
247  queue->dsn = 0;
248  /* Back from the almost grave, best concurrency is anyone's guess. */
249  if (queue->busy_refcount > 0)
250  queue->window = queue->busy_refcount;
251  else
252  queue->window = transport->init_dest_concurrency;
253  queue->success = queue->failure = 0;
254  QMGR_LOG_WINDOW(queue);
255  return;
256  }
258  /*
259  * Increase the destination's concurrency limit until we reach the
260  * transport's concurrency limit. Allow for a margin the size of the
261  * initial destination concurrency, so that we're not too gentle.
262  *
263  * Why is the concurrency increment based on preferred concurrency and not
264  * on the number of outstanding delivery requests? The latter fluctuates
265  * wildly when deliveries complete in bursts (artificial benchmark
266  * measurements), and does not account for cached connections.
267  *
268  * Keep the window within reasonable distance from actual concurrency
269  * otherwise negative feedback will be ineffective. This expression
270  * assumes that busy_refcount changes gradually. This is invalid when
271  * deliveries complete in bursts (artificial benchmark measurements).
272  */
273  if (transport->dest_concurrency_limit == 0
274  || transport->dest_concurrency_limit > queue->window)
275  if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
276  feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
277  QMGR_LOG_FEEDBACK(feedback);
278  queue->success += feedback;
279  /* Prepare for overshoot (feedback > hysteresis, rounding error). */
280  while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
281  queue->window += transport->pos_feedback.hysteresis;
282  queue->success -= transport->pos_feedback.hysteresis;
283  queue->failure = 0;
284  }
285  /* Prepare for overshoot. */
286  if (transport->dest_concurrency_limit > 0
287  && queue->window > transport->dest_concurrency_limit)
288  queue->window = transport->dest_concurrency_limit;
289  }
290  QMGR_LOG_WINDOW(queue);
291 }
293 /* qmgr_queue_throttle - handle destination delivery failure */
296 {
297  const char *myname = "qmgr_queue_throttle";
298  QMGR_TRANSPORT *transport = queue->transport;
299  double feedback;
301  /*
302  * Sanity checks.
303  */
304  if (!QMGR_QUEUE_READY(queue))
305  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
306  if (queue->dsn)
307  msg_panic("%s: queue %s: spurious reason %s",
308  myname, queue->name, queue->dsn->reason);
309  if (msg_verbose)
310  msg_info("%s: queue %s: %s %s",
311  myname, queue->name, dsn->status, dsn->reason);
313  /*
314  * Don't restart the positive feedback hysteresis cycle with every
315  * negative feedback. Restart it only when we make a negative concurrency
316  * adjustment (i.e. at the start of a negative feedback hysteresis
317  * cycle). Otherwise positive feedback would be too weak (positive
318  * feedback does not take effect until the end of its hysteresis cycle).
319  */
321  /*
322  * This queue is declared dead after a configurable number of
323  * pseudo-cohort failures.
324  */
325  if (QMGR_QUEUE_READY(queue)) {
326  queue->fail_cohorts += 1.0 / queue->window;
327  if (transport->fail_cohort_limit > 0
328  && queue->fail_cohorts >= transport->fail_cohort_limit)
330  }
332  /*
333  * Decrease the destination's concurrency limit until we reach 1. Base
334  * adjustments on the concurrency limit itself, instead of using the
335  * actual concurrency. The latter fluctuates wildly when deliveries
336  * complete in bursts (artificial benchmark measurements).
337  *
338  * Even after reaching 1, we maintain the negative hysteresis cycle so that
339  * negative feedback can cancel out positive feedback.
340  */
341  if (QMGR_QUEUE_READY(queue)) {
342  feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
343  QMGR_LOG_FEEDBACK(feedback);
344  queue->failure -= feedback;
345  /* Prepare for overshoot (feedback > hysteresis, rounding error). */
346  while (queue->failure - feedback / 2 < 0) {
347  queue->window -= transport->neg_feedback.hysteresis;
348  queue->success = 0;
349  queue->failure += transport->neg_feedback.hysteresis;
350  }
351  /* Prepare for overshoot. */
352  if (queue->window < 1)
353  queue->window = 1;
354  }
356  /*
357  * Special case for a site that just was declared dead.
358  */
359  if (QMGR_QUEUE_THROTTLED(queue)) {
360  queue->dsn = DSN_COPY(dsn);
361  event_request_timer(qmgr_queue_unthrottle_wrapper,
362  (void *) queue, var_min_backoff_time);
363  queue->dflags = 0;
364  }
365  QMGR_LOG_WINDOW(queue);
366 }
368 /* qmgr_queue_done - delete in-core queue for site */
371 {
372  const char *myname = "qmgr_queue_done";
373  QMGR_TRANSPORT *transport = queue->transport;
375  /*
376  * Sanity checks. It is an error to delete an in-core queue with pending
377  * messages or timers.
378  */
379  if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
380  msg_panic("%s: refcount: %d", myname,
381  queue->busy_refcount + queue->todo_refcount);
382  if (queue-> || queue->
383  msg_panic("%s: queue not empty: %s", myname, queue->name);
384  if (!QMGR_QUEUE_READY(queue))
385  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
386  if (queue->dsn)
387  msg_panic("%s: queue %s: spurious reason %s",
388  myname, queue->name, queue->dsn->reason);
390  /*
391  * Clean up this in-core queue.
392  */
393  QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers);
394  htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
395  myfree(queue->name);
396  myfree(queue->nexthop);
398  myfree((void *) queue);
399 }
401 /* qmgr_queue_create - create in-core queue for site */
403 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
404  const char *nexthop)
405 {
406  QMGR_QUEUE *queue;
408  /*
409  * If possible, choose an initial concurrency of > 1 so that one bad
410  * message or one bad network won't slow us down unnecessarily.
411  */
413  queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
415  queue->dflags = 0;
416  queue->last_done = 0;
417  queue->name = mystrdup(name);
418  queue->nexthop = mystrdup(nexthop);
419  queue->todo_refcount = 0;
420  queue->busy_refcount = 0;
421  queue->transport = transport;
422  queue->window = transport->init_dest_concurrency;
423  queue->success = queue->failure = queue->fail_cohorts = 0;
424  QMGR_LIST_INIT(queue->todo);
425  QMGR_LIST_INIT(queue->busy);
426  queue->dsn = 0;
427  queue->clog_time_to_warn = 0;
428  queue->blocker_tag = 0;
429  QMGR_LIST_APPEND(transport->queue_list, queue, peers);
430  htable_enter(transport->queue_byname, name, (void *) queue);
431  return (queue);
432 }
434 /* qmgr_queue_find - find in-core named queue */
436 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
437 {
438  return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
439 }
int msg_verbose
Definition: msg.c:177
double success
Definition: qmgr.h:205
int dest_concurrency_limit
Definition: qmgr.h:156
Definition: qmgr.h:193
void myfree(void *ptr)
Definition: mymalloc.c:207
Definition: qmgr.h:210
time_t last_done
Definition: qmgr.h:199
char * mystrdup(const char *str)
Definition: mymalloc.c:225
int fail_cohort_limit
Definition: qmgr.h:165
NORETURN msg_panic(const char *fmt,...)
Definition: msg.c:295
const char * reason
Definition: dsn.h:20
int qmgr_queue_count
Definition: qmgr_queue.c:112
int blocker_tag
Definition: qmgr.h:253
Definition: qmgr.h:239
#define DSN_COPY(dsn)
Definition: dsn.h:68
QMGR_FEEDBACK neg_feedback
Definition: qmgr.h:164
double fail_cohorts
Definition: qmgr.h:207
Definition: qmgr.h:240
QMGR_QUEUE * qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
Definition: qmgr_queue.c:439
int init_dest_concurrency
Definition: qmgr.h:157
#define QMGR_LIST_INIT(head)
Definition: qmgr.h:88
#define QMGR_LOG_FEEDBACK(feedback)
Definition: qmgr_queue.c:124
#define QMGR_LIST_APPEND(head, object)
Definition: qmgr.h:66
#define QMGR_LOG_WINDOW(queue)
Definition: qmgr_queue.c:128
int busy_refcount
Definition: qmgr.h:203
#define QMGR_FEEDBACK_VAL(fb, win)
Definition: qmgr.h:132
struct HTABLE * queue_byname
Definition: qmgr.h:159
Definition: qmgr.h:246
void dsn_free(DSN *dsn)
Definition: dsn.c:179
void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
Definition: qmgr_queue.c:278
char * name
Definition: qmgr.h:200
void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
Definition: qmgr_queue.c:160
Definition: qmgr.h:209
void qmgr_job_blocker_update(QMGR_QUEUE *)
Definition: qmgr_job.c:950
int dflags
Definition: qmgr.h:198
void qmgr_queue_unthrottle(QMGR_QUEUE *queue)
Definition: qmgr_queue.c:198
void * htable_find(HTABLE *table, const char *key)
Definition: htable.c:227
Definition: qmgr.h:250
time_t clog_time_to_warn
Definition: qmgr.h:213
QMGR_QUEUE * qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, const char *nexthop)
Definition: qmgr_queue.c:407
int var_min_backoff_time
Definition: qmgr.c:411
const char * status
Definition: dsn.h:18
int todo_refcount
Definition: qmgr.h:202
QMGR_TRANSPORT * transport
Definition: qmgr.h:208
int hysteresis
Definition: qmgr.h:114
#define QMGR_LIST_UNLINK(head, type, object)
Definition: qmgr.h:56
QMGR_QUEUE_LIST queue_list
Definition: qmgr.h:160
Definition: dsn.h:17
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay)
Definition: events.c:894
int window
Definition: qmgr.h:204
QMGR_FEEDBACK pos_feedback
Definition: qmgr.h:163
char * nexthop
Definition: qmgr.h:201
Definition: qmgr.h:244
void qmgr_queue_done(QMGR_QUEUE *queue)
Definition: qmgr_queue.c:374
void htable_delete(HTABLE *table, const char *key, void(*free_fn)(void *))
Definition: htable.c:257
int event_cancel_timer(EVENT_NOTIFY_TIME_FN callback, void *context)
Definition: events.c:965
DSN * dsn
Definition: qmgr.h:212
double failure
Definition: qmgr.h:206
Definition: qmgr.h:245
void * mymalloc(ssize_t len)
Definition: mymalloc.c:150
HTABLE_INFO * htable_enter(HTABLE *table, const char *key, void *value)
Definition: htable.c:212
void msg_info(const char *fmt,...)
Definition: msg.c:199