Python源码示例:ntpath.join()
示例1
def test_copy_with_dir_as_target(smb_share):
    """copy() onto a directory places the file inside that directory.

    The returned path must be the destination directory joined with the
    source file name, the content must match, every timestamp must differ
    from the source's and the read-only attribute must not be set.
    """
    source_path = "%s\\source.txt" % smb_share
    target_dir = "%s\\directory" % smb_share
    mkdir(target_dir)

    with open_file(source_path, mode='w') as writer:
        writer.write(u"content")

    returned_path = copy(source_path, target_dir)
    assert returned_path == ntpath.join(target_dir, "source.txt")

    copied_path = "%s\\source.txt" % target_dir
    with open_file(copied_path) as reader:
        assert reader.read() == u"content"

    source_info = smbclient_stat(source_path)
    copied_info = smbclient_stat(copied_path)

    # A plain copy does not carry any of the source timestamps over.
    assert copied_info.st_atime != source_info.st_atime
    assert copied_info.st_mtime != source_info.st_mtime
    assert copied_info.st_ctime != source_info.st_ctime
    assert copied_info.st_chgtime != source_info.st_chgtime
    assert copied_info.st_file_attributes & FileAttributes.FILE_ATTRIBUTE_READONLY == 0
示例2
def test_copy2_with_dir_as_target(smb_share):
    """copy2() onto a directory keeps the source access/modification times.

    The source times are set to 1024 before copying; the copy must report
    the same atime/mtime while its ctime/chgtime differ from the source's.
    """
    source_path = "%s\\source.txt" % smb_share
    target_dir = "%s\\directory" % smb_share
    mkdir(target_dir)

    with open_file(source_path, mode='w') as writer:
        writer.write(u"content")
    utime(source_path, times=(1024, 1024))

    returned_path = copy2(source_path, target_dir)
    assert returned_path == ntpath.join(target_dir, "source.txt")

    copied_path = "%s\\source.txt" % target_dir
    with open_file(copied_path) as reader:
        assert reader.read() == u"content"

    source_info = smbclient_stat(source_path)
    copied_info = smbclient_stat(copied_path)

    # atime/mtime are preserved by copy2, creation/change times are not.
    assert copied_info.st_atime == 1024
    assert copied_info.st_mtime == 1024
    assert copied_info.st_ctime != source_info.st_ctime
    assert copied_info.st_chgtime != source_info.st_chgtime
    assert copied_info.st_file_attributes & FileAttributes.FILE_ATTRIBUTE_READONLY == 0
示例3
def test_link_to_file(smb_share):
    """A link created with smbclient.link() shares content and identity with its source."""
    payload = u"content"
    source_path = ntpath.join(smb_share, 'src.txt')
    link_path = ntpath.join(smb_share, 'dst.txt')

    with smbclient.open_file(source_path, mode='w') as writer:
        writer.write(payload)

    smbclient.link(source_path, link_path)

    with smbclient.open_file(link_path, mode='r') as reader:
        assert reader.read() == payload

    source_info = smbclient.stat(source_path)
    link_info = smbclient.stat(link_path)

    # Both names must resolve to the same inode/device, each counting two links.
    assert source_info.st_ino == link_info.st_ino
    assert source_info.st_dev == link_info.st_dev
    assert source_info.st_nlink == 2
    assert link_info.st_nlink == 2
示例4
def test_lstat_on_file(smb_share):
    """lstat() on a freshly written file returns the expected SMBStatResult fields."""
    file_path = ntpath.join(smb_share, 'file.txt')
    with smbclient.open_file(file_path, mode='w') as writer:
        writer.write(u"Content")

    result = smbclient.lstat(file_path)
    assert isinstance(result, smbclient.SMBStatResult)

    # Every float timestamp must equal its nanosecond twin scaled to seconds.
    for field in ('st_atime', 'st_mtime', 'st_ctime', 'st_chgtime'):
        assert getattr(result, field) == getattr(result, field + '_ns') / 1000000000

    assert result.st_dev is not None
    assert result.st_file_attributes == FileAttributes.FILE_ATTRIBUTE_ARCHIVE
    assert result.st_gid == 0
    assert result.st_uid == 0
    assert result.st_ino is not None
    assert result.st_mode == stat.S_IFREG | 0o666
    assert result.st_nlink == 1
    assert result.st_size == 7  # len(u"Content")
    assert result.st_uid == 0
    assert result.st_reparse_tag == 0
