summaryrefslogtreecommitdiff
path: root/test/test_examples.py
diff options
context:
space:
mode:
authorPaul Lawrence <paullawrence@google.com>2019-08-02 16:34:58 -0700
committerandroid-build-merger <android-build-merger@google.com>2019-08-02 16:34:58 -0700
commitc77441c6bcfb18dd3c887cdf24a778960fbe0c41 (patch)
treeeb415748db51aed7c3d08a5e7dae57c8576ea940 /test/test_examples.py
parent6b03344fbe4681511bd68f81a6b2880903157c97 (diff)
parent2609a900fb4e5e09d60595e951c2afea9c1fe29e (diff)
Remove GPL files am: 94aa887ced am: b4099f5e0f
am: 2609a900fb Change-Id: I73fc30e2797980f7fc4ca0562dcbe7c0f0ba8771
Diffstat (limited to 'test/test_examples.py')
-rwxr-xr-xtest/test_examples.py719
1 files changed, 0 insertions, 719 deletions
diff --git a/test/test_examples.py b/test/test_examples.py
deleted file mode 100755
index 5d6dacc..0000000
--- a/test/test_examples.py
+++ /dev/null
@@ -1,719 +0,0 @@
-#!/usr/bin/env python3
-
-if __name__ == '__main__':
- import pytest
- import sys
- sys.exit(pytest.main([__file__] + sys.argv[1:]))
-
-import subprocess
-import os
-import sys
-import py
-import pytest
-import stat
-import shutil
-import filecmp
-import tempfile
-import time
-import errno
-import sys
-from tempfile import NamedTemporaryFile
-from contextlib import contextmanager
-from util import (wait_for_mount, umount, cleanup, base_cmdline,
- safe_sleep, basename, fuse_test_marker, test_printcap,
- fuse_proto, powerset)
-from os.path import join as pjoin
-
-pytestmark = fuse_test_marker()
-
-TEST_FILE = __file__
-
-with open(TEST_FILE, 'rb') as fh:
- TEST_DATA = fh.read()
-
-def name_generator(__ctr=[0]):
- __ctr[0] += 1
- return 'testfile_%d' % __ctr[0]
-
-options = []
-if sys.platform == 'linux':
- options.append('clone_fd')
-
-def invoke_directly(mnt_dir, name, options):
- cmdline = base_cmdline + [ pjoin(basename, 'example', name),
- '-f', mnt_dir, '-o', ','.join(options) ]
- if name == 'hello_ll':
- # supports single-threading only
- cmdline.append('-s')
-
- return cmdline
-
-def invoke_mount_fuse(mnt_dir, name, options):
- return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'),
- name, mnt_dir, '-o', ','.join(options) ]
-
-def invoke_mount_fuse_drop_privileges(mnt_dir, name, options):
- if os.getuid() != 0:
- pytest.skip('drop_privileges requires root, skipping.')
-
- return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',))
-
-class raii_tmpdir:
- def __init__(self):
- self.d = tempfile.mkdtemp()
-
- def __str__(self):
- return str(self.d)
-
- def mkdir(self, path):
- return py.path.local(str(self.d)).mkdir(path)
-
-@pytest.fixture
-def short_tmpdir():
- return raii_tmpdir()
-
-@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse,
- invoke_mount_fuse_drop_privileges))
-@pytest.mark.parametrize("options", powerset(options))
-@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
-def test_hello(tmpdir, name, options, cmdline_builder, output_checker):
- mnt_dir = str(tmpdir)
- mount_process = subprocess.Popen(
- cmdline_builder(mnt_dir, name, options),
- stdout=output_checker.fd, stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
- assert os.listdir(mnt_dir) == [ 'hello' ]
- filename = pjoin(mnt_dir, 'hello')
- with open(filename, 'r') as fh:
- assert fh.read() == 'Hello World!\n'
- with pytest.raises(IOError) as exc_info:
- open(filename, 'r+')
- assert exc_info.value.errno == errno.EACCES
- with pytest.raises(IOError) as exc_info:
- open(filename + 'does-not-exist', 'r+')
- assert exc_info.value.errno == errno.ENOENT
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-@pytest.mark.parametrize("writeback", (False, True))
-@pytest.mark.parametrize("name", ('passthrough', 'passthrough_fh', 'passthrough_ll'))
-@pytest.mark.parametrize("debug", (False, True))
-def test_passthrough(short_tmpdir, name, debug, output_checker, writeback):
- # Avoid false positives from libfuse debug messages
- if debug:
- output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
- count=0)
-
- # test_syscalls prints "No error" under FreeBSD
- output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]",
- count=0)
-
- mnt_dir = str(short_tmpdir.mkdir('mnt'))
- src_dir = str(short_tmpdir.mkdir('src'))
-
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', name),
- '-f', mnt_dir ]
- if debug:
- cmdline.append('-d')
-
- if writeback:
- if name != 'passthrough_ll':
- pytest.skip('example does not support writeback caching')
- cmdline.append('-o')
- cmdline.append('writeback')
-
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
- work_dir = mnt_dir + src_dir
-
- tst_statvfs(work_dir)
- tst_readdir(src_dir, work_dir)
- tst_readdir_big(src_dir, work_dir)
- tst_open_read(src_dir, work_dir)
- tst_open_write(src_dir, work_dir)
- tst_create(work_dir)
- tst_passthrough(src_dir, work_dir)
- tst_append(src_dir, work_dir)
- tst_seek(src_dir, work_dir)
- tst_mkdir(work_dir)
- tst_rmdir(src_dir, work_dir)
- tst_unlink(src_dir, work_dir)
- tst_symlink(work_dir)
- if os.getuid() == 0:
- tst_chown(work_dir)
-
- # Underlying fs may not have full nanosecond resolution
- tst_utimens(work_dir, ns_tol=1000)
-
- tst_link(work_dir)
- tst_truncate_path(work_dir)
- tst_truncate_fd(work_dir)
- tst_open_unlink(work_dir)
-
- syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
- work_dir, ':' + src_dir ]
- if writeback:
- # When writeback caching is enabled, kernel has to open files for
- # reading even when userspace opens with O_WDONLY. This fails if the
- # filesystem process doesn't have special permission.
- syscall_test_cmd.append('-53')
- subprocess.check_call(syscall_test_cmd)
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-@pytest.mark.parametrize("cache", (False, True))
-def test_passthrough_hp(short_tmpdir, cache, output_checker):
- mnt_dir = str(short_tmpdir.mkdir('mnt'))
- src_dir = str(short_tmpdir.mkdir('src'))
-
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'passthrough_hp'),
- src_dir, mnt_dir ]
-
- if not cache:
- cmdline.append('--nocache')
-
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
-
- tst_statvfs(mnt_dir)
- tst_readdir(src_dir, mnt_dir)
- tst_readdir_big(src_dir, mnt_dir)
- tst_open_read(src_dir, mnt_dir)
- tst_open_write(src_dir, mnt_dir)
- tst_create(mnt_dir)
- if not cache:
- tst_passthrough(src_dir, mnt_dir)
- tst_append(src_dir, mnt_dir)
- tst_seek(src_dir, mnt_dir)
- tst_mkdir(mnt_dir)
- tst_rmdir(src_dir, mnt_dir)
- tst_unlink(src_dir, mnt_dir)
- tst_symlink(mnt_dir)
- if os.getuid() == 0:
- tst_chown(mnt_dir)
-
- # Underlying fs may not have full nanosecond resolution
- tst_utimens(mnt_dir, ns_tol=1000)
-
- tst_link(mnt_dir)
- tst_truncate_path(mnt_dir)
- tst_truncate_fd(mnt_dir)
- tst_open_unlink(mnt_dir)
-
- # test_syscalls assumes that changes in source directory
- # will be reflected immediately in mountpoint, so we
- # can't use it.
- if not cache:
- syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
- mnt_dir, ':' + src_dir ]
- subprocess.check_call(syscall_test_cmd)
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-
-@pytest.mark.skipif(fuse_proto < (7,11),
- reason='not supported by running kernel')
-def test_ioctl(tmpdir, output_checker):
- progname = pjoin(basename, 'example', 'ioctl')
- if not os.path.exists(progname):
- pytest.skip('%s not built' % os.path.basename(progname))
-
- mnt_dir = str(tmpdir)
- testfile = pjoin(mnt_dir, 'fioc')
- cmdline = base_cmdline + [progname, '-f', mnt_dir ]
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
-
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'ioctl_client'),
- testfile ]
- assert subprocess.check_output(cmdline) == b'0\n'
- with open(testfile, 'wb') as fh:
- fh.write(b'foobar')
- assert subprocess.check_output(cmdline) == b'6\n'
- subprocess.check_call(cmdline + [ '3' ])
- with open(testfile, 'rb') as fh:
- assert fh.read()== b'foo'
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-def test_poll(tmpdir, output_checker):
- mnt_dir = str(tmpdir)
- cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
- '-f', mnt_dir ]
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'poll_client') ]
- subprocess.check_call(cmdline, cwd=mnt_dir)
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-def test_null(tmpdir, output_checker):
- progname = pjoin(basename, 'example', 'null')
- if not os.path.exists(progname):
- pytest.skip('%s not built' % os.path.basename(progname))
-
- mnt_file = str(tmpdir) + '/file'
- with open(mnt_file, 'w') as fh:
- fh.write('dummy')
- cmdline = base_cmdline + [ progname, '-f', mnt_file ]
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- def test_fn(name):
- return os.stat(name).st_size > 4000
- try:
- wait_for_mount(mount_process, mnt_file, test_fn)
- with open(mnt_file, 'rb') as fh:
- assert fh.read(382) == b'\0' * 382
- with open(mnt_file, 'wb') as fh:
- fh.write(b'whatever')
- except:
- cleanup(mount_process, mnt_file)
- raise
- else:
- umount(mount_process, mnt_file)
-
-
-@pytest.mark.skipif(fuse_proto < (7,12),
- reason='not supported by running kernel')
-@pytest.mark.parametrize("notify", (True, False))
-def test_notify_inval_entry(tmpdir, notify, output_checker):
- mnt_dir = str(tmpdir)
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'notify_inval_entry'),
- '-f', '--update-interval=1',
- '--timeout=5', mnt_dir ]
- if not notify:
- cmdline.append('--no-notify')
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
- try:
- wait_for_mount(mount_process, mnt_dir)
- fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
- try:
- os.stat(fname)
- except FileNotFoundError:
- # We may have hit a race condition and issued
- # readdir just before the name changed
- fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
- os.stat(fname)
-
- safe_sleep(2)
- if not notify:
- os.stat(fname)
- safe_sleep(5)
- with pytest.raises(FileNotFoundError):
- os.stat(fname)
- except:
- cleanup(mount_process, mnt_dir)
- raise
- else:
- umount(mount_process, mnt_dir)
-
-@pytest.mark.skipif(os.getuid() != 0,
- reason='needs to run as root')
-def test_cuse(output_checker):
-
- # Valgrind warns about unknown ioctls, that's ok
- output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
- r'==\1== \s{3}.+\n'
- r'==\1== \s{3}.+$', count=0)
-
- devname = 'cuse-test-%d' % os.getpid()
- devpath = '/dev/%s' % devname
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'cuse'),
- '-f', '--name=%s' % devname ]
- mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
- stderr=output_checker.fd)
-
- cmdline = base_cmdline + \
- [ pjoin(basename, 'example', 'cuse_client'),
- devpath ]
- try:
- wait_for_mount(mount_process, devpath,
- test_fn=os.path.exists)
- assert subprocess.check_output(cmdline + ['s']) == b'0\n'
- data = b'some test data'
- off = 5
- proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
- stdin=subprocess.PIPE)
- proc.stdin.write(data)
- proc.stdin.close()
- assert proc.wait(timeout=10) == 0
- size = str(off + len(data)).encode() + b'\n'
- assert subprocess.check_output(cmdline + ['s']) == size
- out = subprocess.check_output(
- cmdline + [ 'r', str(off + len(data) + 2), '0' ])
- assert out == (b'\0' * off) + data
- finally:
- mount_process.terminate()
-
-@contextmanager
-def os_open(name, flags):
- fd = os.open(name, flags)
- try:
- yield fd
- finally:
- os.close(fd)
-
-def os_create(name):
- os.close(os.open(name, os.O_CREAT | os.O_RDWR))
-
-def tst_unlink(src_dir, mnt_dir):
- name = name_generator()
- fullname = mnt_dir + "/" + name
- with open(pjoin(src_dir, name), 'wb') as fh:
- fh.write(b'hello')
- assert name in os.listdir(mnt_dir)
- os.unlink(fullname)
- with pytest.raises(OSError) as exc_info:
- os.stat(fullname)
- assert exc_info.value.errno == errno.ENOENT
- assert name not in os.listdir(mnt_dir)
-
-def tst_mkdir(mnt_dir):
- dirname = name_generator()
- fullname = mnt_dir + "/" + dirname
- os.mkdir(fullname)
- fstat = os.stat(fullname)
- assert stat.S_ISDIR(fstat.st_mode)
- assert os.listdir(fullname) == []
- # Some filesystem (e.g. BTRFS) don't track st_nlink for directories
- assert fstat.st_nlink in (1,2)
- assert dirname in os.listdir(mnt_dir)
-
-def tst_rmdir(src_dir, mnt_dir):
- name = name_generator()
- fullname = mnt_dir + "/" + name
- os.mkdir(pjoin(src_dir, name))
- assert name in os.listdir(mnt_dir)
- os.rmdir(fullname)
- with pytest.raises(OSError) as exc_info:
- os.stat(fullname)
- assert exc_info.value.errno == errno.ENOENT
- assert name not in os.listdir(mnt_dir)
-
-def tst_symlink(mnt_dir):
- linkname = name_generator()
- fullname = mnt_dir + "/" + linkname
- os.symlink("/imaginary/dest", fullname)
- fstat = os.lstat(fullname)
- assert stat.S_ISLNK(fstat.st_mode)
- assert os.readlink(fullname) == "/imaginary/dest"
- assert fstat.st_nlink == 1
- assert linkname in os.listdir(mnt_dir)
-
-def tst_create(mnt_dir):
- name = name_generator()
- fullname = pjoin(mnt_dir, name)
- with pytest.raises(OSError) as exc_info:
- os.stat(fullname)
- assert exc_info.value.errno == errno.ENOENT
- assert name not in os.listdir(mnt_dir)
-
- fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
- os.close(fd)
-
- assert name in os.listdir(mnt_dir)
- fstat = os.lstat(fullname)
- assert stat.S_ISREG(fstat.st_mode)
- assert fstat.st_nlink == 1
- assert fstat.st_size == 0
-
-def tst_chown(mnt_dir):
- filename = pjoin(mnt_dir, name_generator())
- os.mkdir(filename)
- fstat = os.lstat(filename)
- uid = fstat.st_uid
- gid = fstat.st_gid
-
- uid_new = uid + 1
- os.chown(filename, uid_new, -1)
- fstat = os.lstat(filename)
- assert fstat.st_uid == uid_new
- assert fstat.st_gid == gid
-
- gid_new = gid + 1
- os.chown(filename, -1, gid_new)
- fstat = os.lstat(filename)
- assert fstat.st_uid == uid_new
- assert fstat.st_gid == gid_new
-
-def tst_open_read(src_dir, mnt_dir):
- name = name_generator()
- with open(pjoin(src_dir, name), 'wb') as fh_out, \
- open(TEST_FILE, 'rb') as fh_in:
- shutil.copyfileobj(fh_in, fh_out)
-
- assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
-
-def tst_open_write(src_dir, mnt_dir):
- name = name_generator()
- os_create(pjoin(src_dir, name))
- fullname = pjoin(mnt_dir, name)
- with open(fullname, 'wb') as fh_out, \
- open(TEST_FILE, 'rb') as fh_in:
- shutil.copyfileobj(fh_in, fh_out)
-
- assert filecmp.cmp(fullname, TEST_FILE, False)
-
-def tst_append(src_dir, mnt_dir):
- name = name_generator()
- os_create(pjoin(src_dir, name))
- fullname = pjoin(mnt_dir, name)
- with os_open(fullname, os.O_WRONLY) as fd:
- os.write(fd, b'foo\n')
- with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
- os.write(fd, b'bar\n')
-
- with open(fullname, 'rb') as fh:
- assert fh.read() == b'foo\nbar\n'
-
-def tst_seek(src_dir, mnt_dir):
- name = name_generator()
- os_create(pjoin(src_dir, name))
- fullname = pjoin(mnt_dir, name)
- with os_open(fullname, os.O_WRONLY) as fd:
- os.lseek(fd, 1, os.SEEK_SET)
- os.write(fd, b'foobar\n')
- with os_open(fullname, os.O_WRONLY) as fd:
- os.lseek(fd, 4, os.SEEK_SET)
- os.write(fd, b'com')
-
- with open(fullname, 'rb') as fh:
- assert fh.read() == b'\0foocom\n'
-
-def tst_open_unlink(mnt_dir):
- name = pjoin(mnt_dir, name_generator())
- data1 = b'foo'
- data2 = b'bar'
- fullname = pjoin(mnt_dir, name)
- with open(fullname, 'wb+', buffering=0) as fh:
- fh.write(data1)
- os.unlink(fullname)
- with pytest.raises(OSError) as exc_info:
- os.stat(fullname)
- assert exc_info.value.errno == errno.ENOENT
- assert name not in os.listdir(mnt_dir)
- fh.write(data2)
- fh.seek(0)
- assert fh.read() == data1+data2
-
-def tst_statvfs(mnt_dir):
- os.statvfs(mnt_dir)
-
-def tst_link(mnt_dir):
- name1 = pjoin(mnt_dir, name_generator())
- name2 = pjoin(mnt_dir, name_generator())
- shutil.copyfile(TEST_FILE, name1)
- assert filecmp.cmp(name1, TEST_FILE, False)
-
- fstat1 = os.lstat(name1)
- assert fstat1.st_nlink == 1
-
- os.link(name1, name2)
-
- fstat1 = os.lstat(name1)
- fstat2 = os.lstat(name2)
- assert fstat1 == fstat2
- assert fstat1.st_nlink == 2
- assert os.path.basename(name2) in os.listdir(mnt_dir)
- assert filecmp.cmp(name1, name2, False)
-
- # Since RELEASE requests are asynchronous, it is possible that
- # libfuse still considers the file to be open at this point
- # and (since -o hard_remove is not used) renames it instead of
- # deleting it. In that case, the following lstat() call will
- # still report an st_nlink value of 2 (cf. issue #157).
- os.unlink(name2)
-
- assert os.path.basename(name2) not in os.listdir(mnt_dir)
- with pytest.raises(FileNotFoundError):
- os.lstat(name2)
-
- # See above, we may have to wait until RELEASE has been
- # received before the st_nlink value is correct.
- maxwait = time.time() + 2
- fstat1 = os.lstat(name1)
- while fstat1.st_nlink == 2 and time.time() < maxwait:
- fstat1 = os.lstat(name1)
- time.sleep(0.1)
- assert fstat1.st_nlink == 1
-
- os.unlink(name1)
-
-def tst_readdir(src_dir, mnt_dir):
- newdir = name_generator()
-
- src_newdir = pjoin(src_dir, newdir)
- mnt_newdir = pjoin(mnt_dir, newdir)
- file_ = src_newdir + "/" + name_generator()
- subdir = src_newdir + "/" + name_generator()
- subfile = subdir + "/" + name_generator()
-
- os.mkdir(src_newdir)
- shutil.copyfile(TEST_FILE, file_)
- os.mkdir(subdir)
- shutil.copyfile(TEST_FILE, subfile)
-
- listdir_is = os.listdir(mnt_newdir)
- listdir_is.sort()
- listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
- listdir_should.sort()
- assert listdir_is == listdir_should
-
- os.unlink(file_)
- os.unlink(subfile)
- os.rmdir(subdir)
- os.rmdir(src_newdir)
-
-def tst_readdir_big(src_dir, mnt_dir):
-
- # Add enough entries so that readdir needs to be called
- # multiple times.
- fnames = []
- for i in range(500):
- fname = ('A rather long filename to make sure that we '
- 'fill up the buffer - ' * 3) + str(i)
- with open(pjoin(src_dir, fname), 'w') as fh:
- fh.write('File %d' % i)
- fnames.append(fname)
-
- listdir_is = sorted(os.listdir(mnt_dir))
- listdir_should = sorted(os.listdir(src_dir))
- assert listdir_is == listdir_should
-
- for fname in fnames:
- stat_src = os.stat(pjoin(src_dir, fname))
- stat_mnt = os.stat(pjoin(mnt_dir, fname))
- assert stat_src.st_ino == stat_mnt.st_ino
- assert stat_src.st_mtime == stat_mnt.st_mtime
- assert stat_src.st_ctime == stat_mnt.st_ctime
- assert stat_src.st_size == stat_mnt.st_size
- os.unlink(pjoin(src_dir, fname))
-
-def tst_truncate_path(mnt_dir):
- assert len(TEST_DATA) > 1024
-
- filename = pjoin(mnt_dir, name_generator())
- with open(filename, 'wb') as fh:
- fh.write(TEST_DATA)
-
- fstat = os.stat(filename)
- size = fstat.st_size
- assert size == len(TEST_DATA)
-
- # Add zeros at the end
- os.truncate(filename, size + 1024)
- assert os.stat(filename).st_size == size + 1024
- with open(filename, 'rb') as fh:
- assert fh.read(size) == TEST_DATA
- assert fh.read(1025) == b'\0' * 1024
-
- # Truncate data
- os.truncate(filename, size - 1024)
- assert os.stat(filename).st_size == size - 1024
- with open(filename, 'rb') as fh:
- assert fh.read(size) == TEST_DATA[:size-1024]
-
- os.unlink(filename)
-
-def tst_truncate_fd(mnt_dir):
- assert len(TEST_DATA) > 1024
- with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
- fd = fh.fileno()
- fh.write(TEST_DATA)
- fstat = os.fstat(fd)
- size = fstat.st_size
- assert size == len(TEST_DATA)
-
- # Add zeros at the end
- os.ftruncate(fd, size + 1024)
- assert os.fstat(fd).st_size == size + 1024
- fh.seek(0)
- assert fh.read(size) == TEST_DATA
- assert fh.read(1025) == b'\0' * 1024
-
- # Truncate data
- os.ftruncate(fd, size - 1024)
- assert os.fstat(fd).st_size == size - 1024
- fh.seek(0)
- assert fh.read(size) == TEST_DATA[:size-1024]
-
-def tst_utimens(mnt_dir, ns_tol=0):
- filename = pjoin(mnt_dir, name_generator())
- os.mkdir(filename)
- fstat = os.lstat(filename)
-
- atime = fstat.st_atime + 42.28
- mtime = fstat.st_mtime - 42.23
- if sys.version_info < (3,3):
- os.utime(filename, (atime, mtime))
- else:
- atime_ns = fstat.st_atime_ns + int(42.28*1e9)
- mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
- os.utime(filename, None, ns=(atime_ns, mtime_ns))
-
- fstat = os.lstat(filename)
-
- assert abs(fstat.st_atime - atime) < 1
- assert abs(fstat.st_mtime - mtime) < 1
- if sys.version_info >= (3,3):
- assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
- assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
-
-def tst_passthrough(src_dir, mnt_dir):
- name = name_generator()
- src_name = pjoin(src_dir, name)
- mnt_name = pjoin(src_dir, name)
- assert name not in os.listdir(src_dir)
- assert name not in os.listdir(mnt_dir)
- with open(src_name, 'w') as fh:
- fh.write('Hello, world')
- assert name in os.listdir(src_dir)
- assert name in os.listdir(mnt_dir)
- assert os.stat(src_name) == os.stat(mnt_name)
-
- name = name_generator()
- src_name = pjoin(src_dir, name)
- mnt_name = pjoin(src_dir, name)
- assert name not in os.listdir(src_dir)
- assert name not in os.listdir(mnt_dir)
- with open(mnt_name, 'w') as fh:
- fh.write('Hello, world')
- assert name in os.listdir(src_dir)
- assert name in os.listdir(mnt_dir)
- assert os.stat(src_name) == os.stat(mnt_name)
-
-# avoid warning about unused import
-test_printcap
-
-