Postfix 3.3.1
qmgr_job.c
[Detailed Description]
/*++
/* NAME
/*    qmgr_job 3
/* SUMMARY
/*    per-transport jobs
/* SYNOPSIS
/*    #include "qmgr.h"
/*
/*    QMGR_JOB *qmgr_job_obtain(message, transport)
/*    QMGR_MESSAGE *message;
/*    QMGR_TRANSPORT *transport;
/*
/*    void qmgr_job_free(job)
/*    QMGR_JOB *job;
/*
/*    void qmgr_job_move_limits(job)
/*    QMGR_JOB *job;
/*
/*    QMGR_ENTRY *qmgr_job_entry_select(transport)
/*    QMGR_TRANSPORT *transport;
/*
/*    void qmgr_job_blocker_update(queue)
/*    QMGR_QUEUE *queue;
/* DESCRIPTION
/*    These routines add/delete/manipulate per-transport jobs.
/*    Each job corresponds to a specific transport and message.
/*    Each job has a peer list containing all pending delivery
/*    requests for that message.
/*
/*    qmgr_job_obtain() finds an existing job for the named message
/*    and transport combination. A new empty job is created if none
/*    is found. In either case, the job is prepared for the assignment
/*    of (more) message recipients.
/*
/*    qmgr_job_free() disposes of a per-transport job after all of
/*    its entries have been taken care of. It is an error to dispose
/*    of a job that is still in use.
/*
/*    qmgr_job_entry_select() attempts to find the next entry suitable
/*    for delivery. The job preempting algorithm is also exercised.
/*    If necessary, an attempt to read more recipients into core is made.
/*    This can result in the creation of more job, queue, and entry
/*    structures.
/*
/*    qmgr_job_blocker_update() updates the status of blocked
/*    jobs after a decrease in the queue's concurrency level,
/*    after the queue is throttled, or after the queue is resumed
/*    from suspension.
/*
/*    qmgr_job_move_limits() takes care of the proper distribution of
/*    the per-transport recipient limit among the per-transport jobs.
/*    It should be called whenever a job's recipient slot becomes
/*    available.
/* DIAGNOSTICS
/*    Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*    The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*    Patrik Rak
/*    patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot, but we can't use
 * message->rcpt_unread because we don't know yet whether all those unread
 * recipients go to our transport.
 */

#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)
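
/*
 * Illustrative example of the two estimates above (invented numbers): a job
 * that has read in 100 recipient entries while its message still has 900
 * recipients unread in the queue file gives MIN_ENTRIES(job) = 100 and
 * MAX_ENTRIES(job) = 1000. The true final entry count lies somewhere in
 * between, because an unknown share of the unread recipients may resolve
 * to other transports.
 */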

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)

/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (void *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int     delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
        msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we come across either the
     * current job or the first unread job on the job list. If we do, these
     * pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient, but under normal operation the loops are
     * expected to stop right away, resulting in normal list appends below.
     * However, this code is necessary for reviving retired jobs and for
     * jobs which are created long after the first chunk of recipients was
     * read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we bother
     * with child adoption at all.
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
         next = prev, prev = prev->transport_peers.prev) {
        if (prev->stack_parent == 0) {
            delay = message->queued_time - prev->message->queued_time;
            if (delay >= 0)
                break;
        }
        if (current == prev)
            current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
         next = prev, prev = prev->time_peers.prev) {
        delay = message->queued_time - prev->message->queued_time;
        if (delay >= 0)
            break;
        if (unread == prev)
            unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
        transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
        unread = transport->job_next_unread;
        transport->job_next_unread = job;
        if (unread != 0)
            qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in-core).
     */
    if (transport->rcpt_unused > 0) {
        job->rcpt_limit += transport->rcpt_unused;
        message->rcpt_limit += transport->rcpt_unused;
        transport->rcpt_unused = 0;
    }
}

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a single, usually almost empty, hash table
     * for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Otherwise create a new job for this transport/message combination.
     * In either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
        job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
        qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients.
     * Make sure the job is not marked as a blocker for the same reason.
     * Note that this can result in a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have
     * to reset it. Fortunately qmgr_job_entry_select() will easily deal
     * with this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
        job->blocker_tag = 0;
        transport->job_current = transport->job_list.next;
    }
    return (job);
}

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void    qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list, whose ordering doesn't change
     * over time.
     */
    if (job == next) {
        for (next = next->time_peers.next; next; next = next->time_peers.next)
            if (next->message->rcpt_offset != 0)
                break;
        transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
        rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's message limits are adjusted
     * accordingly. Note that the transport pool can be negative if we used
     * some of the rcpt_per_stack slots.
     */
    if (rcpt_unused > 0) {
        job->rcpt_limit -= rcpt_unused;
        message->rcpt_limit -= rcpt_unused;
        transport->rcpt_unused += rcpt_unused;
        if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
            next->rcpt_limit += rcpt_unused;
            next->message->rcpt_limit += rcpt_unused;
            transport->rcpt_unused = 0;
        }
    }
}
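
/*
 * Illustrative example of the slot transfer above (invented numbers): a job
 * with rcpt_limit 1000 and rcpt_count 400 has 600 unused slots, assuming
 * its message has at least as many unused slots of its own. Both the job's
 * and the message's limits then drop by 600 and transport->rcpt_unused
 * grows by 600. If some not-fully-read job is waiting, it immediately
 * receives the entire (possibly larger) transport pool, and the pool is
 * emptied again.
 */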

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
        QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
        if (parent != 0)
            QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
        child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
        msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
        msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
        msg_panic("%s: siblings present", myname);

    /*
     * Make sure that the children of a job on stack level zero are informed
     * that their parent is gone, too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
        || job == transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
        msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. The qmgr_entry_done() will make sure that the slots donated
     * by this job are moved back to the transport pool as soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the
     * message job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void    qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
        msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
        msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
        msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check whether the job is still
     * linked on the job lists or whether it was already retired before
     * unlinking it.
     */
    if (job->stack_level >= 0)
        qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0);
    htable_free(job->peer_byname, (void (*) (void *)) 0);
    myfree((void *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that the change of the current job this turn will render
     * the candidate cache invalid the next turn - it can happen that the
     * next turn the original current job will be selected again and the
     * cache would be considered valid in such a case.
     */
    if (job != job->transport->candidate_cache_current)
        RESET_CANDIDATE_CACHE(job->transport);
}
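
/*
 * Illustrative consequence of the two increments above: every entry
 * selected from a job adds one slot to its slots_available, while
 * preemption (see qmgr_job_preempt() below) charges slot_cost slots per
 * expected entry of the preempting job. With a hypothetical slot_cost of
 * 5, a job therefore "pays" for one entry of a smaller preempting job
 * with every five deliveries of its own.
 */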

/* qmgr_job_candidate - find the best job candidate for preempting the given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
        && (transport->candidate_cache_time == now
            || transport->candidate_cache == 0))
        return (transport->candidate_cache);

    /*
     * Estimate the minimum number of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
                 + current->slots_available) / transport->slot_cost;

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, also
     * skip jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list, we traverse it just from the
     * current job forward. This has several advantages. First, we skip some
     * of the blocker jobs and the current job itself right away. But the
     * really important advantage is that we are sure that we don't consider
     * any jobs that are already stack children of the current job. Thanks to
     * this we can easily include all encountered jobs which are leaf
     * children of some of the preempting stacks as valid candidates. All we
     * need to do is to make sure we do not include any of the stack parents.
     * And, because the leaf children are not ordered by the time since
     * queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
        for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
            if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
                continue;
            max_total_entries = MAX_ENTRIES(job);
            max_needed_entries = max_total_entries - job->selected_entries;
            delay = now - job->message->queued_time + 1;
            if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
                score = (double) delay / max_total_entries;
                if (score > best_score) {
                    best_score = score;
                    best_job = job;
                }
            }

            /*
             * Stop early if the best score is as good as it can get.
             */
            if (delay <= best_score && job->stack_level == 0)
                break;
        }
    }

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}
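
/*
 * Illustrative example of the scoring rule above (invented numbers): a
 * message queued 100 seconds ago with 10 entries in total scores
 * 101/10 = 10.1, while a message queued 1000 seconds ago with 1000
 * entries scores 1001/1000, roughly 1, so the small, moderately old job
 * wins. The early loop stop is safe because level-zero jobs further down
 * the list are younger: since every job has at least one entry, its score
 * can never exceed its delay, so once delay <= best_score no later
 * level-zero job can win.
 */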

/* qmgr_job_preempt - preempt a large message with a smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough
     * to accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
        || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
        return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
        return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
        msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
        msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
        msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check whether enough available delivery slots have accumulated to
     * preempt the current job.
     *
     * The slot loaning scheme improves the average message response time.
     * Note that the loan only allows the preemption to happen earlier,
     * though. It doesn't affect how many slots have to be "paid" - in
     * either case the full number of slots required has to be accumulated
     * later before the current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
        < expected_slots * transport->slot_loan_factor / 100.0)
        return (current);

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be the
     * top of.
     */
    if (job->stack_level > 0)
        qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same time,
     * we have to adjust the available slot count now to prevent using the
     * same slots multiple times. To do that we subtract the number of slots
     * the preempting job will supposedly use. This number will be corrected
     * later, when that job is popped from the stack, to reflect the number
     * of slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack, 0> in
     * such a case.
     */
    if (job->message->rcpt_offset != 0) {
        rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
        job->rcpt_limit += rcpt_slots;
        job->message->rcpt_limit += rcpt_slots;
        transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
        msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
                 job->message->queue_id, job->stack_level);

    return (job);
}
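
/*
 * Illustrative example of the slot-loan test above, assuming the stock
 * defaults slot_cost = 5, slot_loan = 3 and slot_loan_factor = 50: a
 * candidate that still needs 16 entries may preempt once
 *
 *     slots_available / 5 + 3 >= 16 * 50 / 100 = 8
 *
 * i.e. once at least 25 slots have accumulated, rather than the full
 * 16 * 5 = 80 that is charged against slots_available when the
 * preemption actually takes place.
 */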

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
        msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
        msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent. Note that the -= actually adds back any unused slots, as we
     * have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust the slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
    if ((parent = job->stack_parent) != 0
        && job->stack_level == parent->stack_level + 1)
        parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
        QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
        job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job, so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on
     * the job list to the caller. The most important reason for this is
     * that it allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
        transport->job_current = job->transport_peers.next;
#endif
}
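
/*
 * Illustrative example of the correction above (invented numbers): a
 * preempting job that was expected to use 16 entries had slots_used set
 * to -16 at preemption time; if it actually selected only 12 entries,
 * slots_used is now -4, and the parent's slots_available -= -4 * slot_cost
 * adds back 20 slots (with a slot_cost of 5) for the four entries that
 * were paid for but never used.
 */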

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost - see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving lots of fast-destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or when we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for a logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such a case, we can't read in
     * more recipients, as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is
     * so rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
            || (message->rcpt_limit > message->rcpt_count
                && sane_time() - message->refill_time >= job->transport->refill_delay)))
        qmgr_message_realloc(message);

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
        return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
        && message->refcount > 0
        && message->rcpt_limit > message->rcpt_count) {
        qmgr_message_realloc(message);
        if (HAS_ENTRIES(job))
            return (qmgr_peer_select(job));
    }
    return (0);
}
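
/*
 * Illustrative example of the refill test above, assuming a refill_limit
 * of 100 and a refill_delay of 5 seconds (the stock
 * default_recipient_refill_limit and default_recipient_refill_delay
 * settings): more recipients are read in as soon as 100 or more recipient
 * slots are free, or as soon as at least one slot is free and 5 seconds
 * have passed since the last refill.
 */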

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
        return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost of 1 causes the algorithm to degenerate and is therefore
     * disabled too.
     */
    if (transport->slot_cost >= 2)
        job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such a
     * message may have only a few recipients and would stay on the job list
     * until all other jobs of that message were delivered, blocking
     * precious recipient slots available to this transport. Or it can
     * happen that the job has some more entries but suddenly they all get
     * deferred. Whatever the reason, we retire such jobs below if we happen
     * to come across some.
     */
    for ( /* empty */ ; job; job = next) {
        next = job->transport_peers.next;

        /*
         * Don't bother if the job is known to have no available entries
         * because of the per-destination concurrency limits.
         */
        if (IS_BLOCKER(job, transport))
            continue;

        if ((peer = qmgr_job_peer_select(job)) != 0) {

            /*
             * We have found a suitable peer. Select one of its entries and
             * adjust the delivery slot counters.
             */
            entry = qmgr_entry_select(peer);
            qmgr_job_count_slots(job);

            /*
             * Remember the current job for the next time so we don't have
             * to crawl over all those blockers again. They will be
             * reconsidered when the concurrency limit permits.
             */
            transport->job_current = job;

            /*
             * In case we selected the very last job entry, remove the job
             * from the job lists right now.
             *
             * This action uses the assumption that once the job entry has
             * been selected, it can be unselected only before the message
             * itself is deferred. Thus a job with all entries selected
             * can't re-appear with more entries available for selection
             * again (without reading in more entries from the queue file,
             * which in turn invokes qmgr_job_obtain(), which re-links the
             * job back on the lists if necessary).
             *
             * Note that qmgr_job_move_limits() transfers the recipient
             * slots correctly even if the job is unlinked from the job
             * list, thanks to the job_next_unread caching.
             */
            if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
                qmgr_job_retire(job);

            /*
             * Finally. Hand back the fruit of our tedious effort.
             */
            return (entry);
        } else if (HAS_ENTRIES(job)) {

            /*
             * The job can't be selected due to the concurrency limits. Mark
             * it together with its queues so we know they are blocking the
             * job list and they get the appropriate treatment. In
             * particular, all blockers will be reconsidered when one of the
             * problematic queues accepts more deliveries. And the job
             * itself will be reconsidered if it is assigned some more
             * entries.
             */
            job->blocker_tag = transport->blocker_tag;
            for (peer = job->peer_list.next; peer; peer = peer->peers.next)
                if (peer->entry_list.next != 0)
                    peer->queue->blocker_tag = transport->blocker_tag;
        } else {

            /*
             * The job is "stalled". Retire it until it either gets freed or
             * gets more entries later.
             */
            qmgr_job_retire(job);
        }
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}

/* qmgr_job_blocker_update - update "blocked job" status */

void    qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check
     * whether the concurrency limit has lifted. If there are still some
     * pending deliveries, give it a try and unmark all transport blockers
     * at once. The qmgr_job_entry_select() will do the rest. In either case
     * make sure the queue is not marked as a blocker anymore, with extra
     * handling of queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate
     * cache. Most of the cases would be automatically recognized by the
     * current job change, but we play it safe and reset the cache
     * explicitly below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
        if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
            transport->blocker_tag += 2;
            transport->job_current = transport->job_list.next;
            transport->candidate_cache_current = 0;
        }
        if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
            queue->blocker_tag = 0;
    }
}
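
/*
 * Illustrative note on the odd-tag trick above: the transport tag is only
 * ever advanced by two, so it stays odd, while jobs and queues that stop
 * being blockers have their tags cleared to zero. A cleared (even) tag
 * can therefore never equal the (odd) transport tag, so stale blocker
 * marks are ignored automatically.
 */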