示例5
def test_lstat_on_dir(smb_share):
    """lstat() on a freshly created directory returns the expected SMBStatResult fields."""
    dir_path = ntpath.join(smb_share, 'dir')
    smbclient.mkdir(dir_path)

    result = smbclient.lstat(dir_path)
    assert isinstance(result, smbclient.SMBStatResult)

    # Every float timestamp must equal its nanosecond twin scaled to seconds.
    for field in ('st_atime', 'st_mtime', 'st_ctime', 'st_chgtime'):
        assert getattr(result, field) == getattr(result, field + '_ns') / 1000000000

    assert result.st_dev is not None
    assert result.st_file_attributes == FileAttributes.FILE_ATTRIBUTE_DIRECTORY
    assert result.st_gid == 0
    assert result.st_uid == 0
    assert result.st_ino is not None
    assert result.st_mode == stat.S_IFDIR | 0o777
    assert result.st_nlink == 1
    assert result.st_size == 0
    assert result.st_uid == 0
    assert result.st_reparse_tag == 0
示例6
def test_scandir_large(smb_share):
    """scandir() returns every entry of a directory holding 150 long-named children."""
    dir_path = ntpath.join(smb_share, 'directory')
    smbclient.mkdir(dir_path)

    # Use 255-character names so the listing cannot fit into a single
    # response and the enumeration has to span several requests.
    for index in range(150):
        long_name = str(index).zfill(255)
        smbclient.mkdir(ntpath.join(smb_share, 'directory', long_name))

    found_paths = [entry.path for entry in smbclient.scandir(dir_path)]

    # Test optimisation: drop the directories here so that a later rmtree
    # does not have to enumerate them all over again.
    for found in found_paths:
        smbclient.rmdir(found)

    assert len(found_paths) == 150
示例7
def export_protos(destination):
    """Exports the compiled protos for the bundle.

    The engine initialization process has already built all protos and made
    them importable as `PB`. We rely on the `PB` package path (read here from
    `PB_PATH[0]`) because this allows the `--proto-override` flag to work.

    Compiled `.pyc` files are left out of the copy.

    Args:
      * destination (str) - The absolute path we're exporting to (we'll export
        to a subfolder `_pb/PB`).
    """
    def _skip_compiled(_base, names):
        # copytree `ignore` callback: the returned names are not copied.
        return [name for name in names if name.endswith('.pyc')]

    source_root = PB_PATH[0]  # root of generated PB folder.
    target_root = os.path.join(destination, '_pb', 'PB')
    shutil.copytree(source_root, target_root, ignore=_skip_compiled)
示例8
def __executeRemote(self, data):
    """Run `data` on the remote host through a throw-away service.

    A random 8-letter service name is generated, and a command line is
    built that (1) echoes `data` followed by an escaped redirection to
    `self.__output` into `self.__batchFile`, (2) runs that batch file through
    `self.__shell` and (3) deletes the batch file. The command is registered
    as the binary path of a new service, the service is started, and it is
    then deleted and its handle closed.

    Args:
        data: command text placed into the batch file. It is concatenated
            into the command line verbatim (no quoting or escaping).
    """
    # ascii_letters exists on both Python 2 and 3 and, unlike the
    # locale-dependent `string.letters`, can only ever yield ASCII characters.
    self.__tmpServiceName = ''.join(
        [random.choice(string.ascii_letters) for _ in range(8)]).encode('utf-16le')

    command = self.__shell + 'echo ' + data + ' ^> ' + self.__output + ' > ' + self.__batchFile + ' & ' + \
        self.__shell + self.__batchFile
    command += ' & ' + 'del ' + self.__batchFile

    self.__serviceDeleted = False
    resp = scmr.hRCreateServiceW(self.__scmr, self.__scManagerHandle, self.__tmpServiceName,
                                 self.__tmpServiceName, lpBinaryPathName=command)
    service = resp['lpServiceHandle']

    try:
        scmr.hRStartServiceW(self.__scmr, service)
    except Exception:
        # Best effort: a failing start is deliberately ignored so the
        # clean-up below always runs. Only Exception is caught so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        pass

    scmr.hRDeleteService(self.__scmr, service)
    self.__serviceDeleted = True
    scmr.hRCloseServiceHandle(self.__scmr, service)
