485 static int vstream_buf_get_ready(
VBUF *);
486 static int vstream_buf_put_ready(
VBUF *);
487 static int vstream_buf_space(
VBUF *, ssize_t);
500 vstream_buf_get_ready, vstream_buf_put_ready, vstream_buf_space,
506 vstream_buf_get_ready, vstream_buf_put_ready, vstream_buf_space,
512 vstream_buf_get_ready, vstream_buf_put_ready, vstream_buf_space,
/*
 * VSTREAM_STATIC(v) - true when v lies in the closed range
 * VSTREAM_IN .. VSTREAM_ERR. The argument is evaluated twice.
 */
#define VSTREAM_STATIC(v) ((v) >= VSTREAM_IN && (v) <= VSTREAM_ERR)

/*
 * Access-mode helpers for open(2)-style flag words.
 *
 * VSTREAM_ACC_MASK(f) keeps only the O_APPEND, O_WRONLY and O_RDWR bits.
 *
 * VSTREAM_CAN_READ(f) is true when the masked value is exactly O_RDONLY or
 * exactly O_RDWR. The argument is evaluated twice.
 *
 * VSTREAM_CAN_WRITE(f) is true when any of O_WRONLY, O_RDWR or O_APPEND is
 * set. Because VSTREAM_ACC_MASK() already isolates exactly those three
 * bits, a single non-zero test is equivalent to testing each bit in turn,
 * and it evaluates the argument only once.
 */
#define VSTREAM_ACC_MASK(f) ((f) & (O_APPEND | O_WRONLY | O_RDWR))

#define VSTREAM_CAN_READ(f) (VSTREAM_ACC_MASK(f) == O_RDONLY \
				|| VSTREAM_ACC_MASK(f) == O_RDWR)
#define VSTREAM_CAN_WRITE(f) (VSTREAM_ACC_MASK(f) != 0)

/*
 * VSTREAM_BUF_COUNT(bp, n) - yields -(n) when the buffer has
 * VSTREAM_FLAG_READ set in its flags, and (n) otherwise.
 */
#define VSTREAM_BUF_COUNT(bp, n) \
	((bp)->flags & VSTREAM_FLAG_READ ? -(n) : (n))
534 #define VSTREAM_BUF_AT_START(bp) { \
535 (bp)->cnt = VSTREAM_BUF_COUNT((bp), (bp)->len); \
536 (bp)->ptr = (bp)->data; \
539 #define VSTREAM_BUF_AT_OFFSET(bp, offset) { \
540 (bp)->ptr = (bp)->data + (offset); \
541 (bp)->cnt = VSTREAM_BUF_COUNT(bp, (bp)->len - (offset)); \
544 #define VSTREAM_BUF_AT_END(bp) { \
546 (bp)->ptr = (bp)->data + (bp)->len; \
549 #define VSTREAM_BUF_ZERO(bp) { \
551 (bp)->data = (bp)->ptr = 0; \
552 (bp)->len = (bp)->cnt = 0; \
555 #define VSTREAM_BUF_ACTIONS(bp, get_action, put_action, space_action) { \
556 (bp)->get_ready = (get_action); \
557 (bp)->put_ready = (put_action); \
558 (bp)->space = (space_action); \
561 #define VSTREAM_SAVE_STATE(stream, buffer, filedes) { \
562 stream->buffer = stream->buf; \
563 stream->filedes = stream->fd; \
566 #define VSTREAM_RESTORE_STATE(stream, buffer, filedes) do { \
567 stream->buffer.flags = stream->buf.flags; \
568 stream->buf = stream->buffer; \
569 stream->fd = stream->filedes; \
572 #define VSTREAM_FORK_STATE(stream, buffer, filedes) { \
573 stream->buffer = stream->buf; \
574 stream->filedes = stream->fd; \
575 stream->buffer.data = stream->buffer.ptr = 0; \
576 stream->buffer.len = stream->buffer.cnt = 0; \
577 stream->buffer.flags &= ~VSTREAM_FLAG_FIXED; \
/*
 * VSTREAM_FLAG_DOUBLE combined with the READ or the WRITE direction flag.
 */
