Postfix3.3.1
qmgr_queue.c
[詳解]
1 /*++
2 /* NAME
3 /* qmgr_queue 3
4 /* SUMMARY
5 /* per-destination queues
6 /* SYNOPSIS
7 /* #include "qmgr.h"
8 /*
9 /* int qmgr_queue_count;
10 /*
11 /* QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
12 /* QMGR_TRANSPORT *transport;
13 /* const char *name;
14 /* const char *nexthop;
15 /*
16 /* void qmgr_queue_done(queue)
17 /* QMGR_QUEUE *queue;
18 /*
19 /* QMGR_QUEUE *qmgr_queue_find(transport, name)
20 /* QMGR_TRANSPORT *transport;
21 /* const char *name;
22 /*
23 /* QMGR_QUEUE *qmgr_queue_select(transport)
24 /* QMGR_TRANSPORT *transport;
25 /*
26 /* void qmgr_queue_throttle(queue, dsn)
27 /* QMGR_QUEUE *queue;
28 /* DSN *dsn;
29 /*
30 /* void qmgr_queue_unthrottle(queue)
31 /* QMGR_QUEUE *queue;
32 /*
33 /* void qmgr_queue_suspend(queue, delay)
34 /* QMGR_QUEUE *queue;
35 /* int delay;
36 /* DESCRIPTION
37 /* These routines add/delete/manipulate per-destination queues.
38 /* Each queue corresponds to a specific transport and destination.
39 /* Each queue has a `todo' list of delivery requests for that
40 /* destination, and a `busy' list of delivery requests in progress.
41 /*
42 /* qmgr_queue_count is a global counter for the total number
43 /* of in-core queue structures.
44 /*
45 /* qmgr_queue_create() creates an empty named queue for the named
46 /* transport and destination. The queue is given an initial
47 /* concurrency limit as specified with the
48 /* \fIinitial_destination_concurrency\fR configuration parameter,
49 /* provided that it does not exceed the transport-specific
50 /* concurrency limit.
51 /*
52 /* qmgr_queue_done() disposes of a per-destination queue after all
53 /* its entries have been taken care of. It is an error to dispose
54 /* of a dead queue.
55 /*
56 /* qmgr_queue_find() looks up the named queue for the named
57 /* transport. A null result means that the queue was not found.
58 /*
59 /* qmgr_queue_select() uses a round-robin strategy to select
60 /* from the named transport one per-destination queue with a
61 /* non-empty `todo' list.
62 /*
63 /* qmgr_queue_throttle() handles a delivery error, and decrements the
64 /* concurrency limit for the destination, with a lower bound of 1.
65 /* When the cohort failure bound is reached, qmgr_queue_throttle()
66 /* sets the concurrency limit to zero and starts a timer
67 /* to re-enable delivery to the destination after a configurable delay.
68 /*
69 /* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
70 /* The concurrency limit for the destination is incremented,
71 /* provided that it does not exceed the destination concurrency
72 /* limit specified for the transport. This routine implements
73 /* "slow open" mode, and eliminates the "thundering herd" problem.
74 /*
75 /* qmgr_queue_suspend() suspends delivery for this destination
76 /* briefly.
77 /* DIAGNOSTICS
78 /* Panic: consistency check failure.
79 /* LICENSE
80 /* .ad
81 /* .fi
82 /* The Secure Mailer license must be distributed with this software.
83 /* AUTHOR(S)
84 /* Wietse Venema
85 /* IBM T.J. Watson Research
86 /* P.O. Box 704
87 /* Yorktown Heights, NY 10598, USA
88 /*--*/
89 
90 /* System library. */
91 
92 #include <sys_defs.h>
93 #include <time.h>
94 
95 /* Utility library. */
96 
97 #include <msg.h>
98 #include <mymalloc.h>
99 #include <events.h>
100 #include <htable.h>
101 
102 /* Global library. */
103 
104 #include <mail_params.h>
105 #include <recipient_list.h>
106 #include <mail_proto.h> /* QMGR_LOG_WINDOW */
107 
108 /* Application-specific. */
109 
110 #include "qmgr.h"
111 
113 
114 #define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
115  (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
116  || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)
117 
118 #define QMGR_LOG_FEEDBACK(feedback) \
119  if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
120  msg_info("%s: feedback %g", myname, feedback);
121 
122 #define QMGR_LOG_WINDOW(queue) \
123  if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
124  msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
125  myname, queue->name, queue->transport->dest_concurrency_limit, \
126  queue->window, queue->success, queue->failure, queue->fail_cohorts);
127 
128 /* qmgr_queue_resume - resume delivery to destination */
129 
130 static void qmgr_queue_resume(int event, void *context)
131 {
132  QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
133  const char *myname = "qmgr_queue_resume";
134 
135  /*
136  * Sanity checks.
137  */
138  if (!QMGR_QUEUE_SUSPENDED(queue))
139  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
140 
141  /*
142  * We can't simply force delivery on this queue: the transport's pending
143  * count may already be maxed out, and there may be other constraints
144  * that definitely should be none of our business. The best we can do is
145  * to play by the same rules as everyone else: let qmgr_active_drain()
146  * and round-robin selection take care of message selection.
147  */
148  queue->window = 1;
149 
150  /*
151  * Every event handler that leaves a queue in the "ready" state should
152  * remove the queue when it is empty.
153  */
154  if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
155  qmgr_queue_done(queue);
156 }
157 
158 /* qmgr_queue_suspend - briefly suspend a destination */
159 
160 void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
161 {
162  const char *myname = "qmgr_queue_suspend";
163 
164  /*
165  * Sanity checks.
166  */
167  if (!QMGR_QUEUE_READY(queue))
168  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
169  if (queue->busy_refcount > 0)
170  msg_panic("%s: queue is busy", myname);
171 
172  /*
173  * Set the queue status to "suspended". No-one is supposed to remove a
174  * queue in suspended state.
175  */
177  event_request_timer(qmgr_queue_resume, (void *) queue, delay);
178 }
179 
180 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
181 
182 static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
183 {
184  QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
185 
186  /*
187  * This routine runs when a wakeup timer goes off; it does not run in the
188  * context of some queue manipulation. Therefore, it is safe to discard
189  * this in-core queue when it is empty and when this site is not dead.
190  */
191  qmgr_queue_unthrottle(queue);
192  if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
193  qmgr_queue_done(queue);
194 }
195 
196 /* qmgr_queue_unthrottle - give this destination another chance */
197 
199 {
200  const char *myname = "qmgr_queue_unthrottle";
201  QMGR_TRANSPORT *transport = queue->transport;
202  double feedback;
203 
204  if (msg_verbose)
205  msg_info("%s: queue %s", myname, queue->name);
206 
207  /*
208  * Sanity checks.
209  */
210  if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue))
211  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
212 
213  /*
214  * Don't restart the negative feedback hysteresis cycle with every
215  * positive feedback. Restart it only when we make a positive concurrency
216  * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
217  * Otherwise negative feedback would be too aggressive: negative feedback
218  * takes effect immediately at the start of its hysteresis cycle.
219  */
220  queue->fail_cohorts = 0;
221 
222  /*
223  * Special case when this site was dead.
224  */
225  if (QMGR_QUEUE_THROTTLED(queue)) {
226  event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
227  if (queue->dsn == 0)
228  msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
229  dsn_free(queue->dsn);
230  queue->dsn = 0;
231  /* Back from the almost grave, best concurrency is anyone's guess. */
232  if (queue->busy_refcount > 0)
233  queue->window = queue->busy_refcount;
234  else
235  queue->window = transport->init_dest_concurrency;
236  queue->success = queue->failure = 0;
237  QMGR_LOG_WINDOW(queue);
238  return;
239  }
240 
241  /*
242  * Increase the destination's concurrency limit until we reach the
243  * transport's concurrency limit. Allow for a margin the size of the
244  * initial destination concurrency, so that we're not too gentle.
245  *
246  * Why is the concurrency increment based on preferred concurrency and not
247  * on the number of outstanding delivery requests? The latter fluctuates
248  * wildly when deliveries complete in bursts (artificial benchmark
249  * measurements), and does not account for cached connections.
250  *
251  * Keep the window within reasonable distance from actual concurrency
252  * otherwise negative feedback will be ineffective. This expression
253  * assumes that busy_refcount changes gradually. This is invalid when
254  * deliveries complete in bursts (artificial benchmark measurements).
255  */
256  if (transport->dest_concurrency_limit == 0
257  || transport->dest_concurrency_limit > queue->window)
258  if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
259  feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
260  QMGR_LOG_FEEDBACK(feedback);
261  queue->success += feedback;
262  /* Prepare for overshoot (feedback > hysteresis, rounding error). */
263  while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
264  queue->window += transport->pos_feedback.hysteresis;
265  queue->success -= transport->pos_feedback.hysteresis;
266  queue->failure = 0;
267  }
268  /* Prepare for overshoot. */
269  if (transport->dest_concurrency_limit > 0
270  && queue->window > transport->dest_concurrency_limit)
271  queue->window = transport->dest_concurrency_limit;
272  }
273  QMGR_LOG_WINDOW(queue);
274 }
275 
276 /* qmgr_queue_throttle - handle destination delivery failure */
277 
279 {
280  const char *myname = "qmgr_queue_throttle";
281  QMGR_TRANSPORT *transport = queue->transport;
282  double feedback;
283 
284  /*
285  * Sanity checks.
286  */
287  if (!QMGR_QUEUE_READY(queue))
288  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
289  if (queue->dsn)
290  msg_panic("%s: queue %s: spurious reason %s",
291  myname, queue->name, queue->dsn->reason);
292  if (msg_verbose)
293  msg_info("%s: queue %s: %s %s",
294  myname, queue->name, dsn->status, dsn->reason);
295 
296  /*
297  * Don't restart the positive feedback hysteresis cycle with every
298  * negative feedback. Restart it only when we make a negative concurrency
299  * adjustment (i.e. at the start of a negative feedback hysteresis
300  * cycle). Otherwise positive feedback would be too weak (positive
301  * feedback does not take effect until the end of its hysteresis cycle).
302  */
303 
304  /*
305  * This queue is declared dead after a configurable number of
306  * pseudo-cohort failures.
307  */
308  if (QMGR_QUEUE_READY(queue)) {
309  queue->fail_cohorts += 1.0 / queue->window;
310  if (transport->fail_cohort_limit > 0
311  && queue->fail_cohorts >= transport->fail_cohort_limit)
313  }
314 
315  /*
316  * Decrease the destination's concurrency limit until we reach 1. Base
317  * adjustments on the concurrency limit itself, instead of using the
318  * actual concurrency. The latter fluctuates wildly when deliveries
319  * complete in bursts (artificial benchmark measurements).
320  *
321  * Even after reaching 1, we maintain the negative hysteresis cycle so that
322  * negative feedback can cancel out positive feedback.
323  */
324  if (QMGR_QUEUE_READY(queue)) {
325  feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
326  QMGR_LOG_FEEDBACK(feedback);
327  queue->failure -= feedback;
328  /* Prepare for overshoot (feedback > hysteresis, rounding error). */
329  while (queue->failure - feedback / 2 < 0) {
330  queue->window -= transport->neg_feedback.hysteresis;
331  queue->success = 0;
332  queue->failure += transport->neg_feedback.hysteresis;
333  }
334  /* Prepare for overshoot. */
335  if (queue->window < 1)
336  queue->window = 1;
337  }
338 
339  /*
340  * Special case for a site that just was declared dead.
341  */
342  if (QMGR_QUEUE_THROTTLED(queue)) {
343  queue->dsn = DSN_COPY(dsn);
344  event_request_timer(qmgr_queue_unthrottle_wrapper,
345  (void *) queue, var_min_backoff_time);
346  queue->dflags = 0;
347  }
348  QMGR_LOG_WINDOW(queue);
349 }
350 
351 /* qmgr_queue_select - select in-core queue for delivery */
352 
354 {
355  QMGR_QUEUE *queue;
356 
357  /*
358  * If we find a suitable site, rotate the list to enforce round-robin
359  * selection. See similar selection code in qmgr_transport_select().
360  */
361  for (queue = transport->queue_list.next; queue; queue = queue->peers.next) {
362  if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
363  QMGR_LIST_ROTATE(transport->queue_list, queue);
364  if (msg_verbose)
365  msg_info("qmgr_queue_select: %s", queue->name);
366  return (queue);
367  }
368  }
369  return (0);
370 }
371 
372 /* qmgr_queue_done - delete in-core queue for site */
373 
375 {
376  const char *myname = "qmgr_queue_done";
377  QMGR_TRANSPORT *transport = queue->transport;
378 
379  /*
380  * Sanity checks. It is an error to delete an in-core queue with pending
381  * messages or timers.
382  */
383  if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
384  msg_panic("%s: refcount: %d", myname,
385  queue->busy_refcount + queue->todo_refcount);
386  if (queue->todo.next || queue->busy.next)
387  msg_panic("%s: queue not empty: %s", myname, queue->name);
388  if (!QMGR_QUEUE_READY(queue))
389  msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
390  if (queue->dsn)
391  msg_panic("%s: queue %s: spurious reason %s",
392  myname, queue->name, queue->dsn->reason);
393 
394  /*
395  * Clean up this in-core queue.
396  */
397  QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue);
398  htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
399  myfree(queue->name);
400  myfree(queue->nexthop);
402  myfree((void *) queue);
403 }
404 
405 /* qmgr_queue_create - create in-core queue for site */
406 
407 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
408  const char *nexthop)
409 {
410  QMGR_QUEUE *queue;
411 
412  /*
413  * If possible, choose an initial concurrency of > 1 so that one bad
414  * message or one bad network won't slow us down unnecessarily.
415  */
416 
417  queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
419  queue->dflags = 0;
420  queue->last_done = 0;
421  queue->name = mystrdup(name);
422  queue->nexthop = mystrdup(nexthop);
423  queue->todo_refcount = 0;
424  queue->busy_refcount = 0;
425  queue->transport = transport;
426  queue->window = transport->init_dest_concurrency;
427  queue->success = queue->failure = queue->fail_cohorts = 0;
428  QMGR_LIST_INIT(queue->todo);
429  QMGR_LIST_INIT(queue->busy);
430  queue->dsn = 0;
431  queue->clog_time_to_warn = 0;
432  QMGR_LIST_PREPEND(transport->queue_list, queue);
433  htable_enter(transport->queue_byname, name, (void *) queue);
434  return (queue);
435 }
436 
437 /* qmgr_queue_find - find in-core named queue */
438 
439 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
440 {
441  return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
442 }
int msg_verbose
Definition: msg.c:177
double success
Definition: qmgr.h:205
int dest_concurrency_limit
Definition: qmgr.h:156
QMGR_ENTRY * next
Definition: qmgr.h:193
void myfree(void *ptr)
Definition: mymalloc.c:207
QMGR_ENTRY_LIST busy
Definition: qmgr.h:210
time_t last_done
Definition: qmgr.h:199
char * mystrdup(const char *str)
Definition: mymalloc.c:225
int fail_cohort_limit
Definition: qmgr.h:165
NORETURN msg_panic(const char *fmt,...)
Definition: msg.c:295
const char * reason
Definition: dsn.h:20
int qmgr_queue_count
Definition: qmgr_queue.c:112
#define QMGR_QUEUE_STAT_THROTTLED
Definition: qmgr.h:239
#define DSN_COPY(dsn)
Definition: dsn.h:68
QMGR_FEEDBACK neg_feedback
Definition: qmgr.h:164
double fail_cohorts
Definition: qmgr.h:207
#define QMGR_QUEUE_STAT_SUSPENDED
Definition: qmgr.h:240
QMGR_QUEUE * qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
Definition: qmgr_queue.c:439
int init_dest_concurrency
Definition: qmgr.h:157
#define QMGR_LIST_INIT(head)
Definition: qmgr.h:88
QMGR_QUEUE * next
Definition: qmgr.h:148
#define QMGR_LIST_ROTATE(head, object)
Definition: qmgr.h:46
int busy_refcount
Definition: qmgr.h:203
QMGR_QUEUE_LIST peers
Definition: qmgr.h:211
#define QMGR_LOG_WINDOW(queue)
Definition: qmgr_queue.c:122
#define QMGR_FEEDBACK_VAL(fb, win)
Definition: qmgr.h:132
struct HTABLE * queue_byname
Definition: qmgr.h:159
#define QMGR_QUEUE_SUSPENDED(q)
Definition: qmgr.h:246
void dsn_free(DSN *dsn)
Definition: dsn.c:179
void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
Definition: qmgr_queue.c:278
char * name
Definition: qmgr.h:200
void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
Definition: qmgr_queue.c:160
QMGR_ENTRY_LIST todo
Definition: qmgr.h:209
int dflags
Definition: qmgr.h:198
void qmgr_queue_unthrottle(QMGR_QUEUE *queue)
Definition: qmgr_queue.c:198
void * htable_find(HTABLE *table, const char *key)
Definition: htable.c:227
#define QMGR_QUEUE_STATUS(q)
Definition: qmgr.h:250
time_t clog_time_to_warn
Definition: qmgr.h:213
QMGR_QUEUE * qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, const char *nexthop)
Definition: qmgr_queue.c:407
int var_min_backoff_time
Definition: qmgr.c:411
const char * status
Definition: dsn.h:18
int todo_refcount
Definition: qmgr.h:202
QMGR_TRANSPORT * transport
Definition: qmgr.h:208
int hysteresis
Definition: qmgr.h:114
#define QMGR_LIST_UNLINK(head, type, object)
Definition: qmgr.h:56
QMGR_QUEUE_LIST queue_list
Definition: qmgr.h:160
#define QMGR_LOG_FEEDBACK(feedback)
Definition: qmgr_queue.c:118
Definition: dsn.h:17
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay)
Definition: events.c:894
#define QMGR_LIST_PREPEND(head, object)
Definition: qmgr.h:77
int window
Definition: qmgr.h:204
QMGR_FEEDBACK pos_feedback
Definition: qmgr.h:163
char * nexthop
Definition: qmgr.h:201
#define QMGR_QUEUE_READY(q)
Definition: qmgr.h:244
void qmgr_queue_done(QMGR_QUEUE *queue)
Definition: qmgr_queue.c:374
void htable_delete(HTABLE *table, const char *key, void(*free_fn)(void *))
Definition: htable.c:257
int event_cancel_timer(EVENT_NOTIFY_TIME_FN callback, void *context)
Definition: events.c:965
DSN * dsn
Definition: qmgr.h:212
double failure
Definition: qmgr.h:206
QMGR_QUEUE * qmgr_queue_select(QMGR_TRANSPORT *transport)
Definition: qmgr_queue.c:353
#define QMGR_QUEUE_THROTTLED(q)
Definition: qmgr.h:245
void * mymalloc(ssize_t len)
Definition: mymalloc.c:150
HTABLE_INFO * htable_enter(HTABLE *table, const char *key, void *value)
Definition: htable.c:212
void msg_info(const char *fmt,...)
Definition: msg.c:199