示例9
def transformKey(self, InputKey):
    """Spread a 7-character key over 8 characters, 7 bits each, shifted left by one.

    Implements "Section 2.2.11.1.2 Encrypting a 64-Bit Block with a 7-Byte
    Key": the 56 input bits are cut into eight 7-bit groups and each group is
    stored in the upper seven bits of an output character (low bit is 0).

    Args:
        InputKey: indexable sequence of at least 7 single characters.

    Returns:
        An 8-character string.
    """
    key = [ord(InputKey[pos]) for pos in range(7)]

    groups = [key[0] >> 0x01]
    for pos in range(1, 7):
        # Low `pos` bits of the previous byte become the high bits of this
        # group; the remaining bits come from the top of the current byte.
        carried = (key[pos - 1] & ((1 << pos) - 1)) << (7 - pos)
        groups.append(carried | (key[pos] >> (pos + 1)))
    groups.append(key[6] & 0x7F)

    # Move every group into bits 7..1, leaving bit 0 cleared.
    return "".join([chr((group << 1) & 0xfe) for group in groups])
示例10
def getHBootKey(self):
    """Compute the hashed boot key from the SAM account-domain `F` value.

    The `F` value is parsed as a DOMAIN_ACCOUNT_F structure. An RC4 key is
    derived as MD5(Salt + QWERTY + bootKey + DIGITS) and used to process
    Key0's Key followed by its CheckSum; the result is stored in
    `self.__hashedBootKey`. The first 16 bytes are then verified against the
    trailing bytes with MD5(hbk[:16] + DIGITS + hbk[:16] + QWERTY).

    Raises:
        Exception: if the computed checksum does not match the stored one.
    """
    LOG.debug('Calculating HashedBootKey from SAM')
    QWERTY = "!@#$%^&*()qwertyUIOPAzxcvbnmQQQQQQQQQQQQ)(*@&%\0"
    DIGITS = "0123456789012345678901234567890123456789\0"

    # Raw string: '\D' and '\A' are not valid escape sequences, so the
    # non-raw spelling triggers a (Deprecation/Syntax)Warning on Python 3.
    # The resulting path text is unchanged.
    F = self.getValue(ntpath.join(r'SAM\Domains\Account', 'F'))[1]

    domainData = DOMAIN_ACCOUNT_F(F)

    rc4Key = self.MD5(domainData['Key0']['Salt'] + QWERTY + self.__bootKey + DIGITS)

    rc4 = ARC4.new(rc4Key)
    self.__hashedBootKey = rc4.encrypt(domainData['Key0']['Key'] + domainData['Key0']['CheckSum'])

    # Verify key with checksum
    checkSum = self.MD5(self.__hashedBootKey[:16] + DIGITS + self.__hashedBootKey[:16] + QWERTY)

    if checkSum != self.__hashedBootKey[16:]:
        raise Exception('hashedBootKey CheckSum failed, Syskey startup password probably in use! :(')
示例11
def __getInterface(self, interface, resp):
    """Parse an OBJREF answer and wrap it in an IRemUnknown2 instance.

    Args:
        interface: object providing get_cinstance(), get_ipidRemUnknown()
            and get_target().
        resp: iterable of string pieces that, joined, form the raw OBJREF.

    Returns:
        IRemUnknown2 built around an INTERFACE that carries the parsed
        object's ipid and oxid.
    """
    # Now let's parse the answer and build an Interface instance
    raw = ''.join(resp)
    flags = OBJREF(raw)['flags']

    objRef = None
    known_types = (
        (FLAGS_OBJREF_CUSTOM, OBJREF_CUSTOM),
        (FLAGS_OBJREF_HANDLER, OBJREF_HANDLER),
        (FLAGS_OBJREF_STANDARD, OBJREF_STANDARD),
        (FLAGS_OBJREF_EXTENDED, OBJREF_EXTENDED),
    )
    for type_flag, parser in known_types:
        if flags == type_flag:
            objRef = parser(raw)
            break
    else:
        # NOTE(review): objRef stays None here, so the subscripting below
        # raises TypeError right after this message is logged.
        logging.error("Unknown OBJREF Type! 0x%x" % flags)

    # NOTE(review): `oid` is filled from the 'oxid' field, not an 'oid'
    # field -- confirm this is intended.
    return IRemUnknown2(
        INTERFACE(
            interface.get_cinstance(),
            None,
            interface.get_ipidRemUnknown(),
            objRef['std']['ipid'],
            oxid=objRef['std']['oxid'],
            oid=objRef['std']['oxid'],
            target=interface.get_target(),
        )
    )