#define VSTREAM_FLAG_READ_DOUBLE (VSTREAM_FLAG_DOUBLE | VSTREAM_FLAG_READ)
#define VSTREAM_FLAG_WRITE_DOUBLE (VSTREAM_FLAG_DOUBLE | VSTREAM_FLAG_WRITE)

/*
 * VSTREAM_FFLUSH_SOME(stream) - call vstream_fflush_some() with a byte
 * count of buf.len - buf.cnt. The stream argument is evaluated three times.
 */
#define VSTREAM_FFLUSH_SOME(stream) \
	vstream_fflush_some((stream), (stream)->buf.len - (stream)->buf.cnt)
587 #define VSTREAM_SUB_TIME(x, y, z) \
589 (x).tv_sec = (y).tv_sec - (z).tv_sec; \
590 (x).tv_usec = (y).tv_usec - (z).tv_usec; \
591 while ((x).tv_usec < 0) { \
592 (x).tv_usec += 1000000; \
595 while ((x).tv_usec >= 1000000) { \
596 (x).tv_usec -= 1000000; \
603 static void vstream_buf_init(
VBUF *bp,
int flags)
619 static void vstream_buf_alloc(
VBUF *bp, ssize_t len)
622 ssize_t used = bp->
ptr - bp->
data;
623 const char *myname =
"vstream_buf_alloc";
626 msg_panic(
"%s: attempt to shrink buffer", myname);
628 msg_panic(
"%s: unable to extend fixed-size buffer", myname);
634 bp->
data = (
unsigned char *)
650 static void vstream_buf_wipe(
VBUF *bp)
660 static int vstream_fflush_some(
VSTREAM *stream, ssize_t to_flush)
662 const char *myname =
"vstream_fflush_some";
670 struct timeval before;
671 struct timeval elapsed;
680 msg_panic(
"%s: read-only stream", myname);
690 msg_panic(
"%s: read/write stream", myname);
693 left_over = used - to_flush;
696 msg_info(
"%s: fd %d flush %ld", myname, stream->
fd, (
long) to_flush);
697 if (to_flush < 0 || left_over < 0)
698 msg_panic(
"%s: bad to_flush %ld", myname, (
long) to_flush);
699 if (to_flush < left_over)
700 msg_panic(
"%s: to_flush < left_over", myname);
704 return (VSTREAM_EOF);
718 for (data = (
void *) bp->
data, len = to_flush; len > 0; len -= n, data += n) {
724 return (VSTREAM_EOF);
727 GETTIMEOFDAY(&before);
732 if ((n = stream->
write_fn(stream->
fd, data, len, timeout, stream->
context)) <= 0) {
734 if (errno == ETIMEDOUT) {
738 return (VSTREAM_EOF);
741 GETTIMEOFDAY(&stream->
iotime);
748 msg_info(
"%s: %d flushed %ld/%ld", myname, stream->
fd,
749 (
long) n, (
long) to_flush);
752 stream->
offset += to_flush;
761 memcpy(bp->
data, bp->
data + to_flush, left_over);
769 static int vstream_fflush_delayed(
VSTREAM *stream)
777 msg_panic(
"vstream_fflush_delayed: bad flags");
802 static int vstream_buf_get_ready(
VBUF *bp)
805 const char *myname =
"vstream_buf_get_ready";
807 struct timeval before;
808 struct timeval elapsed;
822 return (VSTREAM_EOF);
838 msg_panic(
"%s: read/write stream", myname);
861 if (vstream_fflush_delayed(stream))
862 return (VSTREAM_EOF);
868 return (VSTREAM_EOF);
888 return (VSTREAM_EOF);
890 GETTIMEOFDAY(&before);
896 if (errno == ETIMEDOUT) {
900 return (VSTREAM_EOF);
903 return (VSTREAM_EOF);
906 GETTIMEOFDAY(&stream->
iotime);
913 msg_info(
"%s: fd %d got %ld", myname, stream->
fd, (
long) n);
924 static int vstream_buf_put_ready(
VBUF *bp)
927 const char *myname =
"vstream_buf_put_ready";
949 msg_panic(
"%s: read/write stream", myname);
964 }
else if (bp->
cnt <= 0) {
966 return (VSTREAM_EOF);
973 static int vstream_buf_space(
VBUF *bp, ssize_t want)
979 const char *myname =
"vstream_buf_space";
987 msg_panic(
"%s: read-only stream", myname);
989 msg_panic(
"%s: bad length %ld", myname, (
long) want);
1006 msg_panic(
"%s: read/write stream", myname);
/*
 * For non-negative count and positive base (integer arithmetic):
 * VSTREAM_TRUNCATE() rounds count down to a multiple of base, and
 * VSTREAM_ROUNDUP() rounds it up. Both arguments are fully parenthesized
 * so that expressions such as "1 << 3" or "a ? b : c" can be passed
 * without being re-associated by the surrounding arithmetic.
 */
#define VSTREAM_TRUNCATE(count, base)	(((count) / (base)) * (base))
#define VSTREAM_ROUNDUP(count, base)	VSTREAM_TRUNCATE((count) + (base) - 1, (base))
1022 if (want > bp->
cnt) {
1025 return (VSTREAM_EOF);
1026 if ((shortage = (want - bp->
cnt)) > 0) {
1032 vstream_buf_alloc(bp, bp->
len + incr);
1043 const char *myname =
"vstream_fpurge";
/*
 * Conditionally reposition a buffer, depending on the direction bits in d:
 * VSTREAM_MAYBE_PURGE_WRITE() applies VSTREAM_BUF_AT_START() when
 * VSTREAM_PURGE_WRITE is set, VSTREAM_MAYBE_PURGE_READ() applies
 * VSTREAM_BUF_AT_END() when VSTREAM_PURGE_READ is set.
 *
 * Each macro is wrapped in do { } while (0) so that it expands to exactly
 * one statement; a bare "if" would mis-bind or fail to compile when the
 * macro is used as the body of an unbraced if/else. Invocations must be
 * terminated with a semicolon.
 */
#define VSTREAM_MAYBE_PURGE_WRITE(d, b) do { \
	if ((d) & VSTREAM_PURGE_WRITE) \
	    VSTREAM_BUF_AT_START((b)) \
    } while (0)
#define VSTREAM_MAYBE_PURGE_READ(d, b) do { \
	if ((d) & VSTREAM_PURGE_READ) \
	    VSTREAM_BUF_AT_END((b)) \
    } while (0)
1079 msg_panic(
"%s: read/write stream", myname);
1095 const char *myname =
"vstream_fseek";
1106 if (whence == SEEK_CUR)
1107 offset += (bp->
ptr - bp->
data);
1108 else if (whence == SEEK_END)
1116 if (whence == SEEK_CUR)
1118 else if (whence == SEEK_END)
1125 msg_panic(
"%s: read/write stream", myname);
1145 if ((stream->
offset = lseek(stream->
fd, offset, whence)) < 0) {
1146 if (errno == ESPIPE)
1174 if ((stream->
offset = lseek(stream->
fd, (off_t) 0, SEEK_CUR)) < 0) {
1212 msg_panic(
"vstream_fdopen: bad file %d", fd);
1225 vstream_buf_init(&stream->
buf, flags);
1246 if ((fd = open(path, flags, mode)) < 0) {
1262 vstream_fflush_delayed(stream);
1278 if (stream->
pid != 0)
1279 msg_panic(
"vstream_fclose: stream has process");
1286 err |= close(stream->
read_fd);
1290 vstream_buf_wipe(&stream->
read_buf);
1294 if (stream->
fd >= 0)
1295 err |= close(stream->
fd);
1296 vstream_buf_wipe(&stream->
buf);
1304 return (err ? VSTREAM_EOF : 0);
1364 while ((ch = *str++) != 0)
1366 return (VSTREAM_EOF);
1374 const char *myname =
"vstream_control";
1378 ssize_t req_bufsize = 0;
/*
 * SWAP(type, a, b) - exchange two lvalues of the given type. The scratch
 * variable has a deliberately unusual name so that it cannot capture a
 * caller argument that happens to be called "temp".
 */
#define SWAP(type,a,b) do { \
	type swap_scratch_ = (a); \
	(a) = (b); \
	(b) = swap_scratch_; \
    } while (0)
1383 for (va_start(ap, name); name !=
VSTREAM_CTL_END; name = va_arg(ap,
int)) {
1392 stream->
context = va_arg(ap,
void *);
1413 msg_panic(
"VSTREAM_CTL_READ_FD requires double buffering");
1414 stream->
read_fd = va_arg(ap,
int);
1419 msg_panic(
"VSTREAM_CTL_WRITE_FD requires double buffering");
1420 stream->
write_fd = va_arg(ap,
int);
1424 stream2 = va_arg(ap,
VSTREAM *);
1427 msg_panic(
"VSTREAM_CTL_SWAP_FD can't swap descriptors between "
1428 "single-buffered and double-buffered streams");
1435 SWAP(
int, stream->
fd, stream2->
fd);
1440 GETTIMEOFDAY(&stream->
iotime);
1441 stream->
timeout = va_arg(ap,
int);
1446 if (stream->
jbuf == 0)
1451 #ifdef VSTREAM_CTL_DUPFD
1453 #define VSTREAM_TRY_DUPFD(backup, fd, floor) do { \
1454 if (((backup) = (fd)) < floor) { \
1455 if (((fd) = fcntl((backup), F_DUPFD, (floor))) < 0) \
1456 msg_fatal("fcntl F_DUPFD %d: %m", (floor)); \
1457 (void) close(backup); \
1461 case VSTREAM_CTL_DUPFD:
1462 floor = va_arg(ap,
int);
1464 VSTREAM_TRY_DUPFD(old_fd, stream->
read_fd, floor);
1468 VSTREAM_TRY_DUPFD(old_fd, stream->
write_fd, floor);
1472 VSTREAM_TRY_DUPFD(old_fd, stream->
fd, floor);
1481 req_bufsize = va_arg(ap, ssize_t);
1483 if (req_bufsize < 0 || req_bufsize > INT_MAX)
1484 msg_panic(
"unreasonable VSTREAM_CTL_BUFSIZE request: %ld",
1485 (
long) req_bufsize);
1489 msg_info(
"fd=%d: stream buffer size old=%ld new=%ld",
1492 (
long) req_bufsize);
1513 msg_panic(
"%s: bad name %d", myname, name);
1552 switch (command & ~VSTREAM_BST_MASK_DIR) {
1554 return (bp ? -bp->
cnt : 0);
1566 switch (command & ~VSTREAM_BST_MASK_DIR) {
1568 return (bp ? bp->
len - bp->
cnt : 0);
1573 msg_panic(
"vstream_bufstat: unknown command: %d", command);
1596 return ((
const char *) vp->
buf.
ptr);
1606 static void copy_line(ssize_t bufsize)
1620 static void printf_number(
void)
1629 int main(
int argc,
char **argv)
#define VSTREAM_FLAG_RD_TIMEOUT
#define VSTREAM_CTL_STOP_DEADLINE
struct timeval time_limit
#define VSTREAM_CTL_START_DEADLINE
#define VSTREAM_FLAG_WRITE
char * mystrdup(const char *str)
#define VSTREAM_FLAG_WR_ERR
NORETURN msg_panic(const char *fmt,...)
off_t vstream_ftell(VSTREAM *stream)
const char * vstream_peek_data(VSTREAM *vp)
void * myrealloc(void *ptr, ssize_t len)
VSTREAM * vstream_vfprintf(VSTREAM *vp, const char *format, va_list ap)
int main(int argc, char **argv)
#define VSTREAM_CTL_CONTEXT
#define VSTREAM_CAN_READ(f)
#define VSTREAM_CTL_READ_FD
#define VSTREAM_SUB_TIME(x, y, z)
#define VSTREAM_CTL_SWAP_FD
#define VSTREAM_BUF_AT_START(bp)
ssize_t vstream_peek(VSTREAM *vp)
#define VSTREAM_FLAG_WRITE_DOUBLE
VSTREAM * vstream_fopen(const char *path, int flags, mode_t mode)
#define VSTREAM_FLAG_DOUBLE
#define VSTREAM_FLAG_RD_ERR
#define VSTREAM_BUF_ACTIONS(bp, get_action, put_action, space_action)
#define VSTREAM_BST_MASK_DIR
#define CA_VSTREAM_CTL_BUFSIZE(v)
VSTREAM * vstream_fprintf(VSTREAM *stream, const char *fmt,...)
#define VSTREAM_CTL_BUFSIZE
#define VBUF_TO_APPL(vbuf_ptr, app_type, vbuf_member)
#define VSTREAM_CTL_READ_FN
#define VSTREAM_FFLUSH_SOME(stream)
int vstream_fclose(VSTREAM *stream)
#define VSTREAM_FLAG_READ_DOUBLE
VSTREAM * vstream_printf(const char *fmt,...)
#define VSTREAM_FLAG_FIXED
ssize_t timed_read(int, void *, size_t, int, void *)
ssize_t(* VSTREAM_RW_FN)(int, void *, size_t, int, void *)
#define VSTREAM_MAYBE_PURGE_READ(d, b)
#define VSTREAM_FORK_STATE(stream, buffer, filedes)
#define VSTREAM_TRUNCATE(count, base)
#define VSTREAM_FLAG_DEADLINE
#define VSTREAM_FLAG_WR_TIMEOUT
#define VSTREAM_BUF_ZERO(bp)
off_t vstream_fseek(VSTREAM *stream, off_t offset, int whence)
int vstream_fflush(VSTREAM *stream)
ssize_t timed_write(int, const void *, size_t, int, void *)
#define VSTREAM_MAYBE_PURGE_WRITE(d, b)
VBUF_PUT_READY_FN put_ready
VBUF * vbuf_print(VBUF *bp, const char *format, va_list ap)
#define VSTREAM_FLAG_READ
#define VSTREAM_CTL_TIMEOUT
#define VSTREAM_BST_FLAG_OUT
#define VSTREAM_SAVE_STATE(stream, buffer, filedes)
#define VSTREAM_CTL_WRITE_FD
VSTREAM_WAITPID_FN waitpid_fn
int vstream_fdclose(VSTREAM *stream)
#define VSTREAM_BST_FLAG_IN
#define VSTREAM_ROUNDUP(count, base)
#define VSTREAM_STATIC(v)
#define vstream_fileno(vp)
#define VSTREAM_BST_FLAG_PEND
#define VSTREAM_CAN_WRITE(f)
#define VSTREAM_PUTC(ch, vp)
VSTREAM * vstream_vprintf(const char *format, va_list ap)
void vstream_control(VSTREAM *stream, int name,...)
ssize_t vstream_bufstat(VSTREAM *vp, int command)
int vstream_fpurge(VSTREAM *stream, int direction)
#define vstream_ferror(vp)
#define VSTREAM_CTL_EXCEPT
#define VSTREAM_FLAG_SEEK
#define VSTREAM_BUF_AT_OFFSET(bp, offset)
#define VSTREAM_CTL_DOUBLE
#define VSTREAM_CTL_WRITE_FN
ssize_t write_buf(int, const char *, ssize_t, int)
VSTREAM * vstream_fdopen(int fd, int flags)
#define VSTREAM_RESTORE_STATE(stream, buffer, filedes)
#define VSTREAM_BUF_AT_END(bp)
void * mymalloc(ssize_t len)
int vstream_fputs(const char *str, VSTREAM *stream)
#define VSTREAM_FLAG_NSEEK
void msg_info(const char *fmt,...)