Postfix3.3.1
qmgr_entry.c
[詳解]
1 /*++
2 /* NAME
3 /* qmgr_entry 3
4 /* SUMMARY
5 /* per-site queue entries
6 /* SYNOPSIS
7 /* #include "qmgr.h"
8 /*
9 /* QMGR_ENTRY *qmgr_entry_create(queue, message)
10 /* QMGR_QUEUE *queue;
11 /* QMGR_MESSAGE *message;
12 /*
13 /* void qmgr_entry_done(entry, which)
14 /* QMGR_ENTRY *entry;
15 /* int which;
16 /*
17 /* QMGR_ENTRY *qmgr_entry_select(queue)
18 /* QMGR_QUEUE *queue;
19 /*
20 /* void qmgr_entry_unselect(queue, entry)
21 /* QMGR_QUEUE *queue;
22 /* QMGR_ENTRY *entry;
23 /*
24 /* void qmgr_entry_move_todo(dst, entry)
25 /* QMGR_QUEUE *dst;
26 /* QMGR_ENTRY *entry;
27 /* DESCRIPTION
28 /* These routines add/delete/manipulate per-site message
29 /* delivery requests.
30 /*
31 /* qmgr_entry_create() creates an entry for the named queue and
32 /* message, and appends the entry to the queue's todo list.
33 /* Filling in and cleaning up the recipients is the responsibility
34 /* of the caller.
35 /*
36 /* qmgr_entry_done() discards a per-site queue entry. The
37 /* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
38 /* of the site's `busy' list (i.e. queue entries that have been
39 /* selected for actual delivery), or QMGR_QUEUE_TODO for an entry
40 /* of the site's `todo' list (i.e. queue entries awaiting selection
41 /* for actual delivery).
42 /*
43 /* qmgr_entry_done() triggers cleanup of the per-site queue when
44 /* the site has no pending deliveries, and the site is either
45 /* alive, or the site is dead and the number of in-core queues
46 /* exceeds a configurable limit (see qmgr_queue_done()).
47 /*
48 /* qmgr_entry_done() triggers special action when the last in-core
49 /* queue entry for a message is done with: either read more
50 /* recipients from the queue file, delete the queue file, or move
51 /* the queue file to the deferred queue; send bounce reports to the
52 /* message originator (see qmgr_active_done()).
53 /*
54 /* qmgr_entry_select() selects the next entry from the named
55 /* per-site queue's `todo' list for actual delivery. The entry is
56 /* moved to the queue's `busy' list: the list of messages being
57 /* delivered.
58 /*
59 /* qmgr_entry_unselect() takes the named entry off the named
60 /* per-site queue's `busy' list and moves it to the queue's
61 /* `todo' list.
62 /*
63 /* qmgr_entry_move_todo() moves the specified "todo" queue entry
64 /* to the specified "todo" queue.
65 /* DIAGNOSTICS
66 /* Panic: interface violations, internal inconsistencies.
67 /* LICENSE
68 /* .ad
69 /* .fi
70 /* The Secure Mailer license must be distributed with this software.
71 /* AUTHOR(S)
72 /* Wietse Venema
73 /* IBM T.J. Watson Research
74 /* P.O. Box 704
75 /* Yorktown Heights, NY 10598, USA
76 /*--*/
77 
78 /* System library. */
79 
80 #include <sys_defs.h>
81 #include <stdlib.h>
82 #include <time.h>
83 
84 /* Utility library. */
85 
86 #include <msg.h>
87 #include <mymalloc.h>
88 #include <events.h>
89 #include <vstream.h>
90 
91 /* Global library. */
92 
93 #include <mail_params.h>
94 #include <deliver_request.h> /* opportunistic session caching */
95 
96 /* Application-specific. */
97 
98 #include "qmgr.h"
99 
100 /* qmgr_entry_select - select queue entry for delivery */
101 
103 {
104  const char *myname = "qmgr_entry_select";
105  QMGR_ENTRY *entry;
106 
107  if ((entry = queue->todo.prev) != 0) {
108  QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
109  queue->todo_refcount--;
110  QMGR_LIST_APPEND(queue->busy, entry);
111  queue->busy_refcount++;
112 
113  /*
114  * With opportunistic session caching, the delivery agent must not
115  * only 1) save a session upon completion, but also 2) reuse a cached
116  * session upon the next delivery request. In order to not miss out
117  * on 2), we have to make caching sticky or else we get silly
118  * behavior when the in-memory queue drains. Specifically, new
119  * connections must not be made as long as cached connections exist.
120  *
121  * Safety: don't enable opportunistic session caching unless the queue
122  * manager is able to schedule concurrent or back-to-back deliveries
123  * (we need to recognize back-to-back deliveries for transports with
124  * concurrency 1).
125  *
126  * If caching has previously been enabled, but is not now, fetch any
127  * existing entries from the cache, but don't add new ones.
128  */
129 #define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
130  (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())
131 
132 #define BACK_TO_BACK_DELIVERY() \
133  (queue->last_done + 1 >= event_time())
134 
135  /*
136  * Turn on session caching after we get up to speed. Don't enable
137  * session caching just because we have concurrent deliveries. This
138  * prevents unnecessary session caching when we have a burst of mail
139  * <= the initial concurrency limit.
140  */
141  if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
142  if (BACK_TO_BACK_DELIVERY()) {
143  if (msg_verbose)
144  msg_info("%s: allowing on-demand session caching for %s",
145  myname, queue->name);
146  queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
147  }
148  }
149 
150  /*
151  * Turn off session caching when concurrency drops and we're running
152  * out of steam. This is what prevents from turning off session
153  * caching too early, and from making new connections while old ones
154  * are still cached.
155  */
156  else {
158  if (msg_verbose)
159  msg_info("%s: disallowing on-demand session caching for %s",
160  myname, queue->name);
161  queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
162  }
163  }
164  }
165  return (entry);
166 }
167 
168 /* qmgr_entry_unselect - unselect queue entry for delivery */
169 
171 {
172  QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
173  queue->busy_refcount--;
174  QMGR_LIST_APPEND(queue->todo, entry);
175  queue->todo_refcount++;
176 }
177 
178 /* qmgr_entry_move_todo - move entry between todo queues */
179 
181 {
182  const char *myname = "qmgr_entry_move_todo";
183  QMGR_MESSAGE *message = entry->message;
184  QMGR_QUEUE *src = entry->queue;
185  QMGR_ENTRY *new_entry;
186 
187  if (entry->stream != 0)
188  msg_panic("%s: queue %s entry is busy", myname, src->name);
189  if (QMGR_QUEUE_THROTTLED(dst))
190  msg_panic("%s: destination queue %s is throttled", myname, dst->name);
192  msg_panic("%s: destination transport %s is throttled",
193  myname, dst->transport->name);
194 
195  /*
196  * Create new entry, swap the recipients between the old and new entries,
197  * then dispose of the old entry. This gives us any end-game actions that
198  * are implemented by qmgr_entry_done(), so we don't have to duplicate
199  * those actions here.
200  *
201  * XXX This does not enforce the per-entry recipient limit, but that is not
202  * a problem as long as qmgr_entry_move_todo() is called only to bounce
203  * or defer mail.
204  */
205  new_entry = qmgr_entry_create(dst, message);
206  recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
208 }
209 
210 /* qmgr_entry_done - dispose of queue entry */
211 
212 void qmgr_entry_done(QMGR_ENTRY *entry, int which)
213 {
214  const char *myname = "qmgr_entry_done";
215  QMGR_QUEUE *queue = entry->queue;
216  QMGR_MESSAGE *message = entry->message;
217  QMGR_TRANSPORT *transport = queue->transport;
218 
219  /*
220  * Take this entry off the in-core queue.
221  */
222  if (entry->stream != 0)
223  msg_panic("%s: file is open", myname);
224  if (which == QMGR_QUEUE_BUSY) {
225  QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry);
226  queue->busy_refcount--;
227  } else if (which == QMGR_QUEUE_TODO) {
228  QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry);
229  queue->todo_refcount--;
230  } else {
231  msg_panic("%s: bad queue spec: %d", myname, which);
232  }
233 
234  /*
235  * Free the recipient list and decrease the in-core recipient count
236  * accordingly.
237  */
240 
241  myfree((void *) entry);
242 
243  /*
244  * Maintain back-to-back delivery status.
245  */
246  if (which == QMGR_QUEUE_BUSY)
247  queue->last_done = event_time();
248 
249  /*
250  * Suspend a rate-limited queue, so that mail trickles out.
251  */
252  if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
253  if (queue->window > 1)
254  msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
255  myname, transport->name, queue->name, queue->window);
256  if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */
257  qmgr_queue_unthrottle(queue);
258  if (QMGR_QUEUE_READY(queue))
259  qmgr_queue_suspend(queue, transport->rate_delay);
260  }
261 
262  /*
263  * When the in-core queue for this site is empty and when this site is
264  * not dead, discard the in-core queue. When this site is dead, but the
265  * number of in-core queues exceeds some threshold, get rid of this
266  * in-core queue anyway, in order to avoid running out of memory.
267  *
268  * See also: qmgr_entry_move_todo().
269  */
270  if (queue->todo.next == 0 && queue->busy.next == 0) {
272  qmgr_queue_unthrottle(queue);
273  if (QMGR_QUEUE_READY(queue))
274  qmgr_queue_done(queue);
275  }
276 
277  /*
278  * Update the in-core message reference count. When the in-core message
279  * structure has no more references, dispose of the message.
280  *
281  * When the in-core recipient count falls below a threshold, and this
282  * message has more recipients, read more recipients now. If we read more
283  * recipients as soon as the recipient count falls below the in-core
284  * recipient limit, we do not give other messages a chance until this
285  * message is delivered. That's good for mailing list deliveries, bad for
286  * one-to-one mail. If we wait until the in-core recipient count drops
287  * well below the in-core recipient limit, we give other mail a chance,
288  * but we also allow list deliveries to become interleaved. In the worst
289  * case, people near the start of a mailing list get a burst of postings
290  * today, while people near the end of the list get that same burst of
291  * postings a whole day later.
292  */
293 #define FUDGE(x) ((x) * (var_qmgr_fudge / 100.0))
294  message->refcount--;
295  if (message->rcpt_offset > 0
297  qmgr_message_realloc(message);
298  if (message->refcount == 0)
299  qmgr_active_done(message);
300 }
301 
302 /* qmgr_entry_create - create queue todo entry */
303 
305 {
306  QMGR_ENTRY *entry;
307 
308  /*
309  * Sanity check.
310  */
311  if (QMGR_QUEUE_THROTTLED(queue))
312  msg_panic("qmgr_entry_create: dead queue: %s", queue->name);
313 
314  /*
315  * Create the delivery request.
316  */
317  entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
318  entry->stream = 0;
319  entry->message = message;
321  message->refcount++;
322  entry->queue = queue;
323  QMGR_LIST_APPEND(queue->todo, entry);
324  queue->todo_refcount++;
325 
326  /*
327  * Warn if a destination is falling behind while the active queue
328  * contains a non-trivial amount of single-recipient email. When a
329  * destination takes up more and more space in the active queue, then
330  * other mail will not get through and delivery performance will suffer.
331  *
332  * XXX At this point in the code, the busy reference count is still less
333  * than the concurrency limit (otherwise this code would not be invoked
334  * in the first place) so we have to make make some awkward adjustments
335  * below.
336  *
337  * XXX The queue length test below looks at the active queue share of an
338  * individual destination. This catches the case where mail for one
339  * destination is falling behind because it has to round-robin compete
340  * with many other destinations. However, Postfix will also perform
341  * poorly when most of the active queue is tied up by a small number of
342  * concurrency limited destinations. The queue length test below detects
343  * such conditions only indirectly.
344  *
345  * XXX This code does not detect the case that the active queue is being
346  * starved because incoming mail is pounding the disk.
347  */
349  int queue_length = queue->todo_refcount + queue->busy_refcount;
350  time_t now;
351  QMGR_TRANSPORT *transport;
352  double active_share;
353 
354  if (queue_length > var_qmgr_active_limit / 5
355  && (now = event_time()) >= queue->clog_time_to_warn) {
356  active_share = queue_length / (double) qmgr_message_count;
357  msg_warn("mail for %s is using up %d of %d active queue entries",
358  queue->nexthop, queue_length, qmgr_message_count);
359  if (active_share < 0.9)
360  msg_warn("this may slow down other mail deliveries");
361  transport = queue->transport;
362  if (transport->dest_concurrency_limit > 0
363  && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
364  msg_warn("you may need to increase the main.cf %s%s from %d",
365  transport->name, _DEST_CON_LIMIT,
366  transport->dest_concurrency_limit);
367  else if (queue->window > var_qmgr_active_limit * active_share)
368  msg_warn("you may need to increase the main.cf %s from %d",
370  else if (queue->peers.next != queue->peers.prev)
371  msg_warn("you may need a separate master.cf transport for %s",
372  queue->nexthop);
373  else {
374  msg_warn("you may need to reduce %s connect and helo timeouts",
375  transport->name);
376  msg_warn("so that Postfix quickly skips unavailable hosts");
377  msg_warn("you may need to increase the main.cf %s and %s",
379  msg_warn("so that Postfix wastes less time on undeliverable mail");
380  msg_warn("you may need to increase the master.cf %s process limit",
381  transport->name);
382  }
383  msg_warn("please avoid flushing the whole queue when you have");
384  msg_warn("lots of deferred mail, that is bad for performance");
385  msg_warn("to turn off these warnings specify: %s = 0",
388  }
389  }
390  return (entry);
391 }
int msg_verbose
Definition: msg.c:177
int dest_concurrency_limit
Definition: qmgr.h:156
QMGR_ENTRY * next
Definition: qmgr.h:193
void myfree(void *ptr)
Definition: mymalloc.c:207
QMGR_ENTRY_LIST busy
Definition: qmgr.h:210
time_t last_done
Definition: qmgr.h:199
QMGR_QUEUE * queue
Definition: qmgr.h:266
NORETURN msg_panic(const char *fmt,...)
Definition: msg.c:295
#define QMGR_QUEUE_BUSY
Definition: qmgr.h:217
int var_qmgr_clog_warn_time
Definition: qmgr.c:426
QMGR_ENTRY * prev
Definition: qmgr.h:194
#define VAR_MAX_BACKOFF_TIME
Definition: mail_params.h:749
void qmgr_queue_suspend(QMGR_QUEUE *, int)
Definition: qmgr_queue.c:160
#define QMGR_TRANSPORT_THROTTLED(t)
Definition: qmgr.h:181
#define DEL_REQ_FLAG_CONN_MASK
int var_qmgr_active_limit
Definition: qmgr.c:415
int var_qmgr_rcpt_limit
Definition: qmgr.c:416
QMGR_QUEUE * next
Definition: qmgr.h:148
#define QMGR_LIST_APPEND(head, object)
Definition: qmgr.h:66
void qmgr_queue_unthrottle(QMGR_QUEUE *)
Definition: qmgr_queue.c:198
#define QMGR_QUEUE_TODO
Definition: qmgr.h:216
#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY()
void qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry)
Definition: qmgr_entry.c:170
int busy_refcount
Definition: qmgr.h:203
QMGR_QUEUE_LIST peers
Definition: qmgr.h:211
void qmgr_entry_done(QMGR_ENTRY *entry, int which)
Definition: qmgr_entry.c:212
QMGR_MESSAGE * qmgr_message_realloc(QMGR_MESSAGE *)
#define VAR_QMGR_CLOG_WARN_TIME
Definition: mail_params.h:882
char * name
Definition: qmgr.h:200
QMGR_ENTRY_LIST todo
Definition: qmgr.h:209
void msg_warn(const char *fmt,...)
Definition: msg.c:215
void qmgr_queue_done(QMGR_QUEUE *)
Definition: qmgr_queue.c:374
int dflags
Definition: qmgr.h:198
void qmgr_active_done(QMGR_MESSAGE *)
Definition: qmgr_active.c:258
#define RCPT_LIST_INIT_QUEUE
long rcpt_offset
Definition: qmgr.h:310
QMGR_QUEUE * prev
Definition: qmgr.h:149
time_t clog_time_to_warn
Definition: qmgr.h:213
char * name
Definition: qmgr.h:155
int var_helpful_warnings
Definition: mail_params.c:231
QMGR_MESSAGE * message
Definition: qmgr.h:264
int todo_refcount
Definition: qmgr.h:202
QMGR_TRANSPORT * transport
Definition: qmgr.h:208
#define FUDGE(x)
time_t event_time(void)
Definition: events.c:647
void recipient_list_init(RECIPIENT_LIST *list, int variant)
#define QMGR_LIST_UNLINK(head, type, object)
Definition: qmgr.h:56
void recipient_list_swap(RECIPIENT_LIST *a, RECIPIENT_LIST *b)
#define VAR_QMGR_ACT_LIMIT
Definition: mail_params.h:776
int qmgr_recipient_count
Definition: qmgr_message.c:151
int qmgr_queue_count
Definition: qmgr_queue.c:112
void recipient_list_free(RECIPIENT_LIST *list)
QMGR_ENTRY * qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message)
Definition: qmgr_entry.c:304
#define DEL_REQ_FLAG_CONN_STORE
VSTREAM * stream
Definition: qmgr.h:263
RECIPIENT_LIST rcpt_list
Definition: qmgr.h:265
int window
Definition: qmgr.h:204
int rate_delay
Definition: qmgr.h:167
char * nexthop
Definition: qmgr.h:201
void qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry)
Definition: qmgr_entry.c:180
#define QMGR_QUEUE_READY(q)
Definition: qmgr.h:244
#define VAR_MIN_BACKOFF_TIME
Definition: mail_params.h:745
int qmgr_message_count
Definition: qmgr_message.c:150
QMGR_ENTRY * qmgr_entry_select(QMGR_QUEUE *queue)
Definition: qmgr_entry.c:102
#define BACK_TO_BACK_DELIVERY()
int refcount
Definition: qmgr.h:289
#define QMGR_QUEUE_THROTTLED(q)
Definition: qmgr.h:245
#define _DEST_CON_LIMIT
Definition: mail_params.h:844
void * mymalloc(ssize_t len)
Definition: mymalloc.c:150
void msg_info(const char *fmt,...)
Definition: msg.c:199