示例12
def transformKey(self, InputKey):
    """Expand a 7-character key into 8 characters holding 7 key bits each.

    Follows "Section 2.2.11.1.2 Encrypting a 64-Bit Block with a 7-Byte Key":
    consecutive 7-bit slices of the input are each placed in the upper seven
    bits of one output character, whose lowest bit is always zero.

    Args:
        InputKey: indexable sequence of at least 7 single characters.

    Returns:
        An 8-character string.
    """
    codes = [ord(InputKey[index]) for index in range(7)]

    septets = [codes[0] >> 0x01]
    # Walk adjacent byte pairs: `width` low bits of the left byte are carried
    # in front of the leading bits of the right byte.
    for width, (left, right) in enumerate(zip(codes, codes[1:]), 1):
        low_bits = left & ((1 << width) - 1)
        septets.append((low_bits << (7 - width)) | (right >> (width + 1)))
    septets.append(codes[6] & 0x7F)

    shifted = [(septet << 1) & 0xfe for septet in septets]
    return "".join(map(chr, shifted))
示例13
def _GetDefines(config):
"""Returns the list of preprocessor definitions for this configuation.
Arguments:
config: The dictionary that defines the special processing to be done
for this configuration.
Returns:
The list of preprocessor definitions.
"""
defines = []
for d in config.get('defines', []):
if type(d) == list:
fd = '='.join([str(dpart) for dpart in d])
else:
fd = str(d)
defines.append(fd)
return defines
示例14
def _ConvertToolsToExpectedForm(tools):
"""Convert tools to a form expected by Visual Studio.
Arguments:
tools: A dictionary of settings; the tool name is the key.
Returns:
A list of Tool objects.
"""
tool_list = []
for tool, settings in tools.items():
# Collapse settings with lists.
settings_fixed = {}
for setting, value in settings.items():
if type(value) == list:
if ((tool == 'VCLinkerTool' and
setting == 'AdditionalDependencies') or
setting == 'AdditionalOptions'):
settings_fixed[setting] = ' '.join(value)
else:
settings_fixed[setting] = ';'.join(value)
else:
settings_fixed[setting] = value
# Add in this tool.
tool_list.append(MSVS.Tool(tool, settings_fixed))
return tool_list
示例15
def _GetCopies(spec):
copies = []
# Add copies.
for cpy in spec.get('copies', []):
for src in cpy.get('files', []):
dst = os.path.join(cpy['destination'], os.path.basename(src))
# _AddCustomBuildToolForMSVS() will call _FixPath() on the inputs and
# outputs, so do the same for our generated command line.
if src.endswith('/'):
src_bare = src[:-1]
base_dir = posixpath.split(src_bare)[0]
outer_dir = posixpath.split(src_bare)[1]
fixed_dst = _FixPath(dst)
full_dst = '"%s\\%s\\"' % (fixed_dst, outer_dir)
cmd = 'mkdir %s 2>nul & cd "%s" && xcopy /e /f /y "%s" %s' % (
full_dst, _FixPath(base_dir), outer_dir, full_dst)
copies.append(([src], ['dummy_copies', dst], cmd,
'Copying %s to %s' % (src, fixed_dst)))
else:
fix_dst = _FixPath(cpy['destination'])
cmd = 'mkdir "%s" 2>nul & set ERRORLEVEL=0 & copy /Y "%s" "%s"' % (
fix_dst, _FixPath(src), _FixPath(dst))
copies.append(([src], [dst], cmd, 'Copying %s to %s' % (src, fix_dst)))
return copies
示例16
def _DictsToFolders(base_path, bucket, flat):
# Convert to folders recursively.
children = []
for folder, contents in bucket.items():
if type(contents) == dict:
folder_children = _DictsToFolders(os.path.join(base_path, folder),
contents, flat)
if flat:
children += folder_children
else:
folder_children = MSVSNew.MSVSFolderEntry(os.path.join(base_path, folder),
name='(' + folder + ')',
entries=folder_children)
children.append(folder_children)
else:
children.append(contents)
return children
示例17
def PerformBuild(data, configurations, params):
    """Build each requested configuration of the generated solution with devenv.

    The solution path is derived from the '.gyp' build file found in `data`
    (root + options.suffix + '.sln', placed under options.generator_output
    when that is set). If several '.gyp' entries are present, the last one
    iterated determines the path.

    Arguments:
      data: Mapping keyed by build file path.
      configurations: Iterable of configuration names to build.
      params: Dictionary providing 'options' and 'msvs_version'.
    """
    options = params['options']
    msvs_version = params['msvs_version']
    devenv = os.path.join(msvs_version.path, 'Common7', 'IDE', 'devenv.com')

    sln_path = ''
    for build_file in data:
        root, extension = os.path.splitext(build_file)
        if extension == '.gyp':
            sln_path = root + options.suffix + '.sln'
            if options.generator_output:
                sln_path = os.path.join(options.generator_output, sln_path)
    # At least one .gyp build file is required to know what to build.
    assert sln_path

    for config in configurations:
        arguments = [devenv, sln_path, '/Build', config]
        print('Building [%s]: %s' % (config, arguments))
        subprocess.check_call(arguments)
