[ruby-dev:33843] IO.copy_stream
From:
Tanaka Akira <akr@...>
Date:
2008-02-18 12:52:22 UTC
List:
ruby-dev #33843
IO.copy_stream をつけるのはどうでしょうか。
IO.copy_stream(src, dst) で src から dst にコピーします。
src, dst はファイル名か IO オブジェクトです。
たとえば、IO.copy_stream(STDIN, STDOUT) のように使えます。
この操作はファイルのコピーとか Web サーバでファイルをソケッ
トに送り込むとかけっこうやることがあるのですが、Ruby 本体と
してはすぐに使えるメソッドは提供しておらず、
FileUtils.copy_stream を使うなどの必要があります。
FileUtils.copy_stream に比べて IO.copy_stream の利点には以下
のものがあります。
* コピー中 global interpreter lock を外せる
コピーをしている最中は Ruby のオブジェクトに触る必要がない
ので、GIL を外せて、それによりそのあいだ他のスレッドが動け
ます
* システムと状況によっては sendfile を使える
これにより、コピーをカーネルに任せっ切りにできて、コンテキ
ストスイッチやカーネル空間とユーザ空間の間のコピーを削減で
きます。
なお、sendfile は現時点では GNU/Linux のものしか扱っていま
せん。(Apache Portable Runtime を調べてみると、けっこうい
ろいろなシステムがサポートしているようですが。) あと、src
がファイルで dst がソケットであるという条件があります。
また、省略可能引数が 2つあって、コピーする長さと src におけ
るコピー開始位置を指定できます。
IO.copy_stream(src, dst, copy_length, src_offset)
src が (ファイル名でなく) IO で、かつ、src_offset を指定した
時には src のファイルオフセットは動かしません。sendfile がな
い場合、これは SUS で定義されている pread で実現します。
pread もなければいまのところエラーです。エラーじゃなくて
lseek でファイルオフセットを戻してもいいかもしれませんが。
Index: configure.in
===================================================================
--- configure.in (revision 15543)
+++ configure.in (working copy)
@@ -582,7 +582,7 @@ AC_CHECK_HEADERS(stdlib.h string.h unist
fcntl.h sys/fcntl.h sys/select.h sys/time.h sys/times.h sys/param.h\
syscall.h pwd.h grp.h a.out.h utime.h memory.h direct.h sys/resource.h \
sys/mkdev.h sys/utime.h xti.h netinet/in_systm.h float.h ieeefp.h pthread.h \
- ucontext.h intrinsics.h langinfo.h locale.h)
+ ucontext.h intrinsics.h langinfo.h locale.h sys/sendfile.h)
dnl Check additional types.
AC_CHECK_SIZEOF(rlim_t, 0, [
@@ -661,7 +661,8 @@ AC_CHECK_FUNCS(fmod killpg wait4 waitpid
dlopen sigprocmask sigaction sigsetjmp _setjmp vsnprintf snprintf\
setsid telldir seekdir fchmod cosh sinh tanh log2 round\
setuid setgid daemon select_large_fdset setenv unsetenv\
- mktime timegm clock_gettime gettimeofday)
+ mktime timegm clock_gettime gettimeofday\
+ pread sendfile)
AC_ARG_ENABLE(setreuid,
[ --enable-setreuid use setreuid()/setregid() according to need even if obsolete.],
[use_setreuid=$enableval])
Index: io.c
===================================================================
--- io.c (revision 15543)
+++ io.c (working copy)
@@ -6011,6 +6011,480 @@ rb_io_s_read(int argc, VALUE *argv, VALU
return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io);
}
+struct copy_stream_struct {
+ VALUE src;
+ VALUE dst;
+ int src_fd;
+ int dst_fd;
+ off_t copy_length;
+ off_t src_offset;
+ int close_src;
+ int close_dst;
+ off_t total;
+ char *syserr;
+ int error_no;
+ char *notimp;
+ rb_fdset_t fds;
+};
+
+static void
+copy_stream_rbuf_to_dst(struct copy_stream_struct *stp,
+ rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path)
+{
+ ssize_t r;
+ int len;
+retry:
+ len = src_fptr->rbuf_len;
+ if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+ len = stp->copy_length;
+ }
+ if (len == 0)
+ return;
+ r = rb_write_internal(dst_fptr->fd, src_fptr->rbuf + src_fptr->rbuf_off, len);
+ if (len == r) {
+ src_fptr->rbuf_len -= len;
+ if (src_fptr->rbuf_len < 0) src_fptr->rbuf_len = 0;
+ if (stp->copy_length != (off_t)-1) stp->copy_length -= len;
+ stp->total += len;
+ return;
+ }
+ else if (0 <= r) {
+ src_fptr->rbuf_off += r;
+ src_fptr->rbuf_len -= r;
+ if (stp->copy_length != (off_t)-1) stp->copy_length -= r;
+ stp->total += r;
+ errno = EAGAIN;
+ }
+ if (rb_io_wait_writable(dst_fptr->fd)) {
+ rb_io_check_closed(dst_fptr);
+ if (src_fptr->rbuf_len)
+ goto retry;
+ }
+ rb_sys_fail(dst_path);
+}
+
+static int
+copy_stream_wait_read(struct copy_stream_struct *stp)
+{
+ int ret;
+retry:
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->src_fd, &stp->fds);
+ ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto retry;
+ stp->syserr = "select";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 0;
+}
+
+static int
+copy_stream_wait_write(struct copy_stream_struct *stp)
+{
+ int ret;
+retry:
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->dst_fd, &stp->fds);
+ ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+ if (ret == -1) {
+ if (errno == EINTR)
+ goto retry;
+ stp->syserr = "select";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef HAVE_SENDFILE
+
+#ifdef __linux__
+#define USE_SENDFILE
+
+#ifdef HAVE_SYS_SENDFILE_H
+#include <sys/sendfile.h>
+#endif
+
+static ssize_t
+simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
+{
+ return sendfile(out_fd, in_fd, offset, count);
+}
+
+#endif
+
+#endif
+
+#ifdef USE_SENDFILE
+static int
+copy_stream_sendfile(struct copy_stream_struct *stp)
+{
+ struct stat src_stat, dst_stat;
+ ssize_t ss;
+ int ret;
+
+ off_t copy_length;
+ off_t src_offset;
+ int use_pread;
+
+ ret = fstat(stp->src_fd, &src_stat);
+ if (ret == -1) {
+ stp->syserr = "fstat";
+ stp->error_no = errno;
+ return -1;
+ }
+ if (!S_ISREG(src_stat.st_mode))
+ return 0;
+
+ ret = fstat(stp->dst_fd, &dst_stat);
+ if (ret == -1) {
+ stp->syserr = "fstat";
+ stp->error_no = errno;
+ return -1;
+ }
+ if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK)
+ return 0;
+
+ src_offset = stp->src_offset;
+ use_pread = src_offset != (off_t)-1;
+
+ copy_length = stp->copy_length;
+ if (copy_length == (off_t)-1) {
+ if (use_pread)
+ copy_length = src_stat.st_size - src_offset;
+ else {
+ off_t cur = lseek(stp->src_fd, 0, SEEK_CUR);
+ if (cur == (off_t)-1) {
+ stp->syserr = "lseek";
+ stp->error_no = errno;
+ return -1;
+ }
+ copy_length = src_stat.st_size - cur;
+ }
+ }
+
+retry_sendfile:
+ if (use_pread) {
+ ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length);
+ }
+ else {
+ ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length);
+ }
+ if (0 < ss) {
+ stp->total += ss;
+ copy_length -= ss;
+ if (0 < copy_length) {
+ ss = -1;
+ errno = EAGAIN;
+ }
+ }
+ if (ss == -1) {
+ if (errno == EINVAL || errno == ENOSYS)
+ return 0;
+ if (errno == EINTR)
+ goto retry_sendfile;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_write(stp) == -1)
+ return -1;
+ goto retry_sendfile;
+ }
+ stp->syserr = "sendfile";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 1;
+}
+#endif
+
+static ssize_t
+copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset)
+{
+ ssize_t ss;
+retry_read:
+ if (offset == (off_t)-1)
+ ss = read(stp->src_fd, buf, len);
+ else {
+#ifdef HAVE_PREAD
+ ss = pread(stp->src_fd, buf, len, offset);
+#else
+ stp->notimp = "pread";
+ return -1;
+#endif
+ }
+ if (ss == 0) {
+ return 0;
+ }
+ if (ss == -1) {
+ if (errno == EINTR)
+ goto retry_read;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_read(stp) == -1)
+ return -1;
+ goto retry_read;
+ }
+ if (errno == ENOSYS) {
+ stp->notimp = "pread";
+ return -1;
+ }
+ stp->syserr = offset == (off_t)-1 ? "read" : "pread";
+ stp->error_no = errno;
+ return -1;
+ }
+ return ss;
+}
+
+static int
+copy_stream_write(struct copy_stream_struct *stp, char *buf, int len)
+{
+ ssize_t ss;
+ int off = 0;
+ while (len) {
+ ss = write(stp->dst_fd, buf+off, len);
+ if (ss == -1) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_write(stp) == -1)
+ return -1;
+ continue;
+ }
+ stp->syserr = "write";
+ stp->error_no = errno;
+ return -1;
+ }
+ off += ss;
+ len -= ss;
+ stp->total += ss;
+ }
+ return 0;
+}
+
+static void
+copy_stream_read_write(struct copy_stream_struct *stp)
+{
+ char buf[1024*16];
+ int len;
+ ssize_t ss;
+ int ret;
+ off_t copy_length;
+ int use_eof;
+ off_t src_offset;
+ int use_pread;
+
+ copy_length = stp->copy_length;
+ use_eof = copy_length == (off_t)-1;
+ src_offset = stp->src_offset;
+ use_pread = src_offset != (off_t)-1;
+
+ if (use_pread && stp->close_src) {
+ off_t r;
+ r = lseek(stp->src_fd, src_offset, SEEK_SET);
+ if (r == (off_t)-1) {
+ stp->syserr = "lseek";
+ stp->error_no = errno;
+ return;
+ }
+ src_offset = (off_t)-1;
+ use_pread = 0;
+ }
+
+ while (use_eof || 0 < copy_length) {
+ if (!use_eof && copy_length < sizeof(buf)) {
+ len = copy_length;
+ }
+ else {
+ len = sizeof(buf);
+ }
+ if (use_pread) {
+ ss = copy_stream_read(stp, buf, len, src_offset);
+ if (0 < ss)
+ src_offset += ss;
+ }
+ else {
+ ss = copy_stream_read(stp, buf, len, (off_t)-1);
+ }
+ if (ss <= 0) /* EOF or error */
+ return;
+
+ ret = copy_stream_write(stp, buf, ss);
+ if (ret < 0)
+ return;
+
+ if (!use_eof)
+ copy_length -= ss;
+ }
+}
+
+static VALUE
+copy_stream_func(void *arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ int ret;
+
+#ifdef USE_SENDFILE
+ ret = copy_stream_sendfile(stp);
+ if (ret != 0)
+ goto finish; /* error or success */
+#endif
+
+ copy_stream_read_write(stp);
+
+finish:
+ return Qnil;
+}
+
+static VALUE
+copy_stream_body(VALUE arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ VALUE src_io, dst_io;
+ rb_io_t *src_fptr, *dst_fptr;
+ int src_fd, dst_fd;
+ char *src_path = 0, *dst_path = 0;
+
+ src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io");
+ if (!NIL_P(src_io)) {
+ GetOpenFile(src_io, src_fptr);
+ src_fd = src_fptr->fd;
+ }
+ else {
+ src_fptr = 0;
+ FilePathValue(stp->src);
+ src_path = StringValueCStr(stp->src);
+ src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0);
+ if (src_fd == -1) { rb_sys_fail(src_path); }
+ stp->close_src = 1;
+ }
+ stp->src_fd = src_fd;
+
+ dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io");
+ if (!NIL_P(dst_io)) {
+ dst_io = GetWriteIO(dst_io);
+ GetOpenFile(dst_io, dst_fptr);
+ dst_fd = dst_fptr->fd;
+ }
+ else {
+ dst_fptr = 0;
+ FilePathValue(stp->dst);
+ dst_path = StringValueCStr(stp->dst);
+ dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600);
+ if (dst_fd == -1) { rb_sys_fail(dst_path); }
+ stp->close_dst = 1;
+ }
+ stp->dst_fd = dst_fd;
+
+ stp->total = 0;
+
+ if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) {
+ long len = src_fptr->rbuf_len;
+ VALUE str;
+ if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+ len = stp->copy_length;
+ }
+ str = rb_str_buf_new(len);
+ rb_str_resize(str,len);
+ read_buffered_data(RSTRING_PTR(str), len, src_fptr);
+ io_fwrite(str, dst_fptr);
+ stp->total += len;
+ if (stp->copy_length != (off_t)-1)
+ stp->copy_length -= len;
+ }
+
+ if (dst_fptr && io_fflush(dst_fptr) < 0) {
+ rb_raise(rb_eIOError, "flush failed");
+ }
+
+ if (src_fptr) {
+ copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path);
+ }
+
+ if (stp->copy_length == 0)
+ return Qnil;
+
+ rb_fd_init(&stp->fds);
+ rb_fd_set(src_fd, &stp->fds);
+ rb_fd_set(dst_fd, &stp->fds);
+
+ return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0);
+}
+
+static VALUE
+copy_stream_finalize(VALUE arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ if (stp->close_src)
+ close(stp->src_fd);
+ if (stp->close_dst)
+ close(stp->dst_fd);
+ rb_fd_term(&stp->fds);
+ if (stp->syserr) {
+ errno = stp->error_no;
+ rb_sys_fail(stp->syserr);
+ }
+ if (stp->notimp) {
+ rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp);
+ }
+ return Qnil;
+}
+
+/*
+ * call-seq:
+ * IO.copy_stream(src, dst)
+ * IO.copy_stream(src, dst, copy_length)
+ * IO.copy_stream(src, dst, copy_length, src_offset)
+ *
+ * IO.copy_stream copies <i>src</i> to <i>dst</i>.
+ * <i>src</i> and <i>dst</i> is either a filename or an IO.
+ *
+ * This method returns the number of bytes copied.
+ *
+ * If optional arguments are not given,
+ * the start position of the copy is
+ * the beginning of the filename or
+ * the current file offset of the IO.
+ * The end position of the copy is the end of file.
+ *
+ * If <i>copy_length</i> is given,
+ * No more than <i>copy_length</i> bytes are copied.
+ *
+ * If <i>src_offset</i> is given,
+ * it specifies the start position of the copy.
+ *
+ * When <i>src_offset</i> is specified and
+ * <i>src</i> is an IO,
+ * IO.copy_stream doesn't move the current file offset.
+ *
+ */
+static VALUE
+rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io)
+{
+ VALUE src, dst, length, src_offset;
+ struct copy_stream_struct st;
+
+ MEMZERO(&st, struct copy_stream_struct, 1);
+
+ rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset);
+
+ if (NIL_P(length))
+ st.copy_length = (off_t)-1;
+ else
+ st.copy_length = NUM2OFFT(length);
+
+ if (NIL_P(src_offset))
+ st.src_offset = (off_t)-1;
+ else
+ st.src_offset = NUM2OFFT(src_offset);
+
+ st.src = src;
+ st.dst = dst;
+
+ rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st);
+
+ return OFFT2NUM(st.total);
+}
/*
* call-seq:
@@ -6595,6 +7069,7 @@ Init_IO(void)
rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1);
rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1);
rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1);
+ rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1);
rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);
Index: thread.c
===================================================================
--- thread.c (revision 15543)
+++ thread.c (working copy)
@@ -1702,6 +1702,25 @@ rb_fd_copy(rb_fdset_t *dst, const fd_set
memcpy(dst->fdset, src, size);
}
+int
+rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
+{
+ fd_set *r = NULL, *w = NULL, *e = NULL;
+ if (readfds) {
+ rb_fd_resize(n - 1, readfds);
+ r = rb_fd_ptr(readfds);
+ }
+ if (writefds) {
+ rb_fd_resize(n - 1, writefds);
+ w = rb_fd_ptr(writefds);
+ }
+ if (exceptfds) {
+ rb_fd_resize(n - 1, exceptfds);
+ e = rb_fd_ptr(exceptfds);
+ }
+ return select(n, r, w, e, timeout);
+}
+
#undef FD_ZERO
#undef FD_SET
#undef FD_CLR
Index: test/ruby/test_io.rb
===================================================================
--- test/ruby/test_io.rb (revision 15543)
+++ test/ruby/test_io.rb (working copy)
@@ -1,4 +1,7 @@
require 'test/unit'
+require 'tmpdir'
+require 'io/nonblock'
+require 'socket'
class TestIO < Test::Unit::TestCase
def test_gets_rs
@@ -55,4 +58,284 @@ class TestIO < Test::Unit::TestCase
ensure
r.close
end
+
+ def with_pipe
+ r, w = IO.pipe
+ begin
+ yield r, w
+ ensure
+ r.close unless r.closed?
+ w.close unless w.closed?
+ end
+ end
+
+ def with_read_pipe(content)
+ r, w = IO.pipe
+ w << content
+ w.close
+ begin
+ yield r
+ ensure
+ r.close
+ end
+ end
+
+ def test_copy_stream
+ Dir.mktmpdir {|d|
+ Dir.chdir d
+
+ content = "foobar"
+ File.open("src", "w") {|f| f << content }
+ IO.copy_stream("src", "dst")
+ assert_equal(content, File.read("dst"))
+
+ # overwrite by smaller file.
+ content = "baz"
+ File.open("src", "w") {|f| f << content }
+ IO.copy_stream("src", "dst")
+ assert_equal(content, File.read("dst"))
+
+ IO.copy_stream("src", "dst", 2)
+ assert_equal(content[0,2], File.read("dst"))
+
+ IO.copy_stream("src", "dst", 0)
+ assert_equal("", File.read("dst"))
+
+ IO.copy_stream("src", "dst", nil, 1)
+ assert_equal(content[1..-1], File.read("dst"))
+
+ assert_raise(Errno::ENOENT) {
+ IO.copy_stream("nodir/foo", "dst")
+ }
+
+ assert_raise(Errno::ENOENT) {
+ IO.copy_stream("src", "nodir/bar")
+ }
+
+ with_pipe {|r, w|
+ IO.copy_stream("src", w)
+ w.close
+ assert_equal(content, r.read)
+ }
+
+ with_pipe {|r, w|
+ w.close
+ assert_raise(IOError) { IO.copy_stream("src", w) }
+ }
+
+ pipe_content = "abc"
+ with_read_pipe(pipe_content) {|r|
+ IO.copy_stream(r, "dst")
+ assert_equal(pipe_content, File.read("dst"))
+ }
+
+ with_read_pipe("abc") {|r1|
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ w2.sync = false
+ w2 << "def"
+ IO.copy_stream(r1, w2)
+ w2.close
+ assert_equal("defbc", r2.read)
+ }
+ }
+
+ with_read_pipe("abc") {|r1|
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ w2.sync = false
+ w2 << "def"
+ IO.copy_stream(r1, w2, 1)
+ w2.close
+ assert_equal("defb", r2.read)
+ }
+ }
+
+ with_read_pipe("abc") {|r1|
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ IO.copy_stream(r1, w2)
+ w2.close
+ assert_equal("bc", r2.read)
+ }
+ }
+
+ with_read_pipe("abc") {|r1|
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ IO.copy_stream(r1, w2, 1)
+ w2.close
+ assert_equal("b", r2.read)
+ }
+ }
+
+ with_read_pipe("abc") {|r1|
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ IO.copy_stream(r1, w2, 0)
+ w2.close
+ assert_equal("", r2.read)
+ }
+ }
+
+ with_pipe {|r1, w1|
+ w1 << "abc"
+ assert_equal("a", r1.getc)
+ with_pipe {|r2, w2|
+ w1 << "def"
+ w1.close
+ IO.copy_stream(r1, w2)
+ w2.close
+ assert_equal("bcdef", r2.read)
+ }
+ }
+
+ with_pipe {|r, w|
+ IO.copy_stream("src", w, 1, 1)
+ w.close
+ assert_equal(content[1,1], r.read)
+ }
+
+ bigcontent = "abc" * 123456
+ File.open("bigsrc", "w") {|f| f << bigcontent }
+ IO.copy_stream("bigsrc", "bigdst")
+ assert_equal(bigcontent, File.read("bigdst"))
+
+ File.unlink("bigdst")
+ IO.copy_stream("bigsrc", "bigdst", nil, 100)
+ assert_equal(bigcontent[100..-1], File.read("bigdst"))
+
+ File.unlink("bigdst")
+ IO.copy_stream("bigsrc", "bigdst", 30000, 100)
+ assert_equal(bigcontent[100, 30000], File.read("bigdst"))
+
+ File.open("bigsrc") {|f|
+ assert_equal(0, f.pos)
+ IO.copy_stream(f, "bigdst", nil, 10)
+ assert_equal(bigcontent[10..-1], File.read("bigdst"))
+ assert_equal(0, f.pos)
+ IO.copy_stream(f, "bigdst", 40, 30)
+ assert_equal(bigcontent[30, 40], File.read("bigdst"))
+ assert_equal(0, f.pos)
+ }
+
+ with_pipe {|r, w|
+ w.close
+ assert_raise(IOError) { IO.copy_stream("src", w) }
+ }
+ }
+ end
+
+ def with_socketpair
+ s1, s2 = UNIXSocket.pair
+ begin
+ yield s1, s2
+ ensure
+ s1.close unless s1.closed?
+ s2.close unless s2.closed?
+ end
+ end
+
+ def test_copy_stream_socket
+ Dir.mktmpdir {|d|
+ Dir.chdir d
+
+ content = "foobar"
+ File.open("src", "w") {|f| f << content }
+
+ with_socketpair {|s1, s2|
+ IO.copy_stream("src", s1)
+ s1.close
+ assert_equal(content, s2.read)
+ }
+
+ bigcontent = "abc" * 123456
+ File.open("bigsrc", "w") {|f| f << bigcontent }
+
+ with_socketpair {|s1, s2|
+ t = Thread.new { s2.read }
+ IO.copy_stream("bigsrc", s1)
+ s1.close
+ result = t.value
+ assert_equal(bigcontent, result)
+ }
+
+ with_socketpair {|s1, s2|
+ t = Thread.new { s2.read }
+ IO.copy_stream("bigsrc", s1, 10000)
+ s1.close
+ result = t.value
+ assert_equal(bigcontent[0,10000], result)
+ }
+
+ File.open("bigsrc") {|f|
+ assert_equal(0, f.pos)
+ with_socketpair {|s1, s2|
+ t = Thread.new { s2.read }
+ IO.copy_stream(f, s1, nil, 100)
+ assert_equal(0, f.pos)
+ s1.close
+ result = t.value
+ assert_equal(bigcontent[100..-1], result)
+ }
+ }
+
+ File.open("bigsrc") {|f|
+ assert_equal(bigcontent[0,100], f.read(100))
+ assert_equal(100, f.pos)
+ with_socketpair {|s1, s2|
+ t = Thread.new { s2.read }
+ IO.copy_stream(f, s1)
+ assert_equal(bigcontent.length, f.pos)
+ s1.close
+ result = t.value
+ assert_equal(bigcontent[100..-1], result)
+ }
+ }
+
+ megacontent = "abc" * 1234567
+ File.open("megasrc", "w") {|f| f << megacontent }
+
+ with_socketpair {|s1, s2|
+ t = Thread.new { s2.read }
+ s1.nonblock = true
+ IO.copy_stream("megasrc", s1)
+ s1.close
+ result = t.value
+ assert_equal(megacontent, result)
+ }
+
+ with_pipe {|r1, w1|
+ with_pipe {|r2, w2|
+ t1 = Thread.new { w1 << megacontent; w1.close }
+ t2 = Thread.new { r2.read }
+ r1.nonblock = true
+ w2.nonblock = true
+ IO.copy_stream(r1, w2)
+ w2.close
+ t1.join
+ assert_equal(megacontent, t2.value)
+ }
+ }
+
+ with_pipe {|r1, w1|
+ with_pipe {|r2, w2|
+ t1 = Thread.new { w1 << megacontent; w1.close }
+ t2 = Thread.new { r2.read }
+ IO.copy_stream(r1, w2)
+ w2.close
+ t1.join
+ assert_equal(megacontent, t2.value)
+ }
+ }
+
+ with_pipe {|r, w|
+ t = Thread.new { r.read }
+ IO.copy_stream("megasrc", w)
+ w.close
+ assert_equal(megacontent, t.value)
+ }
+
+ }
+ end
end
--
[田中 哲][たなか あきら][Tanaka Akira]