diff options
Diffstat (limited to 'test/test_examples.py')
-rwxr-xr-x | test/test_examples.py | 719 |
1 files changed, 0 insertions, 719 deletions
diff --git a/test/test_examples.py b/test/test_examples.py deleted file mode 100755 index 5d6dacc..0000000 --- a/test/test_examples.py +++ /dev/null @@ -1,719 +0,0 @@ -#!/usr/bin/env python3 - -if __name__ == '__main__': - import pytest - import sys - sys.exit(pytest.main([__file__] + sys.argv[1:])) - -import subprocess -import os -import sys -import py -import pytest -import stat -import shutil -import filecmp -import tempfile -import time -import errno -import sys -from tempfile import NamedTemporaryFile -from contextlib import contextmanager -from util import (wait_for_mount, umount, cleanup, base_cmdline, - safe_sleep, basename, fuse_test_marker, test_printcap, - fuse_proto, powerset) -from os.path import join as pjoin - -pytestmark = fuse_test_marker() - -TEST_FILE = __file__ - -with open(TEST_FILE, 'rb') as fh: - TEST_DATA = fh.read() - -def name_generator(__ctr=[0]): - __ctr[0] += 1 - return 'testfile_%d' % __ctr[0] - -options = [] -if sys.platform == 'linux': - options.append('clone_fd') - -def invoke_directly(mnt_dir, name, options): - cmdline = base_cmdline + [ pjoin(basename, 'example', name), - '-f', mnt_dir, '-o', ','.join(options) ] - if name == 'hello_ll': - # supports single-threading only - cmdline.append('-s') - - return cmdline - -def invoke_mount_fuse(mnt_dir, name, options): - return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'), - name, mnt_dir, '-o', ','.join(options) ] - -def invoke_mount_fuse_drop_privileges(mnt_dir, name, options): - if os.getuid() != 0: - pytest.skip('drop_privileges requires root, skipping.') - - return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',)) - -class raii_tmpdir: - def __init__(self): - self.d = tempfile.mkdtemp() - - def __str__(self): - return str(self.d) - - def mkdir(self, path): - return py.path.local(str(self.d)).mkdir(path) - -@pytest.fixture -def short_tmpdir(): - return raii_tmpdir() - -@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse, - invoke_mount_fuse_drop_privileges)) -@pytest.mark.parametrize("options", powerset(options)) -@pytest.mark.parametrize("name", ('hello', 'hello_ll')) -def test_hello(tmpdir, name, options, cmdline_builder, output_checker): - mnt_dir = str(tmpdir) - mount_process = subprocess.Popen( - cmdline_builder(mnt_dir, name, options), - stdout=output_checker.fd, stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - assert os.listdir(mnt_dir) == [ 'hello' ] - filename = pjoin(mnt_dir, 'hello') - with open(filename, 'r') as fh: - assert fh.read() == 'Hello World!\n' - with pytest.raises(IOError) as exc_info: - open(filename, 'r+') - assert exc_info.value.errno == errno.EACCES - with pytest.raises(IOError) as exc_info: - open(filename + 'does-not-exist', 'r+') - assert exc_info.value.errno == errno.ENOENT - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - -@pytest.mark.parametrize("writeback", (False, True)) -@pytest.mark.parametrize("name", ('passthrough', 'passthrough_fh', 'passthrough_ll')) -@pytest.mark.parametrize("debug", (False, True)) -def test_passthrough(short_tmpdir, name, debug, output_checker, writeback): - # Avoid false positives from libfuse debug messages - if debug: - output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$', - count=0) - - # test_syscalls prints "No error" under FreeBSD - output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]", - count=0) - - mnt_dir = str(short_tmpdir.mkdir('mnt')) - src_dir = str(short_tmpdir.mkdir('src')) - - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', name), - '-f', mnt_dir ] - if debug: - cmdline.append('-d') - - if writeback: - if name != 'passthrough_ll': - pytest.skip('example does not support writeback caching') - cmdline.append('-o') - cmdline.append('writeback') - - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - work_dir = mnt_dir + src_dir - - tst_statvfs(work_dir) - tst_readdir(src_dir, work_dir) - tst_readdir_big(src_dir, work_dir) - tst_open_read(src_dir, work_dir) - tst_open_write(src_dir, work_dir) - tst_create(work_dir) - tst_passthrough(src_dir, work_dir) - tst_append(src_dir, work_dir) - tst_seek(src_dir, work_dir) - tst_mkdir(work_dir) - tst_rmdir(src_dir, work_dir) - tst_unlink(src_dir, work_dir) - tst_symlink(work_dir) - if os.getuid() == 0: - tst_chown(work_dir) - - # Underlying fs may not have full nanosecond resolution - tst_utimens(work_dir, ns_tol=1000) - - tst_link(work_dir) - tst_truncate_path(work_dir) - tst_truncate_fd(work_dir) - tst_open_unlink(work_dir) - - syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), - work_dir, ':' + src_dir ] - if writeback: - # When writeback caching is enabled, kernel has to open files for - # reading even when userspace opens with O_WDONLY. This fails if the - # filesystem process doesn't have special permission. - syscall_test_cmd.append('-53') - subprocess.check_call(syscall_test_cmd) - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - -@pytest.mark.parametrize("cache", (False, True)) -def test_passthrough_hp(short_tmpdir, cache, output_checker): - mnt_dir = str(short_tmpdir.mkdir('mnt')) - src_dir = str(short_tmpdir.mkdir('src')) - - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'passthrough_hp'), - src_dir, mnt_dir ] - - if not cache: - cmdline.append('--nocache') - - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - - tst_statvfs(mnt_dir) - tst_readdir(src_dir, mnt_dir) - tst_readdir_big(src_dir, mnt_dir) - tst_open_read(src_dir, mnt_dir) - tst_open_write(src_dir, mnt_dir) - tst_create(mnt_dir) - if not cache: - tst_passthrough(src_dir, mnt_dir) - tst_append(src_dir, mnt_dir) - tst_seek(src_dir, mnt_dir) - tst_mkdir(mnt_dir) - tst_rmdir(src_dir, mnt_dir) - tst_unlink(src_dir, mnt_dir) - tst_symlink(mnt_dir) - if os.getuid() == 0: - tst_chown(mnt_dir) - - # Underlying fs may not have full nanosecond resolution - tst_utimens(mnt_dir, ns_tol=1000) - - tst_link(mnt_dir) - tst_truncate_path(mnt_dir) - tst_truncate_fd(mnt_dir) - tst_open_unlink(mnt_dir) - - # test_syscalls assumes that changes in source directory - # will be reflected immediately in mountpoint, so we - # can't use it. - if not cache: - syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'), - mnt_dir, ':' + src_dir ] - subprocess.check_call(syscall_test_cmd) - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - - -@pytest.mark.skipif(fuse_proto < (7,11), - reason='not supported by running kernel') -def test_ioctl(tmpdir, output_checker): - progname = pjoin(basename, 'example', 'ioctl') - if not os.path.exists(progname): - pytest.skip('%s not built' % os.path.basename(progname)) - - mnt_dir = str(tmpdir) - testfile = pjoin(mnt_dir, 'fioc') - cmdline = base_cmdline + [progname, '-f', mnt_dir ] - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'ioctl_client'), - testfile ] - assert subprocess.check_output(cmdline) == b'0\n' - with open(testfile, 'wb') as fh: - fh.write(b'foobar') - assert subprocess.check_output(cmdline) == b'6\n' - subprocess.check_call(cmdline + [ '3' ]) - with open(testfile, 'rb') as fh: - assert fh.read()== b'foo' - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - -def test_poll(tmpdir, output_checker): - mnt_dir = str(tmpdir) - cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'), - '-f', mnt_dir ] - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'poll_client') ] - subprocess.check_call(cmdline, cwd=mnt_dir) - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - -def test_null(tmpdir, output_checker): - progname = pjoin(basename, 'example', 'null') - if not os.path.exists(progname): - pytest.skip('%s not built' % os.path.basename(progname)) - - mnt_file = str(tmpdir) + '/file' - with open(mnt_file, 'w') as fh: - fh.write('dummy') - cmdline = base_cmdline + [ progname, '-f', mnt_file ] - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - def test_fn(name): - return os.stat(name).st_size > 4000 - try: - wait_for_mount(mount_process, mnt_file, test_fn) - with open(mnt_file, 'rb') as fh: - assert fh.read(382) == b'\0' * 382 - with open(mnt_file, 'wb') as fh: - fh.write(b'whatever') - except: - cleanup(mount_process, mnt_file) - raise - else: - umount(mount_process, mnt_file) - - -@pytest.mark.skipif(fuse_proto < (7,12), - reason='not supported by running kernel') -@pytest.mark.parametrize("notify", (True, False)) -def test_notify_inval_entry(tmpdir, notify, output_checker): - mnt_dir = str(tmpdir) - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'notify_inval_entry'), - '-f', '--update-interval=1', - '--timeout=5', mnt_dir ] - if not notify: - cmdline.append('--no-notify') - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - try: - wait_for_mount(mount_process, mnt_dir) - fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) - try: - os.stat(fname) - except FileNotFoundError: - # We may have hit a race condition and issued - # readdir just before the name changed - fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0]) - os.stat(fname) - - safe_sleep(2) - if not notify: - os.stat(fname) - safe_sleep(5) - with pytest.raises(FileNotFoundError): - os.stat(fname) - except: - cleanup(mount_process, mnt_dir) - raise - else: - umount(mount_process, mnt_dir) - -@pytest.mark.skipif(os.getuid() != 0, - reason='needs to run as root') -def test_cuse(output_checker): - - # Valgrind warns about unknown ioctls, that's ok - output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n' - r'==\1== \s{3}.+\n' - r'==\1== \s{3}.+$', count=0) - - devname = 'cuse-test-%d' % os.getpid() - devpath = '/dev/%s' % devname - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'cuse'), - '-f', '--name=%s' % devname ] - mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd, - stderr=output_checker.fd) - - cmdline = base_cmdline + \ - [ pjoin(basename, 'example', 'cuse_client'), - devpath ] - try: - wait_for_mount(mount_process, devpath, - test_fn=os.path.exists) - assert subprocess.check_output(cmdline + ['s']) == b'0\n' - data = b'some test data' - off = 5 - proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ], - stdin=subprocess.PIPE) - proc.stdin.write(data) - proc.stdin.close() - assert proc.wait(timeout=10) == 0 - size = str(off + len(data)).encode() + b'\n' - assert subprocess.check_output(cmdline + ['s']) == size - out = subprocess.check_output( - cmdline + [ 'r', str(off + len(data) + 2), '0' ]) - assert out == (b'\0' * off) + data - finally: - mount_process.terminate() - -@contextmanager -def os_open(name, flags): - fd = os.open(name, flags) - try: - yield fd - finally: - os.close(fd) - -def os_create(name): - os.close(os.open(name, os.O_CREAT | os.O_RDWR)) - -def tst_unlink(src_dir, mnt_dir): - name = name_generator() - fullname = mnt_dir + "/" + name - with open(pjoin(src_dir, name), 'wb') as fh: - fh.write(b'hello') - assert name in os.listdir(mnt_dir) - os.unlink(fullname) - with pytest.raises(OSError) as exc_info: - os.stat(fullname) - assert exc_info.value.errno == errno.ENOENT - assert name not in os.listdir(mnt_dir) - -def tst_mkdir(mnt_dir): - dirname = name_generator() - fullname = mnt_dir + "/" + dirname - os.mkdir(fullname) - fstat = os.stat(fullname) - assert stat.S_ISDIR(fstat.st_mode) - assert os.listdir(fullname) == [] - # Some filesystem (e.g. BTRFS) don't track st_nlink for directories - assert fstat.st_nlink in (1,2) - assert dirname in os.listdir(mnt_dir) - -def tst_rmdir(src_dir, mnt_dir): - name = name_generator() - fullname = mnt_dir + "/" + name - os.mkdir(pjoin(src_dir, name)) - assert name in os.listdir(mnt_dir) - os.rmdir(fullname) - with pytest.raises(OSError) as exc_info: - os.stat(fullname) - assert exc_info.value.errno == errno.ENOENT - assert name not in os.listdir(mnt_dir) - -def tst_symlink(mnt_dir): - linkname = name_generator() - fullname = mnt_dir + "/" + linkname - os.symlink("/imaginary/dest", fullname) - fstat = os.lstat(fullname) - assert stat.S_ISLNK(fstat.st_mode) - assert os.readlink(fullname) == "/imaginary/dest" - assert fstat.st_nlink == 1 - assert linkname in os.listdir(mnt_dir) - -def tst_create(mnt_dir): - name = name_generator() - fullname = pjoin(mnt_dir, name) - with pytest.raises(OSError) as exc_info: - os.stat(fullname) - assert exc_info.value.errno == errno.ENOENT - assert name not in os.listdir(mnt_dir) - - fd = os.open(fullname, os.O_CREAT | os.O_RDWR) - os.close(fd) - - assert name in os.listdir(mnt_dir) - fstat = os.lstat(fullname) - assert stat.S_ISREG(fstat.st_mode) - assert fstat.st_nlink == 1 - assert fstat.st_size == 0 - -def tst_chown(mnt_dir): - filename = pjoin(mnt_dir, name_generator()) - os.mkdir(filename) - fstat = os.lstat(filename) - uid = fstat.st_uid - gid = fstat.st_gid - - uid_new = uid + 1 - os.chown(filename, uid_new, -1) - fstat = os.lstat(filename) - assert fstat.st_uid == uid_new - assert fstat.st_gid == gid - - gid_new = gid + 1 - os.chown(filename, -1, gid_new) - fstat = os.lstat(filename) - assert fstat.st_uid == uid_new - assert fstat.st_gid == gid_new - -def tst_open_read(src_dir, mnt_dir): - name = name_generator() - with open(pjoin(src_dir, name), 'wb') as fh_out, \ - open(TEST_FILE, 'rb') as fh_in: - shutil.copyfileobj(fh_in, fh_out) - - assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False) - -def tst_open_write(src_dir, mnt_dir): - name = name_generator() - os_create(pjoin(src_dir, name)) - fullname = pjoin(mnt_dir, name) - with open(fullname, 'wb') as fh_out, \ - open(TEST_FILE, 'rb') as fh_in: - shutil.copyfileobj(fh_in, fh_out) - - assert filecmp.cmp(fullname, TEST_FILE, False) - -def tst_append(src_dir, mnt_dir): - name = name_generator() - os_create(pjoin(src_dir, name)) - fullname = pjoin(mnt_dir, name) - with os_open(fullname, os.O_WRONLY) as fd: - os.write(fd, b'foo\n') - with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd: - os.write(fd, b'bar\n') - - with open(fullname, 'rb') as fh: - assert fh.read() == b'foo\nbar\n' - -def tst_seek(src_dir, mnt_dir): - name = name_generator() - os_create(pjoin(src_dir, name)) - fullname = pjoin(mnt_dir, name) - with os_open(fullname, os.O_WRONLY) as fd: - os.lseek(fd, 1, os.SEEK_SET) - os.write(fd, b'foobar\n') - with os_open(fullname, os.O_WRONLY) as fd: - os.lseek(fd, 4, os.SEEK_SET) - os.write(fd, b'com') - - with open(fullname, 'rb') as fh: - assert fh.read() == b'\0foocom\n' - -def tst_open_unlink(mnt_dir): - name = pjoin(mnt_dir, name_generator()) - data1 = b'foo' - data2 = b'bar' - fullname = pjoin(mnt_dir, name) - with open(fullname, 'wb+', buffering=0) as fh: - fh.write(data1) - os.unlink(fullname) - with pytest.raises(OSError) as exc_info: - os.stat(fullname) - assert exc_info.value.errno == errno.ENOENT - assert name not in os.listdir(mnt_dir) - fh.write(data2) - fh.seek(0) - assert fh.read() == data1+data2 - -def tst_statvfs(mnt_dir): - os.statvfs(mnt_dir) - -def tst_link(mnt_dir): - name1 = pjoin(mnt_dir, name_generator()) - name2 = pjoin(mnt_dir, name_generator()) - shutil.copyfile(TEST_FILE, name1) - assert filecmp.cmp(name1, TEST_FILE, False) - - fstat1 = os.lstat(name1) - assert fstat1.st_nlink == 1 - - os.link(name1, name2) - - fstat1 = os.lstat(name1) - fstat2 = os.lstat(name2) - assert fstat1 == fstat2 - assert fstat1.st_nlink == 2 - assert os.path.basename(name2) in os.listdir(mnt_dir) - assert filecmp.cmp(name1, name2, False) - - # Since RELEASE requests are asynchronous, it is possible that - # libfuse still considers the file to be open at this point - # and (since -o hard_remove is not used) renames it instead of - # deleting it. In that case, the following lstat() call will - # still report an st_nlink value of 2 (cf. issue #157). - os.unlink(name2) - - assert os.path.basename(name2) not in os.listdir(mnt_dir) - with pytest.raises(FileNotFoundError): - os.lstat(name2) - - # See above, we may have to wait until RELEASE has been - # received before the st_nlink value is correct. - maxwait = time.time() + 2 - fstat1 = os.lstat(name1) - while fstat1.st_nlink == 2 and time.time() < maxwait: - fstat1 = os.lstat(name1) - time.sleep(0.1) - assert fstat1.st_nlink == 1 - - os.unlink(name1) - -def tst_readdir(src_dir, mnt_dir): - newdir = name_generator() - - src_newdir = pjoin(src_dir, newdir) - mnt_newdir = pjoin(mnt_dir, newdir) - file_ = src_newdir + "/" + name_generator() - subdir = src_newdir + "/" + name_generator() - subfile = subdir + "/" + name_generator() - - os.mkdir(src_newdir) - shutil.copyfile(TEST_FILE, file_) - os.mkdir(subdir) - shutil.copyfile(TEST_FILE, subfile) - - listdir_is = os.listdir(mnt_newdir) - listdir_is.sort() - listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ] - listdir_should.sort() - assert listdir_is == listdir_should - - os.unlink(file_) - os.unlink(subfile) - os.rmdir(subdir) - os.rmdir(src_newdir) - -def tst_readdir_big(src_dir, mnt_dir): - - # Add enough entries so that readdir needs to be called - # multiple times. - fnames = [] - for i in range(500): - fname = ('A rather long filename to make sure that we ' - 'fill up the buffer - ' * 3) + str(i) - with open(pjoin(src_dir, fname), 'w') as fh: - fh.write('File %d' % i) - fnames.append(fname) - - listdir_is = sorted(os.listdir(mnt_dir)) - listdir_should = sorted(os.listdir(src_dir)) - assert listdir_is == listdir_should - - for fname in fnames: - stat_src = os.stat(pjoin(src_dir, fname)) - stat_mnt = os.stat(pjoin(mnt_dir, fname)) - assert stat_src.st_ino == stat_mnt.st_ino - assert stat_src.st_mtime == stat_mnt.st_mtime - assert stat_src.st_ctime == stat_mnt.st_ctime - assert stat_src.st_size == stat_mnt.st_size - os.unlink(pjoin(src_dir, fname)) - -def tst_truncate_path(mnt_dir): - assert len(TEST_DATA) > 1024 - - filename = pjoin(mnt_dir, name_generator()) - with open(filename, 'wb') as fh: - fh.write(TEST_DATA) - - fstat = os.stat(filename) - size = fstat.st_size - assert size == len(TEST_DATA) - - # Add zeros at the end - os.truncate(filename, size + 1024) - assert os.stat(filename).st_size == size + 1024 - with open(filename, 'rb') as fh: - assert fh.read(size) == TEST_DATA - assert fh.read(1025) == b'\0' * 1024 - - # Truncate data - os.truncate(filename, size - 1024) - assert os.stat(filename).st_size == size - 1024 - with open(filename, 'rb') as fh: - assert fh.read(size) == TEST_DATA[:size-1024] - - os.unlink(filename) - -def tst_truncate_fd(mnt_dir): - assert len(TEST_DATA) > 1024 - with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh: - fd = fh.fileno() - fh.write(TEST_DATA) - fstat = os.fstat(fd) - size = fstat.st_size - assert size == len(TEST_DATA) - - # Add zeros at the end - os.ftruncate(fd, size + 1024) - assert os.fstat(fd).st_size == size + 1024 - fh.seek(0) - assert fh.read(size) == TEST_DATA - assert fh.read(1025) == b'\0' * 1024 - - # Truncate data - os.ftruncate(fd, size - 1024) - assert os.fstat(fd).st_size == size - 1024 - fh.seek(0) - assert fh.read(size) == TEST_DATA[:size-1024] - -def tst_utimens(mnt_dir, ns_tol=0): - filename = pjoin(mnt_dir, name_generator()) - os.mkdir(filename) - fstat = os.lstat(filename) - - atime = fstat.st_atime + 42.28 - mtime = fstat.st_mtime - 42.23 - if sys.version_info < (3,3): - os.utime(filename, (atime, mtime)) - else: - atime_ns = fstat.st_atime_ns + int(42.28*1e9) - mtime_ns = fstat.st_mtime_ns - int(42.23*1e9) - os.utime(filename, None, ns=(atime_ns, mtime_ns)) - - fstat = os.lstat(filename) - - assert abs(fstat.st_atime - atime) < 1 - assert abs(fstat.st_mtime - mtime) < 1 - if sys.version_info >= (3,3): - assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol - assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol - -def tst_passthrough(src_dir, mnt_dir): - name = name_generator() - src_name = pjoin(src_dir, name) - mnt_name = pjoin(src_dir, name) - assert name not in os.listdir(src_dir) - assert name not in os.listdir(mnt_dir) - with open(src_name, 'w') as fh: - fh.write('Hello, world') - assert name in os.listdir(src_dir) - assert name in os.listdir(mnt_dir) - assert os.stat(src_name) == os.stat(mnt_name) - - name = name_generator() - src_name = pjoin(src_dir, name) - mnt_name = pjoin(src_dir, name) - assert name not in os.listdir(src_dir) - assert name not in os.listdir(mnt_dir) - with open(mnt_name, 'w') as fh: - fh.write('Hello, world') - assert name in os.listdir(src_dir) - assert name in os.listdir(mnt_dir) - assert os.stat(src_name) == os.stat(mnt_name) - -# avoid warning about unused import -test_printcap - - |