示例18
def _VerifySourcesExist(sources, root_dir):
"""Verifies that all source files exist on disk.
Checks that all regular source files, i.e. not created at run time,
exist on disk. Missing files cause needless recompilation but no otherwise
visible errors.
Arguments:
sources: A recursive list of Filter/file names.
root_dir: The root directory for the relative path names.
Returns:
A list of source files that cannot be found on disk.
"""
missing_sources = []
for source in sources:
if isinstance(source, MSVS.Filter):
missing_sources.extend(_VerifySourcesExist(source.contents, root_dir))
else:
if '$' not in source:
full_path = os.path.join(root_dir, source)
if not os.path.exists(full_path):
missing_sources.append(full_path)
return missing_sources
示例19
def _copy(src, dst, follow_symlinks, copy_meta_func, **kwargs):
    """Copy `src` to `dst`, then apply `copy_meta_func`, and return the final path.

    When `dst` is a directory the file name of `src` is appended to it first.
    `kwargs` are forwarded only to the isdir() check used for UNC paths.
    """
    # A UNC destination has to be probed with the isdir() that takes the
    # extra kwargs before falling back to the local os.path check. If either
    # one reports a directory, join the file name of src onto dst.
    looks_like_unc = ntpath.normpath(dst).startswith('\\\\')
    if looks_like_unc and isdir(dst, **kwargs):
        dst = ntpath.join(dst, ntpath.basename(src))
    elif os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copy_meta_func(src, dst, follow_symlinks=follow_symlinks)
    return dst
示例20
def test_copy_raises_when_source_and_target_identical_local(tmpdir):
    """copy() must refuse to copy a local file onto itself."""
    local_file = os.path.join(tmpdir.mkdir('test').strpath, 'file.txt')
    with open(local_file, mode='w') as writer:
        writer.write(u"content")

    message_pattern = re.escape("are the same file")
    with pytest.raises(Exception, match=message_pattern):
        copy(local_file, local_file)
示例21
def test_copy2_raises_when_source_and_target_identical_local(tmpdir):
    """copy2() must refuse to copy a local file onto itself."""
    local_file = os.path.join(tmpdir.mkdir('test').strpath, 'file.txt')
    with open(local_file, mode='w') as writer:
        writer.write(u"content")

    message_pattern = re.escape("are the same file")
    with pytest.raises(Exception, match=message_pattern):
        copy2(local_file, local_file)
