# windows_util.py (5.7 KB)
  1. # Creates os.path.xislink and os.xreadlink functions
  2. # On Windows these are new
  3. # On other systems they are copied from os.path.islink and os.readlink
  4. import os
  5. import os.path
  6. try:
  7. from win32file import *
  8. from winioctlcon import FSCTL_GET_REPARSE_POINT
  9. __all__ = ['islink', 'readlink']
  10. # win32file doesn't seem to export this attribute, so it is defined here (1024 == 0x400).
  11. FILE_ATTRIBUTE_REPARSE_POINT = 1024
  12. # Convenience mask: the directory bit and the reparse-point bit OR-ed together.
  13. REPARSE_FOLDER = (FILE_ATTRIBUTE_DIRECTORY | FILE_ATTRIBUTE_REPARSE_POINT)
  14. # Layout names accepted by parse_reparse_buffer's reparse_type argument (also keys of the dict it builds).
  15. SYMBOLIC_LINK = 'symbolic'
  16. MOUNTPOINT = 'mountpoint'
  17. GENERIC = 'generic'
  def islink(fpath):
      """ Windows islink implementation.

      fpath -- path whose attributes are queried with GetFileAttributes.

      Returns True when the attributes include the reparse-point bit
      (FILE_ATTRIBUTE_REPARSE_POINT), False otherwise.
      """
      attributes = GetFileAttributes(fpath)
      # The Win32 API signals failure with INVALID_FILE_ATTRIBUTES (all bits
      # set).  In case the binding hands that value back instead of raising,
      # reject it here: it would otherwise pass any bit test below.
      if attributes in (-1, 0xFFFFFFFF):
          return False
      # Test the reparse-point bit only.  A plain truthiness test against
      # REPARSE_FOLDER also matched every ordinary directory, because that
      # mask contains FILE_ATTRIBUTE_DIRECTORY.
      return bool(attributes & FILE_ATTRIBUTE_REPARSE_POINT)
  def parse_reparse_buffer(original, reparse_type=SYMBOLIC_LINK):
      """ Parse a raw reparse data buffer into a dictionary.

      Implementing the below in Python:
      typedef struct _REPARSE_DATA_BUFFER {
          ULONG ReparseTag;
          USHORT ReparseDataLength;
          USHORT Reserved;
          union {
              struct {
                  USHORT SubstituteNameOffset;
                  USHORT SubstituteNameLength;
                  USHORT PrintNameOffset;
                  USHORT PrintNameLength;
                  ULONG Flags;
                  WCHAR PathBuffer[1];
              } SymbolicLinkReparseBuffer;
              struct {
                  USHORT SubstituteNameOffset;
                  USHORT SubstituteNameLength;
                  USHORT PrintNameOffset;
                  USHORT PrintNameLength;
                  WCHAR PathBuffer[1];
              } MountPointReparseBuffer;
              struct {
                  UCHAR DataBuffer[1];
              } GenericReparseBuffer;
          } DUMMYUNIONNAME;
      } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;

      original     -- byte string holding the structure above.
      reparse_type -- SYMBOLIC_LINK, MOUNTPOINT or GENERIC; selects which
                      member of the union is decoded.

      Returns a dictionary.  'tag', 'data_length' and 'reserved' hold the raw
      header bytes.  The entry keyed by reparse_type holds each fixed-size
      field decoded as a little-endian unsigned integer, plus 'buffer' with
      the bytes that follow those fields.  The entries for the other layouts
      are left as untouched templates (field sizes, empty buffer).

      Raises KeyError when reparse_type is not one of the known layouts.
      """
      # Size of our data types
      SZULONG = 4   # sizeof(ULONG)
      SZUSHORT = 2  # sizeof(USHORT)
      HEADER_SIZE = SZULONG + SZUSHORT + SZUSHORT

      def little_endian(chunk):
          # bytearray() yields integers for both a Python 2 str and a bytes
          # object.  Byte i carries weight 256**i; summing the raw bytes
          # (as was done before) is only right for values below 256.
          value = 0
          for position, byte in enumerate(bytearray(chunk)):
              value |= byte << (8 * position)
          return value

      # Template of the structure: every fixed-size field starts out as its
      # size in bytes; 'pkeys' lists those fields in on-disk order.
      parsed = {
          'tag': SZULONG,
          'data_length': SZUSHORT,
          'reserved': SZUSHORT,
          SYMBOLIC_LINK: {
              'substitute_name_offset': SZUSHORT,
              'substitute_name_length': SZUSHORT,
              'print_name_offset': SZUSHORT,
              'print_name_length': SZUSHORT,
              'flags': SZULONG,
              'buffer': u'',
              'pkeys': [
                  'substitute_name_offset',
                  'substitute_name_length',
                  'print_name_offset',
                  'print_name_length',
                  'flags',
              ]
          },
          MOUNTPOINT: {
              'substitute_name_offset': SZUSHORT,
              'substitute_name_length': SZUSHORT,
              'print_name_offset': SZUSHORT,
              'print_name_length': SZUSHORT,
              'buffer': u'',
              'pkeys': [
                  'substitute_name_offset',
                  'substitute_name_length',
                  'print_name_offset',
                  'print_name_length',
              ]
          },
          GENERIC: {
              'pkeys': [],
              'buffer': ''
          }
      }

      # Header stuff.  Slice ends are absolute positions: the previous
      # [SZULONG:SZUSHORT] style slices ran backwards and were always empty.
      parsed['tag'] = original[:SZULONG]
      parsed['data_length'] = original[SZULONG:SZULONG + SZUSHORT]
      parsed['reserved'] = original[SZULONG + SZUSHORT:HEADER_SIZE]
      remainder = original[HEADER_SIZE:]

      # Decode the fixed-size fields of the selected layout, in order.
      layout = parsed[reparse_type]
      for field in layout['pkeys']:
          size = layout[field]
          layout[field] = little_endian(remainder[:size])
          remainder = remainder[size:]

      # Whatever follows the fixed fields is the path/data buffer that the
      # offsets and lengths decoded above index into.
      layout['buffer'] = remainder
      return parsed
  def readlink(fpath):
      """ Windows readlink implementation.

      fpath -- path of the link to read (byte string or unicode).

      Returns the substitute name stored in the reparse point, with a leading
      '\\??\\' prefix removed.  Returns None when islink(fpath) is false, when
      the reparse data is too short, or when the reparse tag is neither a
      symbolic link nor a mount point.  The result is unicode for a unicode
      fpath and an 'mbcs'-encoded byte string otherwise.

      Errors raised by CreateFile/DeviceIoControl propagate to the caller.
      """
      # Reparse tags as they appear at the start of the buffer (little-endian).
      TAG_SYMLINK = b'\x0c\x00\x00\xa0'      # IO_REPARSE_TAG_SYMLINK     0xA000000C
      TAG_MOUNT_POINT = b'\x03\x00\x00\xa0'  # IO_REPARSE_TAG_MOUNT_POINT 0xA0000003

      # This wouldn't return true if the file didn't exist, as far as I know.
      if not islink(fpath):
          return None

      # Open the file correctly depending on the string type.
      wide = isinstance(fpath, unicode)
      opener = CreateFileW if wide else CreateFile
      # FILE_FLAG_BACKUP_SEMANTICS is what allows a handle to a directory to
      # be obtained; it comes from the file's `from win32file import *`.
      handle = opener(fpath, GENERIC_READ, 0, None, OPEN_EXISTING,
                      FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS,
                      0)
      try:
          # MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16384 = (16*1024)
          raw = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 16*1024)
      finally:
          # Close the handle even when the ioctl fails so the file is not
          # left locked.
          CloseHandle(handle)

      # Minimum possible length (assuming that the length of the target is
      # bigger than 0)
      if len(raw) < 9:
          return None

      # Pick the layout from the reparse tag: a mount point has no Flags
      # field, so reading it with the symbolic-link layout shifts the path.
      tag = raw[:4]
      if tag == TAG_SYMLINK:
          layout = SYMBOLIC_LINK
      elif tag == TAG_MOUNT_POINT:
          layout = MOUNTPOINT
      else:
          return None

      # Parse and return our result.
      fields = parse_reparse_buffer(raw, layout)[layout]
      offset = fields['substitute_name_offset']
      ending = offset + fields['substitute_name_length']
      # The path buffer holds WCHARs, i.e. UTF-16-LE; stripping NUL bytes
      # only worked for ASCII names.
      rpath = fields['buffer'][offset:ending].decode('utf-16-le')
      if len(rpath) > 4 and rpath[0:4] == u'\\??\\':
          rpath = rpath[4:]
      if not wide:
          # Byte-string callers get a byte string back, as before.
          rpath = rpath.encode('mbcs')
      return rpath
  # The win32file/winioctlcon imports succeeded: publish the functions defined above
  # under the x-prefixed names.
  139. os.xreadlink = readlink
  140. os.path.xislink = islink
  141. except ImportError:
  # Fallback when an import in the try block fails: alias the standard library functions.
  # NOTE(review): assumes os.readlink exists on this platform -- confirm, otherwise this
  # line raises AttributeError.
  142. os.xreadlink = os.readlink
  143. os.path.xislink = os.path.islink