1 /*++
2 /* NAME
3 /* qmgr_entry 3
5 /* per-site queue entries
7 /* #include "qmgr.h"
8 /*
9 /* QMGR_ENTRY *qmgr_entry_create(peer, message)
10 /* QMGR_PEER *peer;
11 /* QMGR_MESSAGE *message;
12 /*
13 /* void qmgr_entry_done(entry, which)
14 /* QMGR_ENTRY *entry;
15 /* int which;
16 /*
17 /* QMGR_ENTRY *qmgr_entry_select(queue)
18 /* QMGR_QUEUE *queue;
19 /*
20 /* void qmgr_entry_unselect(queue, entry)
21 /* QMGR_QUEUE *queue;
22 /* QMGR_ENTRY *entry;
23 /*
24 /* void qmgr_entry_move_todo(dst, entry)
25 /* QMGR_QUEUE *dst;
26 /* QMGR_ENTRY *entry;
28 /* These routines add/delete/manipulate per-site message
29 /* delivery requests.
30 /*
31 /* qmgr_entry_create() creates an entry for the named peer and message,
32 /* and appends the entry to the peer's list and its queue's todo list.
33 /* Filling in and cleaning up the recipients is the responsibility
34 /* of the caller.
35 /*
36 /* qmgr_entry_done() discards a per-site queue entry. The
37 /* \fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
38 /* of the site's `busy' list (i.e. queue entries that have been
39 /* selected for actual delivery), or QMGR_QUEUE_TODO for an entry
40 /* of the site's `todo' list (i.e. queue entries awaiting selection
41 /* for actual delivery).
42 /*
43 /* qmgr_entry_done() discards its peer structure when the peer
44 /* is not referenced anymore.
45 /*
46 /* qmgr_entry_done() triggers cleanup of the per-site queue when
47 /* the site has no pending deliveries, and the site is either
48 /* alive, or the site is dead and the number of in-core queues
49 /* exceeds a configurable limit (see qmgr_queue_done()).
50 /*
51 /* qmgr_entry_done() triggers special action when the last in-core
52 /* queue entry for a message is done with: either read more
53 /* recipients from the queue file, delete the queue file, or move
54 /* the queue file to the deferred queue; send bounce reports to the
55 /* message originator (see qmgr_active_done()).
56 /*
57 /* qmgr_entry_select() selects first entry from the named
58 /* per-site queue's `todo' list for actual delivery. The entry is
59 /* moved to the queue's `busy' list: the list of messages being
60 /* delivered. The entry is also removed from its peer list.
61 /*
62 /* qmgr_entry_unselect() takes the named entry off the named
63 /* per-site queue's `busy' list and moves it to the queue's
64 /* `todo' list. The entry is also prepended to its peer list again.
65 /*
66 /* qmgr_entry_move_todo() moves the specified "todo" queue entry
67 /* to the specified "todo" queue.
69 /* Panic: interface violations, internal inconsistencies.
71 /* .ad
72 /* .fi
73 /* The Secure Mailer license must be distributed with this software.
74 /* AUTHOR(S)
75 /* Wietse Venema
76 /* IBM T.J. Watson Research
77 /* P.O. Box 704
78 /* Yorktown Heights, NY 10598, USA
79 /*
80 /* Preemptive scheduler enhancements:
81 /* Patrik Rak
82 /* Modra 6
83 /* 155 00, Prague, Czech Republic
84 /*--*/
86 /* System library. */
88 #include <sys_defs.h>
89 #include <stdlib.h>
90 #include <time.h>
92 /* Utility library. */
94 #include <msg.h>
95 #include <mymalloc.h>
96 #include <events.h>
97 #include <vstream.h>
99 /* Global library. */
101 #include <mail_params.h>
102 #include <deliver_request.h> /* opportunistic session caching */
104 /* Application-specific. */
106 #include "qmgr.h"
108 /* qmgr_entry_select - select queue entry for delivery */
111 {
112  const char *myname = "qmgr_entry_select";
113  QMGR_ENTRY *entry;
114  QMGR_QUEUE *queue;
116  if ((entry = peer-> != 0) {
117  queue = entry->queue;
118  QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
119  queue->todo_refcount--;
120  QMGR_LIST_APPEND(queue->busy, entry, queue_peers);
121  queue->busy_refcount++;
122  QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
123  peer->job->selected_entries++;
125  /*
126  * With opportunistic session caching, the delivery agent must not
127  * only 1) save a session upon completion, but also 2) reuse a cached
128  * session upon the next delivery request. In order to not miss out
129  * on 2), we have to make caching sticky or else we get silly
130  * behavior when the in-memory queue drains. Specifically, new
131  * connections must not be made as long as cached connections exist.
132  *
133  * Safety: don't enable opportunistic session caching unless the queue
134  * manager is able to schedule concurrent or back-to-back deliveries
135  * (we need to recognize back-to-back deliveries for transports with
136  * concurrency 1).
137  *
138  * If caching has previously been enabled, but is not now, fetch any
139  * existing entries from the cache, but don't add new ones.
140  */
142  (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())
144 #define BACK_TO_BACK_DELIVERY() \
145  (queue->last_done + 1 >= event_time())
147  /*
148  * Turn on session caching after we get up to speed. Don't enable
149  * session caching just because we have concurrent deliveries. This
150  * prevents unnecessary session caching when we have a burst of mail
151  * <= the initial concurrency limit.
152  */
153  if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
155  if (msg_verbose)
156  msg_info("%s: allowing on-demand session caching for %s",
157  myname, queue->name);
158  queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
159  }
160  }
162  /*
163  * Turn off session caching when concurrency drops and we're running
164  * out of steam. This is what prevents from turning off session
165  * caching too early, and from making new connections while old ones
166  * are still cached.
167  */
168  else {
170  if (msg_verbose)
171  msg_info("%s: disallowing on-demand session caching for %s",
172  myname, queue->name);
173  queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
174  }
175  }
176  }
177  return (entry);
178 }
180 /* qmgr_entry_unselect - unselect queue entry for delivery */
183 {
184  QMGR_PEER *peer = entry->peer;
185  QMGR_QUEUE *queue = entry->queue;
187  /*
188  * Move the entry back to the todo lists. In case of the peer list, put
189  * it back to the beginning, so the select()/unselect() does not reorder
190  * entries. We use this in qmgr_message_assign() to put recipients into
191  * existing entries when possible.
192  */
193  QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
194  queue->busy_refcount--;
195  QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
196  queue->todo_refcount++;
197  QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers);
198  peer->job->selected_entries--;
199 }
201 /* qmgr_entry_move_todo - move entry between todo queues */
204 {
205  const char *myname = "qmgr_entry_move_todo";
206  QMGR_TRANSPORT *dst_transport = dst_queue->transport;
207  QMGR_MESSAGE *message = entry->message;
208  QMGR_QUEUE *src_queue = entry->queue;
209  QMGR_PEER *dst_peer, *src_peer = entry->peer;
210  QMGR_JOB *dst_job, *src_job = src_peer->job;
211  QMGR_ENTRY *new_entry;
212  int rcpt_count = entry->rcpt_list.len;
214  if (entry->stream != 0)
215  msg_panic("%s: queue %s entry is busy", myname, src_queue->name);
216  if (QMGR_QUEUE_THROTTLED(dst_queue))
217  msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name);
218  if (QMGR_TRANSPORT_THROTTLED(dst_transport))
219  msg_panic("%s: destination transport %s is throttled",
220  myname, dst_transport->name);
222  /*
223  * Create new entry, swap the recipients between the two entries,
224  * adjusting the job counters accordingly, then dispose of the old entry.
225  *
226  * Note that qmgr_entry_done() will also take care of adjusting the
227  * recipient limits of all the message jobs, so we do not have to do that
228  * explicitly for the new job here.
229  *
230  * XXX This does not enforce the per-entry recipient limit, but that is not
231  * a problem as long as qmgr_entry_move_todo() is called only to bounce
232  * or defer mail.
233  */
234  dst_job = qmgr_job_obtain(message, dst_transport);
235  dst_peer = qmgr_peer_obtain(dst_job, dst_queue);
237  new_entry = qmgr_entry_create(dst_peer, message);
239  recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
241  src_job->rcpt_count -= rcpt_count;
242  dst_job->rcpt_count += rcpt_count;
245 }
247 /* qmgr_entry_done - dispose of queue entry */
249 void qmgr_entry_done(QMGR_ENTRY *entry, int which)
250 {
251  const char *myname = "qmgr_entry_done";
252  QMGR_QUEUE *queue = entry->queue;
253  QMGR_MESSAGE *message = entry->message;
254  QMGR_PEER *peer = entry->peer;
255  QMGR_JOB *sponsor, *job = peer->job;
256  QMGR_TRANSPORT *transport = job->transport;
258  /*
259  * Take this entry off the in-core queue.
260  */
261  if (entry->stream != 0)
262  msg_panic("%s: file is open", myname);
263  if (which == QMGR_QUEUE_BUSY) {
264  QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
265  queue->busy_refcount--;
266  } else if (which == QMGR_QUEUE_TODO) {
267  QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
268  job->selected_entries++;
269  QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
270  queue->todo_refcount--;
271  } else {
272  msg_panic("%s: bad queue spec: %d", myname, which);
273  }
275  /*
276  * Decrease the in-core recipient counts and free the recipient list and
277  * the structure itself.
278  */
279  job->rcpt_count -= entry->rcpt_list.len;
280  message->rcpt_count -= entry->rcpt_list.len;
283  myfree((void *) entry);
285  /*
286  * Make sure that the transport of any retired or finishing job that
287  * donated recipient slots to this message gets them back first. Then, if
288  * possible, pass the remaining unused recipient slots to the next job on
289  * the job list.
290  */
291  for (sponsor = message->; sponsor; sponsor = sponsor-> {
292  if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job)
293  continue;
294  if (sponsor->stack_level < 0 || message->rcpt_offset == 0)
295  qmgr_job_move_limits(sponsor);
296  }
297  if (message->rcpt_offset == 0) {
299  }
301  /*
302  * We implement a rate-limited queue by emulating a slow delivery
303  * channel. We insert the artificial delays with qmgr_queue_suspend().
304  *
305  * When a queue is suspended, we must postpone any job scheduling decisions
306  * until the queue is resumed. Otherwise, we make those decisions now.
307  * The job scheduling decisions are made by qmgr_job_blocker_update().
308  */
309  if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
310  if (queue->window > 1)
311  msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
312  myname, transport->name, queue->name, queue->window);
313  if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */
314  qmgr_queue_unthrottle(queue);
315  if (QMGR_QUEUE_READY(queue))
316  qmgr_queue_suspend(queue, transport->rate_delay);
317  }
318  if (!QMGR_QUEUE_SUSPENDED(queue)
319  && queue->blocker_tag == transport->blocker_tag)
322  /*
323  * When there are no more entries for this peer, discard the peer
324  * structure.
325  */
326  peer->refcount--;
327  if (peer->refcount == 0)
328  qmgr_peer_free(peer);
330  /*
331  * Maintain back-to-back delivery status.
332  */
333  if (which == QMGR_QUEUE_BUSY)
334  queue->last_done = event_time();
336  /*
337  * When the in-core queue for this site is empty and when this site is
338  * not dead or suspended, discard the in-core queue. When this site is
339  * dead, but the number of in-core queues exceeds some threshold, get rid
340  * of this in-core queue anyway, in order to avoid running out of memory.
341  */
342  if (queue-> == 0 && queue-> == 0) {
344  qmgr_queue_unthrottle(queue);
345  if (QMGR_QUEUE_READY(queue))
346  qmgr_queue_done(queue);
347  }
349  /*
350  * Update the in-core message reference count. When the in-core message
351  * structure has no more references, dispose of the message.
352  */
353  message->refcount--;
354  if (message->refcount == 0)
355  qmgr_active_done(message);
356 }
358 /* qmgr_entry_create - create queue todo entry */
361 {
362  QMGR_ENTRY *entry;
363  QMGR_QUEUE *queue = peer->queue;
365  /*
366  * Sanity check.
367  */
368  if (QMGR_QUEUE_THROTTLED(queue))
369  msg_panic("qmgr_entry_create: dead queue: %s", queue->name);
371  /*
372  * Create the delivery request.
373  */
374  entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
375  entry->stream = 0;
376  entry->message = message;
378  message->refcount++;
379  entry->peer = peer;
380  QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);
381  peer->refcount++;
382  entry->queue = queue;
383  QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
384  queue->todo_refcount++;
385  peer->job->read_entries++;
387  /*
388  * Warn if a destination is falling behind while the active queue
389  * contains a non-trivial amount of single-recipient email. When a
390  * destination takes up more and more space in the active queue, then
391  * other mail will not get through and delivery performance will suffer.
392  *
393  * XXX At this point in the code, the busy reference count is still less
394  * than the concurrency limit (otherwise this code would not be invoked
395  * in the first place) so we have to make make some awkward adjustments
396  * below.
397  *
398  * XXX The queue length test below looks at the active queue share of an
399  * individual destination. This catches the case where mail for one
400  * destination is falling behind because it has to round-robin compete
401  * with many other destinations. However, Postfix will also perform
402  * poorly when most of the active queue is tied up by a small number of
403  * concurrency limited destinations. The queue length test below detects
404  * such conditions only indirectly.
405  *
406  * XXX This code does not detect the case that the active queue is being
407  * starved because incoming mail is pounding the disk.
408  */
410  int queue_length = queue->todo_refcount + queue->busy_refcount;
411  time_t now;
412  QMGR_TRANSPORT *transport;
413  double active_share;
415  if (queue_length > var_qmgr_active_limit / 5
416  && (now = event_time()) >= queue->clog_time_to_warn) {
417  active_share = queue_length / (double) qmgr_message_count;
418  msg_warn("mail for %s is using up %d of %d active queue entries",
419  queue->nexthop, queue_length, qmgr_message_count);
420  if (active_share < 0.9)
421  msg_warn("this may slow down other mail deliveries");
422  transport = queue->transport;
423  if (transport->dest_concurrency_limit > 0
424  && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
425  msg_warn("you may need to increase the %s%s from %d",
426  transport->name, _DEST_CON_LIMIT,
427  transport->dest_concurrency_limit);
428  else if (queue->window > var_qmgr_active_limit * active_share)
429  msg_warn("you may need to increase the %s from %d",
431  else if (queue-> != queue->peers.prev)
432  msg_warn("you may need a separate transport for %s",
433  queue->nexthop);
434  else {
435  msg_warn("you may need to reduce %s connect and helo timeouts",
436  transport->name);
437  msg_warn("so that Postfix quickly skips unavailable hosts");
438  msg_warn("you may need to increase the %s and %s",
440  msg_warn("so that Postfix wastes less time on undeliverable mail");
441  msg_warn("you may need to increase the %s process limit",
442  transport->name);
443  }
444  msg_warn("please avoid flushing the whole queue when you have");
445  msg_warn("lots of deferred mail, that is bad for performance");
446  msg_warn("to turn off these warnings specify: %s = 0",
449  }
450  }
451  return (entry);
452 }
int msg_verbose
Definition: msg.c:177
int dest_concurrency_limit
Definition: qmgr.h:156
Definition: qmgr.h:193
void myfree(void *ptr)
Definition: mymalloc.c:207
void qmgr_peer_free(QMGR_PEER *)
Definition: qmgr_peer.c:94
Definition: qmgr.h:210
int stack_level
Definition: qmgr.h:417
time_t last_done
Definition: qmgr.h:199
QMGR_JOB_LIST message_peers
Definition: qmgr.h:411
QMGR_QUEUE * queue
Definition: qmgr.h:266
int selected_entries
Definition: qmgr.h:426
NORETURN msg_panic(const char *fmt,...)
Definition: msg.c:295
Definition: qmgr.h:217
int blocker_tag
Definition: qmgr.h:253
int var_qmgr_clog_warn_time
Definition: qmgr.c:426
Definition: mail_params.h:749
void qmgr_queue_suspend(QMGR_QUEUE *, int)
Definition: qmgr_queue.c:160
int read_entries
Definition: qmgr.h:428
Definition: qmgr.h:181
int var_qmgr_active_limit
Definition: qmgr.c:415
int var_qmgr_rcpt_limit
Definition: qmgr.c:416
Definition: qmgr.h:148
#define QMGR_LIST_APPEND(head, object)
Definition: qmgr.h:66
void qmgr_queue_unthrottle(QMGR_QUEUE *)
Definition: qmgr_queue.c:198
QMGR_JOB_LIST job_list
Definition: qmgr.h:370
Definition: qmgr.h:216
void qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry)
Definition: qmgr_entry.c:170
int busy_refcount
Definition: qmgr.h:203
Definition: qmgr.h:211
Definition: qmgr.h:246
Definition: qmgr_job.c:246
void qmgr_entry_done(QMGR_ENTRY *entry, int which)
Definition: qmgr_entry.c:212
Definition: mail_params.h:882
char * name
Definition: qmgr.h:200
QMGR_PEER * qmgr_peer_obtain(QMGR_JOB *, QMGR_QUEUE *)
Definition: qmgr_peer.c:122
int refcount
Definition: qmgr.h:436
Definition: qmgr.h:209
int rcpt_count
Definition: qmgr.h:429
void msg_warn(const char *fmt,...)
Definition: msg.c:215
void qmgr_queue_done(QMGR_QUEUE *)
Definition: qmgr_queue.c:374
void qmgr_job_blocker_update(QMGR_QUEUE *)
Definition: qmgr_job.c:950
int dflags
Definition: qmgr.h:198
void qmgr_active_done(QMGR_MESSAGE *)
Definition: qmgr_active.c:258
long rcpt_offset
Definition: qmgr.h:310
Definition: qmgr.h:149
time_t clog_time_to_warn
Definition: qmgr.h:213
QMGR_TRANSPORT * transport
Definition: qmgr.h:410
char * name
Definition: qmgr.h:155
int rcpt_count
Definition: qmgr.h:367
int var_helpful_warnings
Definition: mail_params.c:231
int rcpt_limit
Definition: qmgr.h:430
Definition: qmgr.h:408
QMGR_MESSAGE * message
Definition: qmgr.h:264
int todo_refcount
Definition: qmgr.h:202
QMGR_TRANSPORT * transport
Definition: qmgr.h:208
time_t event_time(void)
Definition: events.c:647
void recipient_list_init(RECIPIENT_LIST *list, int variant)
#define QMGR_LIST_UNLINK(head, type, object)
Definition: qmgr.h:56
void recipient_list_swap(RECIPIENT_LIST *a, RECIPIENT_LIST *b)
Definition: mail_params.h:776
int qmgr_recipient_count
Definition: qmgr_message.c:151
QMGR_QUEUE * queue
Definition: qmgr.h:435
int qmgr_queue_count
Definition: qmgr_queue.c:112
QMGR_JOB * job
Definition: qmgr.h:434
void recipient_list_free(RECIPIENT_LIST *list)
QMGR_PEER * peer
Definition: qmgr.h:306
QMGR_ENTRY * qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message)
Definition: qmgr_entry.c:304
VSTREAM * stream
Definition: qmgr.h:263
Definition: qmgr.h:265
#define QMGR_LIST_PREPEND(head, object)
Definition: qmgr.h:77
QMGR_JOB * next
Definition: qmgr.h:165
int window
Definition: qmgr.h:204
int rate_delay
Definition: qmgr.h:167
char * nexthop
Definition: qmgr.h:201
void qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry)
Definition: qmgr_entry.c:180
Definition: qmgr.h:244
Definition: mail_params.h:745
QMGR_ENTRY_LIST entry_list
Definition: qmgr.h:437
int blocker_tag
Definition: qmgr.h:201
int qmgr_message_count
Definition: qmgr_message.c:150
void qmgr_job_move_limits(QMGR_JOB *)
Definition: qmgr_job.c:278
QMGR_ENTRY * qmgr_entry_select(QMGR_QUEUE *queue)
Definition: qmgr_entry.c:102
int refcount
Definition: qmgr.h:289
Definition: qmgr.h:245
Definition: mail_params.h:844
void * mymalloc(ssize_t len)
Definition: mymalloc.c:150
void msg_info(const char *fmt,...)
Definition: msg.c:199