示例22
def test_link_existing_file_failed(smb_share):
    """link() raises OSError when the link destination already exists."""
    payload = u"content"
    source_path = ntpath.join(smb_share, 'src.txt')
    link_path = ntpath.join(smb_share, 'dst.txt')

    # Create both the source and the would-be link name as regular files.
    for existing in (source_path, link_path):
        with smbclient.open_file(existing, mode='w') as writer:
            writer.write(payload)

    message_pattern = re.escape("[NtStatus 0xc0000035] File exists:")
    with pytest.raises(OSError, match=message_pattern):
        smbclient.link(source_path, link_path)
示例23
def test_link_missing_src_fail(smb_share):
    """link() raises OSError when the link source does not exist."""
    source_path = ntpath.join(smb_share, 'src.txt')
    link_path = ntpath.join(smb_share, 'dst.txt')

    message_pattern = re.escape("[NtStatus 0xc0000034] No such file or directory")
    with pytest.raises(OSError, match=message_pattern):
        smbclient.link(source_path, link_path)
示例24
def test_listdir_missing(dirpath, ntstatus, smb_share):
    """listdir() on a missing path raises SMBOSError carrying the given NtStatus."""
    missing_path = ntpath.join(smb_share, dirpath)
    message_pattern = re.escape("[NtStatus %s] No such file or directory" % ntstatus)
    with pytest.raises(SMBOSError, match=message_pattern):
        smbclient.listdir(missing_path)
示例25
def test_listdir_file_fail(smb_share):
    """listdir() on a regular file raises SMBOSError ("Not a directory")."""
    file_path = ntpath.join(smb_share, "file.txt")
    with smbclient.open_file(file_path, mode='w') as writer:
        writer.write(u"data")

    message_pattern = re.escape("[NtStatus 0xc0000103] Not a directory")
    with pytest.raises(SMBOSError, match=message_pattern):
        smbclient.listdir(file_path)
示例26
def test_mkdir(smb_share):
    """mkdir() creates a directory and fails when it is called again on it."""
    dir_path = ntpath.join(smb_share, 'dir')
    smbclient.mkdir(dir_path)

    assert stat.S_ISDIR(smbclient.stat(dir_path).st_mode)

    # A second mkdir on the same path must report that it already exists.
    message_pattern = re.escape("[NtStatus 0xc0000035] File exists:")
    with pytest.raises(SMBOSError, match=message_pattern):
        smbclient.mkdir(dir_path)
示例27
def test_mkdir_missing_parent_fail(smb_share):
    """mkdir() raises SMBOSError when the parent directory does not exist."""
    nested_path = ntpath.join(smb_share, 'dir', 'subdir')
    message_pattern = re.escape("[NtStatus 0xc000003a] No such file or directory")
    with pytest.raises(SMBOSError, match=message_pattern):
        smbclient.mkdir(nested_path)
示例28
def test_mkdir_path_is_file_fail(smb_share):
    """mkdir() raises SMBOSError when the path is an existing file."""
    file_path = ntpath.join(smb_share, 'test.txt')
    with smbclient.open_file(file_path, mode='w') as writer:
        writer.write(u"content")

    message_pattern = re.escape("[NtStatus 0xc0000035] File exists:")
    with pytest.raises(SMBOSError, match=message_pattern):
        smbclient.mkdir(file_path)
示例29
def test_makedirs_exist_ok(smb_share):
    """makedirs() fails on an existing directory unless exist_ok=True is given."""
    folder_path = ntpath.join(smb_share, 'folder')
    smbclient.makedirs(folder_path)

    message_pattern = re.escape("[NtStatus 0xc0000035] File exists:")
    with pytest.raises(OSError, match=message_pattern):
        smbclient.makedirs(folder_path)

    # With exist_ok the very same call must succeed without raising.
    smbclient.makedirs(folder_path, exist_ok=True)
示例30
def test_makedirs_missing_parents(smb_share):
    """makedirs() creates the whole chain of missing parent directories."""
    deep_path = ntpath.join(smb_share, 'missing', 'missing', 'folder')
    smbclient.makedirs(deep_path)

    created = smbclient.stat(deep_path)
    assert stat.S_ISDIR(created.st_mode)