summaryrefslogtreecommitdiff
path: root/test/test_examples.py
diff options
context:
space:
mode:
authorPaul Lawrence <paullawrence@google.com>2019-08-02 16:07:45 -0700
committerandroid-build-merger <android-build-merger@google.com>2019-08-02 16:07:45 -0700
commitae757e65a913f0bcdebbea4cabc83fefe2079a7e (patch)
tree0dd2751cc9e8688b3486192bf2eb8151aa4e8013 /test/test_examples.py
parent3fb189c2fc0eb6c5cbe0fc2b99679eceb1ed0e31 (diff)
parent9b55afda075f8202fe93bc1873373a60165e38d5 (diff)
Merge remote-tracking branch 'origin/upstream-master'
am: 9b55afda07 Change-Id: I9f4a778e138589fcab2330147635e8d6c01c6e37
Diffstat (limited to 'test/test_examples.py')
-rwxr-xr-xtest/test_examples.py719
1 files changed, 719 insertions, 0 deletions
diff --git a/test/test_examples.py b/test/test_examples.py
new file mode 100755
index 0000000..5d6dacc
--- /dev/null
+++ b/test/test_examples.py
@@ -0,0 +1,719 @@
+#!/usr/bin/env python3
+
+if __name__ == '__main__':
+ import pytest
+ import sys
+ sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+import sys
+import py
+import pytest
+import stat
+import shutil
+import filecmp
+import tempfile
+import time
+import errno
+import sys
+from tempfile import NamedTemporaryFile
+from contextlib import contextmanager
+from util import (wait_for_mount, umount, cleanup, base_cmdline,
+ safe_sleep, basename, fuse_test_marker, test_printcap,
+ fuse_proto, powerset)
+from os.path import join as pjoin
+
+pytestmark = fuse_test_marker()
+
+TEST_FILE = __file__
+
+with open(TEST_FILE, 'rb') as fh:
+ TEST_DATA = fh.read()
+
+def name_generator(__ctr=[0]):
+ __ctr[0] += 1
+ return 'testfile_%d' % __ctr[0]
+
+options = []
+if sys.platform == 'linux':
+ options.append('clone_fd')
+
+def invoke_directly(mnt_dir, name, options):
+ cmdline = base_cmdline + [ pjoin(basename, 'example', name),
+ '-f', mnt_dir, '-o', ','.join(options) ]
+ if name == 'hello_ll':
+ # supports single-threading only
+ cmdline.append('-s')
+
+ return cmdline
+
+def invoke_mount_fuse(mnt_dir, name, options):
+ return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'),
+ name, mnt_dir, '-o', ','.join(options) ]
+
+def invoke_mount_fuse_drop_privileges(mnt_dir, name, options):
+ if os.getuid() != 0:
+ pytest.skip('drop_privileges requires root, skipping.')
+
+ return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',))
+
+class raii_tmpdir:
+ def __init__(self):
+ self.d = tempfile.mkdtemp()
+
+ def __str__(self):
+ return str(self.d)
+
+ def mkdir(self, path):
+ return py.path.local(str(self.d)).mkdir(path)
+
+@pytest.fixture
+def short_tmpdir():
+ return raii_tmpdir()
+
+@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse,
+ invoke_mount_fuse_drop_privileges))
+@pytest.mark.parametrize("options", powerset(options))
+@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
+def test_hello(tmpdir, name, options, cmdline_builder, output_checker):
+ mnt_dir = str(tmpdir)
+ mount_process = subprocess.Popen(
+ cmdline_builder(mnt_dir, name, options),
+ stdout=output_checker.fd, stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ assert os.listdir(mnt_dir) == [ 'hello' ]
+ filename = pjoin(mnt_dir, 'hello')
+ with open(filename, 'r') as fh:
+ assert fh.read() == 'Hello World!\n'
+ with pytest.raises(IOError) as exc_info:
+ open(filename, 'r+')
+ assert exc_info.value.errno == errno.EACCES
+ with pytest.raises(IOError) as exc_info:
+ open(filename + 'does-not-exist', 'r+')
+ assert exc_info.value.errno == errno.ENOENT
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+@pytest.mark.parametrize("writeback", (False, True))
+@pytest.mark.parametrize("name", ('passthrough', 'passthrough_fh', 'passthrough_ll'))
+@pytest.mark.parametrize("debug", (False, True))
+def test_passthrough(short_tmpdir, name, debug, output_checker, writeback):
+ # Avoid false positives from libfuse debug messages
+ if debug:
+ output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
+ count=0)
+
+ # test_syscalls prints "No error" under FreeBSD
+ output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]",
+ count=0)
+
+ mnt_dir = str(short_tmpdir.mkdir('mnt'))
+ src_dir = str(short_tmpdir.mkdir('src'))
+
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', name),
+ '-f', mnt_dir ]
+ if debug:
+ cmdline.append('-d')
+
+ if writeback:
+ if name != 'passthrough_ll':
+ pytest.skip('example does not support writeback caching')
+ cmdline.append('-o')
+ cmdline.append('writeback')
+
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ work_dir = mnt_dir + src_dir
+
+ tst_statvfs(work_dir)
+ tst_readdir(src_dir, work_dir)
+ tst_readdir_big(src_dir, work_dir)
+ tst_open_read(src_dir, work_dir)
+ tst_open_write(src_dir, work_dir)
+ tst_create(work_dir)
+ tst_passthrough(src_dir, work_dir)
+ tst_append(src_dir, work_dir)
+ tst_seek(src_dir, work_dir)
+ tst_mkdir(work_dir)
+ tst_rmdir(src_dir, work_dir)
+ tst_unlink(src_dir, work_dir)
+ tst_symlink(work_dir)
+ if os.getuid() == 0:
+ tst_chown(work_dir)
+
+ # Underlying fs may not have full nanosecond resolution
+ tst_utimens(work_dir, ns_tol=1000)
+
+ tst_link(work_dir)
+ tst_truncate_path(work_dir)
+ tst_truncate_fd(work_dir)
+ tst_open_unlink(work_dir)
+
+ syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
+ work_dir, ':' + src_dir ]
+ if writeback:
+ # When writeback caching is enabled, kernel has to open files for
+ # reading even when userspace opens with O_WDONLY. This fails if the
+ # filesystem process doesn't have special permission.
+ syscall_test_cmd.append('-53')
+ subprocess.check_call(syscall_test_cmd)
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+@pytest.mark.parametrize("cache", (False, True))
+def test_passthrough_hp(short_tmpdir, cache, output_checker):
+ mnt_dir = str(short_tmpdir.mkdir('mnt'))
+ src_dir = str(short_tmpdir.mkdir('src'))
+
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'passthrough_hp'),
+ src_dir, mnt_dir ]
+
+ if not cache:
+ cmdline.append('--nocache')
+
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+
+ tst_statvfs(mnt_dir)
+ tst_readdir(src_dir, mnt_dir)
+ tst_readdir_big(src_dir, mnt_dir)
+ tst_open_read(src_dir, mnt_dir)
+ tst_open_write(src_dir, mnt_dir)
+ tst_create(mnt_dir)
+ if not cache:
+ tst_passthrough(src_dir, mnt_dir)
+ tst_append(src_dir, mnt_dir)
+ tst_seek(src_dir, mnt_dir)
+ tst_mkdir(mnt_dir)
+ tst_rmdir(src_dir, mnt_dir)
+ tst_unlink(src_dir, mnt_dir)
+ tst_symlink(mnt_dir)
+ if os.getuid() == 0:
+ tst_chown(mnt_dir)
+
+ # Underlying fs may not have full nanosecond resolution
+ tst_utimens(mnt_dir, ns_tol=1000)
+
+ tst_link(mnt_dir)
+ tst_truncate_path(mnt_dir)
+ tst_truncate_fd(mnt_dir)
+ tst_open_unlink(mnt_dir)
+
+ # test_syscalls assumes that changes in source directory
+ # will be reflected immediately in mountpoint, so we
+ # can't use it.
+ if not cache:
+ syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
+ mnt_dir, ':' + src_dir ]
+ subprocess.check_call(syscall_test_cmd)
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+
+@pytest.mark.skipif(fuse_proto < (7,11),
+ reason='not supported by running kernel')
+def test_ioctl(tmpdir, output_checker):
+ progname = pjoin(basename, 'example', 'ioctl')
+ if not os.path.exists(progname):
+ pytest.skip('%s not built' % os.path.basename(progname))
+
+ mnt_dir = str(tmpdir)
+ testfile = pjoin(mnt_dir, 'fioc')
+ cmdline = base_cmdline + [progname, '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'ioctl_client'),
+ testfile ]
+ assert subprocess.check_output(cmdline) == b'0\n'
+ with open(testfile, 'wb') as fh:
+ fh.write(b'foobar')
+ assert subprocess.check_output(cmdline) == b'6\n'
+ subprocess.check_call(cmdline + [ '3' ])
+ with open(testfile, 'rb') as fh:
+ assert fh.read()== b'foo'
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def test_poll(tmpdir, output_checker):
+ mnt_dir = str(tmpdir)
+ cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
+ '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'poll_client') ]
+ subprocess.check_call(cmdline, cwd=mnt_dir)
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def test_null(tmpdir, output_checker):
+ progname = pjoin(basename, 'example', 'null')
+ if not os.path.exists(progname):
+ pytest.skip('%s not built' % os.path.basename(progname))
+
+ mnt_file = str(tmpdir) + '/file'
+ with open(mnt_file, 'w') as fh:
+ fh.write('dummy')
+ cmdline = base_cmdline + [ progname, '-f', mnt_file ]
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ def test_fn(name):
+ return os.stat(name).st_size > 4000
+ try:
+ wait_for_mount(mount_process, mnt_file, test_fn)
+ with open(mnt_file, 'rb') as fh:
+ assert fh.read(382) == b'\0' * 382
+ with open(mnt_file, 'wb') as fh:
+ fh.write(b'whatever')
+ except:
+ cleanup(mount_process, mnt_file)
+ raise
+ else:
+ umount(mount_process, mnt_file)
+
+
+@pytest.mark.skipif(fuse_proto < (7,12),
+ reason='not supported by running kernel')
+@pytest.mark.parametrize("notify", (True, False))
+def test_notify_inval_entry(tmpdir, notify, output_checker):
+ mnt_dir = str(tmpdir)
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'notify_inval_entry'),
+ '-f', '--update-interval=1',
+ '--timeout=5', mnt_dir ]
+ if not notify:
+ cmdline.append('--no-notify')
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
+ try:
+ os.stat(fname)
+ except FileNotFoundError:
+ # We may have hit a race condition and issued
+ # readdir just before the name changed
+ fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
+ os.stat(fname)
+
+ safe_sleep(2)
+ if not notify:
+ os.stat(fname)
+ safe_sleep(5)
+ with pytest.raises(FileNotFoundError):
+ os.stat(fname)
+ except:
+ cleanup(mount_process, mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+@pytest.mark.skipif(os.getuid() != 0,
+ reason='needs to run as root')
+def test_cuse(output_checker):
+
+ # Valgrind warns about unknown ioctls, that's ok
+ output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
+ r'==\1== \s{3}.+\n'
+ r'==\1== \s{3}.+$', count=0)
+
+ devname = 'cuse-test-%d' % os.getpid()
+ devpath = '/dev/%s' % devname
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'cuse'),
+ '-f', '--name=%s' % devname ]
+ mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
+ stderr=output_checker.fd)
+
+ cmdline = base_cmdline + \
+ [ pjoin(basename, 'example', 'cuse_client'),
+ devpath ]
+ try:
+ wait_for_mount(mount_process, devpath,
+ test_fn=os.path.exists)
+ assert subprocess.check_output(cmdline + ['s']) == b'0\n'
+ data = b'some test data'
+ off = 5
+ proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
+ stdin=subprocess.PIPE)
+ proc.stdin.write(data)
+ proc.stdin.close()
+ assert proc.wait(timeout=10) == 0
+ size = str(off + len(data)).encode() + b'\n'
+ assert subprocess.check_output(cmdline + ['s']) == size
+ out = subprocess.check_output(
+ cmdline + [ 'r', str(off + len(data) + 2), '0' ])
+ assert out == (b'\0' * off) + data
+ finally:
+ mount_process.terminate()
+
+@contextmanager
+def os_open(name, flags):
+ fd = os.open(name, flags)
+ try:
+ yield fd
+ finally:
+ os.close(fd)
+
+def os_create(name):
+ os.close(os.open(name, os.O_CREAT | os.O_RDWR))
+
+def tst_unlink(src_dir, mnt_dir):
+ name = name_generator()
+ fullname = mnt_dir + "/" + name
+ with open(pjoin(src_dir, name), 'wb') as fh:
+ fh.write(b'hello')
+ assert name in os.listdir(mnt_dir)
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+
+def tst_mkdir(mnt_dir):
+ dirname = name_generator()
+ fullname = mnt_dir + "/" + dirname
+ os.mkdir(fullname)
+ fstat = os.stat(fullname)
+ assert stat.S_ISDIR(fstat.st_mode)
+ assert os.listdir(fullname) == []
+ # Some filesystem (e.g. BTRFS) don't track st_nlink for directories
+ assert fstat.st_nlink in (1,2)
+ assert dirname in os.listdir(mnt_dir)
+
+def tst_rmdir(src_dir, mnt_dir):
+ name = name_generator()
+ fullname = mnt_dir + "/" + name
+ os.mkdir(pjoin(src_dir, name))
+ assert name in os.listdir(mnt_dir)
+ os.rmdir(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+
+def tst_symlink(mnt_dir):
+ linkname = name_generator()
+ fullname = mnt_dir + "/" + linkname
+ os.symlink("/imaginary/dest", fullname)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISLNK(fstat.st_mode)
+ assert os.readlink(fullname) == "/imaginary/dest"
+ assert fstat.st_nlink == 1
+ assert linkname in os.listdir(mnt_dir)
+
+def tst_create(mnt_dir):
+ name = name_generator()
+ fullname = pjoin(mnt_dir, name)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+
+ fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+
+ assert name in os.listdir(mnt_dir)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISREG(fstat.st_mode)
+ assert fstat.st_nlink == 1
+ assert fstat.st_size == 0
+
+def tst_chown(mnt_dir):
+ filename = pjoin(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+ uid = fstat.st_uid
+ gid = fstat.st_gid
+
+ uid_new = uid + 1
+ os.chown(filename, uid_new, -1)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid
+
+ gid_new = gid + 1
+ os.chown(filename, -1, gid_new)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid_new
+
+def tst_open_read(src_dir, mnt_dir):
+ name = name_generator()
+ with open(pjoin(src_dir, name), 'wb') as fh_out, \
+ open(TEST_FILE, 'rb') as fh_in:
+ shutil.copyfileobj(fh_in, fh_out)
+
+ assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
+
+def tst_open_write(src_dir, mnt_dir):
+ name = name_generator()
+ os_create(pjoin(src_dir, name))
+ fullname = pjoin(mnt_dir, name)
+ with open(fullname, 'wb') as fh_out, \
+ open(TEST_FILE, 'rb') as fh_in:
+ shutil.copyfileobj(fh_in, fh_out)
+
+ assert filecmp.cmp(fullname, TEST_FILE, False)
+
+def tst_append(src_dir, mnt_dir):
+ name = name_generator()
+ os_create(pjoin(src_dir, name))
+ fullname = pjoin(mnt_dir, name)
+ with os_open(fullname, os.O_WRONLY) as fd:
+ os.write(fd, b'foo\n')
+ with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
+ os.write(fd, b'bar\n')
+
+ with open(fullname, 'rb') as fh:
+ assert fh.read() == b'foo\nbar\n'
+
+def tst_seek(src_dir, mnt_dir):
+ name = name_generator()
+ os_create(pjoin(src_dir, name))
+ fullname = pjoin(mnt_dir, name)
+ with os_open(fullname, os.O_WRONLY) as fd:
+ os.lseek(fd, 1, os.SEEK_SET)
+ os.write(fd, b'foobar\n')
+ with os_open(fullname, os.O_WRONLY) as fd:
+ os.lseek(fd, 4, os.SEEK_SET)
+ os.write(fd, b'com')
+
+ with open(fullname, 'rb') as fh:
+ assert fh.read() == b'\0foocom\n'
+
+def tst_open_unlink(mnt_dir):
+ name = pjoin(mnt_dir, name_generator())
+ data1 = b'foo'
+ data2 = b'bar'
+ fullname = pjoin(mnt_dir, name)
+ with open(fullname, 'wb+', buffering=0) as fh:
+ fh.write(data1)
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert name not in os.listdir(mnt_dir)
+ fh.write(data2)
+ fh.seek(0)
+ assert fh.read() == data1+data2
+
+def tst_statvfs(mnt_dir):
+ os.statvfs(mnt_dir)
+
+def tst_link(mnt_dir):
+ name1 = pjoin(mnt_dir, name_generator())
+ name2 = pjoin(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name1)
+ assert filecmp.cmp(name1, TEST_FILE, False)
+
+ fstat1 = os.lstat(name1)
+ assert fstat1.st_nlink == 1
+
+ os.link(name1, name2)
+
+ fstat1 = os.lstat(name1)
+ fstat2 = os.lstat(name2)
+ assert fstat1 == fstat2
+ assert fstat1.st_nlink == 2
+ assert os.path.basename(name2) in os.listdir(mnt_dir)
+ assert filecmp.cmp(name1, name2, False)
+
+ # Since RELEASE requests are asynchronous, it is possible that
+ # libfuse still considers the file to be open at this point
+ # and (since -o hard_remove is not used) renames it instead of
+ # deleting it. In that case, the following lstat() call will
+ # still report an st_nlink value of 2 (cf. issue #157).
+ os.unlink(name2)
+
+ assert os.path.basename(name2) not in os.listdir(mnt_dir)
+ with pytest.raises(FileNotFoundError):
+ os.lstat(name2)
+
+ # See above, we may have to wait until RELEASE has been
+ # received before the st_nlink value is correct.
+ maxwait = time.time() + 2
+ fstat1 = os.lstat(name1)
+ while fstat1.st_nlink == 2 and time.time() < maxwait:
+ fstat1 = os.lstat(name1)
+ time.sleep(0.1)
+ assert fstat1.st_nlink == 1
+
+ os.unlink(name1)
+
+def tst_readdir(src_dir, mnt_dir):
+ newdir = name_generator()
+
+ src_newdir = pjoin(src_dir, newdir)
+ mnt_newdir = pjoin(mnt_dir, newdir)
+ file_ = src_newdir + "/" + name_generator()
+ subdir = src_newdir + "/" + name_generator()
+ subfile = subdir + "/" + name_generator()
+
+ os.mkdir(src_newdir)
+ shutil.copyfile(TEST_FILE, file_)
+ os.mkdir(subdir)
+ shutil.copyfile(TEST_FILE, subfile)
+
+ listdir_is = os.listdir(mnt_newdir)
+ listdir_is.sort()
+ listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
+ listdir_should.sort()
+ assert listdir_is == listdir_should
+
+ os.unlink(file_)
+ os.unlink(subfile)
+ os.rmdir(subdir)
+ os.rmdir(src_newdir)
+
+def tst_readdir_big(src_dir, mnt_dir):
+
+ # Add enough entries so that readdir needs to be called
+ # multiple times.
+ fnames = []
+ for i in range(500):
+ fname = ('A rather long filename to make sure that we '
+ 'fill up the buffer - ' * 3) + str(i)
+ with open(pjoin(src_dir, fname), 'w') as fh:
+ fh.write('File %d' % i)
+ fnames.append(fname)
+
+ listdir_is = sorted(os.listdir(mnt_dir))
+ listdir_should = sorted(os.listdir(src_dir))
+ assert listdir_is == listdir_should
+
+ for fname in fnames:
+ stat_src = os.stat(pjoin(src_dir, fname))
+ stat_mnt = os.stat(pjoin(mnt_dir, fname))
+ assert stat_src.st_ino == stat_mnt.st_ino
+ assert stat_src.st_mtime == stat_mnt.st_mtime
+ assert stat_src.st_ctime == stat_mnt.st_ctime
+ assert stat_src.st_size == stat_mnt.st_size
+ os.unlink(pjoin(src_dir, fname))
+
+def tst_truncate_path(mnt_dir):
+ assert len(TEST_DATA) > 1024
+
+ filename = pjoin(mnt_dir, name_generator())
+ with open(filename, 'wb') as fh:
+ fh.write(TEST_DATA)
+
+ fstat = os.stat(filename)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.truncate(filename, size + 1024)
+ assert os.stat(filename).st_size == size + 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.truncate(filename, size - 1024)
+ assert os.stat(filename).st_size == size - 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+ os.unlink(filename)
+
+def tst_truncate_fd(mnt_dir):
+ assert len(TEST_DATA) > 1024
+ with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
+ fd = fh.fileno()
+ fh.write(TEST_DATA)
+ fstat = os.fstat(fd)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.ftruncate(fd, size + 1024)
+ assert os.fstat(fd).st_size == size + 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.ftruncate(fd, size - 1024)
+ assert os.fstat(fd).st_size == size - 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+def tst_utimens(mnt_dir, ns_tol=0):
+ filename = pjoin(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+
+ atime = fstat.st_atime + 42.28
+ mtime = fstat.st_mtime - 42.23
+ if sys.version_info < (3,3):
+ os.utime(filename, (atime, mtime))
+ else:
+ atime_ns = fstat.st_atime_ns + int(42.28*1e9)
+ mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
+ os.utime(filename, None, ns=(atime_ns, mtime_ns))
+
+ fstat = os.lstat(filename)
+
+ assert abs(fstat.st_atime - atime) < 1
+ assert abs(fstat.st_mtime - mtime) < 1
+ if sys.version_info >= (3,3):
+ assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
+ assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
+
+def tst_passthrough(src_dir, mnt_dir):
+ name = name_generator()
+ src_name = pjoin(src_dir, name)
+ mnt_name = pjoin(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(src_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
+
+ name = name_generator()
+ src_name = pjoin(src_dir, name)
+ mnt_name = pjoin(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(mnt_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
+
+# avoid warning about unused import
+test_printcap
+
+