Postfix3.3.1
qmqp-sink.c
[詳解]
1 /*++
2 /* NAME
3 /* qmqp-sink 1
4 /* SUMMARY
5 /* parallelized QMQP test server
6 /* SYNOPSIS
7 /* .fi
8 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
9 /* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR
10 /*
11 /* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
12 /* \fBunix:\fR\fIpathname\fR \fIbacklog\fR
13 /* DESCRIPTION
14 /* \fBqmqp-sink\fR listens on the named host (or address) and port.
15 /* It receives messages from the network and throws them away.
16 /* The purpose is to measure QMQP client performance, not protocol
17 /* compliance.
18 /* Connections can be accepted on IPv4 or IPv6 endpoints, or on
19 /* UNIX-domain sockets.
20 /* IPv4 and IPv6 are the default.
21 /* This program is the complement of the \fBqmqp-source\fR(1) program.
22 /*
23 /* Note: this is an unsupported test program. No attempt is made
24 /* to maintain compatibility between successive versions.
25 /*
26 /* Arguments:
27 /* .IP \fB-4\fR
28 /* Support IPv4 only. This option has no effect when
29 /* Postfix is built without IPv6 support.
30 /* .IP \fB-6\fR
31 /* Support IPv6 only. This option is not available when
32 /* Postfix is built without IPv6 support.
33 /* .IP \fB-c\fR
34 /* Display a running counter that is updated whenever a delivery
35 /* is completed.
36 /* .IP \fB-v\fR
37 /* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP
38 /* conversation.
39 /* .IP "\fB-x \fItime\fR"
40 /* Terminate after \fItime\fR seconds. This is to facilitate memory
41 /* leak testing.
42 /* SEE ALSO
43 /* qmqp-source(1), QMQP message generator
44 /* LICENSE
45 /* .ad
46 /* .fi
47 /* The Secure Mailer license must be distributed with this software.
48 /* AUTHOR(S)
49 /* Wietse Venema
50 /* IBM T.J. Watson Research
51 /* P.O. Box 704
52 /* Yorktown Heights, NY 10598, USA
53 /*
54 /* Wietse Venema
55 /* Google, Inc.
56 /* 111 8th Avenue
57 /* New York, NY 10011, USA
58 /*--*/
59 
60 /* System library. */
61 
62 #include <sys_defs.h>
63 #include <sys/socket.h>
64 #include <sys/wait.h>
65 #include <unistd.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <fcntl.h>
69 #include <signal.h>
70 
71 /* Utility library. */
72 
73 #include <msg.h>
74 #include <vstring.h>
75 #include <vstream.h>
76 #include <listen.h>
77 #include <events.h>
78 #include <mymalloc.h>
79 #include <iostuff.h>
80 #include <msg_vstream.h>
81 #include <netstring.h>
82 #include <inet_proto.h>
83 
84 /* Global library. */
85 
86 #include <qmqp_proto.h>
87 #include <mail_version.h>
88 
89 /* Application-specific. */
90 
91 typedef struct {
92  VSTREAM *stream; /* client connection */
93  int count; /* bytes to go */
94 } SINK_STATE;
95 
96 static int var_tmout;
97 static VSTRING *buffer;
98 static void disconnect(SINK_STATE *);
99 static int count_deliveries;
100 static int counter;
101 
102 /* send_reply - finish conversation */
103 
104 static void send_reply(SINK_STATE *state)
105 {
106  vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK);
107  NETSTRING_PUT_BUF(state->stream, buffer);
108  netstring_fflush(state->stream);
109  if (count_deliveries) {
110  counter++;
111  vstream_printf("%d\r", counter);
113  }
114  disconnect(state);
115 }
116 
117 /* read_data - read over-all netstring data */
118 
119 static void read_data(int unused_event, void *context)
120 {
121  SINK_STATE *state = (SINK_STATE *) context;
122  int fd = vstream_fileno(state->stream);
123  int count;
124 
125  /*
126  * Refill the VSTREAM buffer, if necessary.
127  */
128  if (VSTREAM_GETC(state->stream) == VSTREAM_EOF)
131  state->count--;
132 
133  /*
134  * Flush the VSTREAM buffer. As documented, vstream_fseek() discards
135  * unread input.
136  */
137  if ((count = vstream_peek(state->stream)) > 0) {
138  state->count -= count;
139  if (state->count <= 0) {
140  send_reply(state);
141  return;
142  }
144  }
145 
146  /*
147  * Do not block while waiting for the arrival of more data.
148  */
150  event_enable_read(fd, read_data, context);
151 }
152 
153 /* read_length - read over-all netstring length */
154 
155 static void read_length(int event, void *context)
156 {
157  SINK_STATE *state = (SINK_STATE *) context;
158 
159  switch (vstream_setjmp(state->stream)) {
160 
161  default:
162  msg_panic("unknown error reading input");
163 
164  case NETSTRING_ERR_TIME:
165  msg_panic("attempt to read non-readable socket");
166  /* NOTREACHED */
167 
168  case NETSTRING_ERR_EOF:
169  msg_warn("lost connection");
170  disconnect(state);
171  return;
172 
174  msg_warn("netstring format error");
175  disconnect(state);
176  return;
177 
178  case NETSTRING_ERR_SIZE:
179  msg_warn("netstring size error");
180  disconnect(state);
181  return;
182 
183  /*
184  * Include the netstring terminator in the read byte count. This
185  * violates abstractions.
186  */
187  case 0:
188  state->count = netstring_get_length(state->stream) + 1;
189  read_data(event, context);
190  return;
191  }
192 }
193 
194 /* disconnect - handle disconnection events */
195 
196 static void disconnect(SINK_STATE *state)
197 {
199  vstream_fclose(state->stream);
200  myfree((void *) state);
201 }
202 
203 /* connect_event - handle connection events */
204 
205 static void connect_event(int unused_event, void *context)
206 {
207  int sock = CAST_ANY_PTR_TO_INT(context);
208  struct sockaddr_storage ss;
209  SOCKADDR_SIZE len = sizeof(ss);
210  struct sockaddr *sa = (struct sockaddr *) &ss;
211  SINK_STATE *state;
212  int fd;
213 
214  if ((fd = accept(sock, sa, &len)) >= 0) {
215  if (msg_verbose)
216  msg_info("connect (%s)",
217 #ifdef AF_LOCAL
218  sa->sa_family == AF_LOCAL ? "AF_LOCAL" :
219 #else
220  sa->sa_family == AF_UNIX ? "AF_UNIX" :
221 #endif
222  sa->sa_family == AF_INET ? "AF_INET" :
223 #ifdef AF_INET6
224  sa->sa_family == AF_INET6 ? "AF_INET6" :
225 #endif
226  "unknown protocol family");
228  state = (SINK_STATE *) mymalloc(sizeof(*state));
229  state->stream = vstream_fdopen(fd, O_RDWR);
230  vstream_tweak_sock(state->stream);
231  netstring_setup(state->stream, var_tmout);
232  event_enable_read(fd, read_length, (void *) state);
233  }
234 }
235 
236 /* terminate - voluntary exit */
237 
238 static void terminate(int unused_event, void *unused_context)
239 {
240  exit(0);
241 }
242 
243 /* usage - explain */
244 
245 static void usage(char *myname)
246 {
247  msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname);
248 }
249 
251 
252 int main(int argc, char **argv)
253 {
254  int sock;
255  int backlog;
256  int ch;
257  int ttl;
258  const char *protocols = INET_PROTO_NAME_ALL;
259 
260  /*
261  * Fingerprint executables and core dumps.
262  */
264 
265  /*
266  * Fix 20051207.
267  */
268  signal(SIGPIPE, SIG_IGN);
269 
270  /*
271  * Initialize diagnostics.
272  */
273  msg_vstream_init(argv[0], VSTREAM_ERR);
274 
275  /*
276  * Parse JCL.
277  */
278  while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) {
279  switch (ch) {
280  case '4':
281  protocols = INET_PROTO_NAME_IPV4;
282  break;
283  case '6':
284  protocols = INET_PROTO_NAME_IPV6;
285  break;
286  case 'c':
287  count_deliveries++;
288  break;
289  case 'v':
290  msg_verbose++;
291  break;
292  case 'x':
293  if ((ttl = atoi(optarg)) <= 0)
294  usage(argv[0]);
295  event_request_timer(terminate, (void *) 0, ttl);
296  break;
297  default:
298  usage(argv[0]);
299  }
300  }
301  if (argc - optind != 2)
302  usage(argv[0]);
303  if ((backlog = atoi(argv[optind + 1])) <= 0)
304  usage(argv[0]);
305 
306  /*
307  * Initialize.
308  */
309  (void) inet_proto_init("protocols", protocols);
310  buffer = vstring_alloc(1024);
311  if (strncmp(argv[optind], "unix:", 5) == 0) {
312  sock = unix_listen(argv[optind] + 5, backlog, BLOCKING);
313  } else {
314  if (strncmp(argv[optind], "inet:", 5) == 0)
315  argv[optind] += 5;
316  sock = inet_listen(argv[optind], backlog, BLOCKING);
317  }
318 
319  /*
320  * Start the event handler.
321  */
322  event_enable_read(sock, connect_event, CAST_INT_TO_VOID_PTR(sock));
323  for (;;)
324  event_loop(-1);
325 }
int msg_verbose
Definition: msg.c:177
void event_enable_read(int fd, EVENT_NOTIFY_RDWR_FN callback, void *context)
Definition: events.c:729
MAIL_VERSION_STAMP_DECLARE
Definition: qmqp-sink.c:250
VSTREAM * stream
Definition: qmqp-sink.c:92
#define VSTREAM_EOF
Definition: vstream.h:110
void myfree(void *ptr)
Definition: mymalloc.c:207
void netstring_except(VSTREAM *stream, int exception)
Definition: netstring.c:192
void netstring_fflush(VSTREAM *stream)
Definition: netstring.c:342
int inet_listen(const char *addr, int backlog, int block_mode)
Definition: inet_listen.c:82
struct SINK_STATE SINK_STATE
#define NETSTRING_ERR_FORMAT
Definition: netstring.h:25
#define CAST_ANY_PTR_TO_INT(cptr)
Definition: sys_defs.h:1298
NORETURN msg_panic(const char *fmt,...)
Definition: msg.c:295
#define VSTREAM_OUT
Definition: vstream.h:67
ssize_t netstring_get_length(VSTREAM *stream)
Definition: netstring.c:199
#define VSTREAM_GETC(vp)
Definition: vstream.h:108
INET_PROTO_INFO * inet_proto_init(const char *context, const char *protocols)
Definition: inet_proto.c:180
void netstring_setup(VSTREAM *stream, int timeout)
Definition: netstring.c:182
int count
Definition: qmqp-sink.c:93
#define INET_PROTO_NAME_ALL
Definition: mail_params.h:992
int main(int argc, char **argv)
Definition: qmqp-sink.c:252
#define vstream_setjmp(stream)
Definition: vstream.h:248
#define SOCKADDR_SIZE
Definition: sys_defs.h:1411
#define NETSTRING_ERR_SIZE
Definition: netstring.h:26
#define vstream_ftimeout(vp)
Definition: vstream.h:124
#define CAST_INT_TO_VOID_PTR(ival)
Definition: sys_defs.h:1299
int vstream_fclose(VSTREAM *stream)
Definition: vstream.c:1268
void event_loop(int delay)
Definition: events.c:998
VSTREAM * vstream_printf(const char *fmt,...)
Definition: vstream.c:1335
#define VSTREAM_PURGE_BOTH
Definition: vstream.h:90
void msg_warn(const char *fmt,...)
Definition: msg.c:215
VSTRING * vstring_alloc(ssize_t len)
Definition: vstring.c:353
#define MAIL_VERSION_STAMP_ALLOCATE
Definition: mail_version.h:67
VSTRING * vstring_sprintf(VSTRING *vp, const char *format,...)
Definition: vstring.c:602
#define INET_PROTO_NAME_IPV6
Definition: mail_params.h:991
NORETURN msg_fatal(const char *fmt,...)
Definition: msg.c:249
int vstream_fflush(VSTREAM *stream)
Definition: vstream.c:1257
#define GETOPT(argc, argv, str)
Definition: sys_defs.h:1313
#define vstream_peek(vp)
Definition: vstream.h:232
#define NETSTRING_ERR_EOF
Definition: netstring.h:23
#define NON_BLOCKING
Definition: iostuff.h:49
#define NETSTRING_ERR_TIME
Definition: netstring.h:24
int non_blocking(int, int)
Definition: non_blocking.c:55
int vstream_tweak_sock(VSTREAM *)
Definition: vstream_tweak.c:60
time_t event_request_timer(EVENT_NOTIFY_TIME_FN callback, void *context, int delay)
Definition: events.c:894
int unix_listen(const char *, int, int)
Definition: unix_listen.c:66
#define vstream_fileno(vp)
Definition: vstream.h:115
void msg_vstream_init(const char *name, VSTREAM *vp)
Definition: msg_vstream.c:77
#define QMQP_STAT_OK
Definition: qmqp_proto.h:14
void event_disable_readwrite(int fd)
Definition: events.c:839
int vstream_fpurge(VSTREAM *stream, int direction)
Definition: vstream.c:1041
#define INET_PROTO_NAME_IPV4
Definition: mail_params.h:990
#define BLOCKING
Definition: iostuff.h:48
#define VSTREAM_ERR
Definition: vstream.h:68
VSTREAM * vstream_fdopen(int fd, int flags)
Definition: vstream.c:1204
#define NETSTRING_PUT_BUF(str, buf)
Definition: netstring.h:41
void * mymalloc(ssize_t len)
Definition: mymalloc.c:150
void msg_info(const char *fmt,...)
Definition: msg.c:199