a3m.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #!/usr/bin/env python
  2. class A3MFormatError(Exception):
  3. def __init__(self, value):
  4. self.value = "ERROR: "+value
  5. def __str__(self):
  6. return repr(self.value)
  7. class A3M_Container:
  8. RESIDUES = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
  9. VALID_MATCH_STATES = set(RESIDUES)
  10. VALID_INSERTION_STATES = set(RESIDUES.lower())
  11. VALID_GAP_STATES = set("-.")
  12. VALID_SS_CONF_STATES = set("0123456789")
  13. VALID_SS_STATES = set("ECH")
  14. VALID_DSSP_STATES = set("CHBEGITS-")
  15. def __init__(self):
  16. self.header = None
  17. self.annotations = dict()
  18. self.consensus = None
  19. self.sequences = []
  20. self.nr_match_states = None
  21. @property
  22. def number_sequences(self):
  23. """get the current number of protein sequences"""
  24. return len(self.sequences)
  25. def check_and_add_sequence(self, header, sequence):
  26. try:
  27. if (not self.check_and_add_annotation(header, sequence) and
  28. not self.check_and_add_consensus(header, sequence)):
  29. self.check_sequence(sequence)
  30. self.sequences.append((header, sequence))
  31. except A3MFormatError as e:
  32. raise e
  33. def check_and_add_consensus(self, header, sequence):
  34. header_name = header[1:].split()[0]
  35. if header_name.endswith("_consensus"):
  36. if self.consensus:
  37. raise A3MFormatError("Multiple definitions of consensus!")
  38. else:
  39. self.check_sequence(sequence)
  40. self.consensus = (header, sequence)
  41. return True
  42. else:
  43. return False
  44. def check_and_add_annotation(self, header, sequence):
  45. annotation_classes = [
  46. ("ss_conf", self.check_ss_conf),
  47. ("ss_pred", self.check_ss_pred),
  48. ("ss_dssp", self.check_dssp)
  49. ]
  50. for (annotation_name, check) in annotation_classes:
  51. if(header[1:].startswith(annotation_name)):
  52. if(annotation_name in self.annotations):
  53. raise A3MFormatError(
  54. "Multiple definitions of {}!".format(annotation_name)
  55. )
  56. elif check(sequence):
  57. self.annotations[annotation_name] = sequence
  58. return True
  59. return False
  60. def check_match_states(self, match_states):
  61. if not self.nr_match_states:
  62. self.nr_match_states = match_states
  63. if match_states == 0:
  64. raise A3MFormatError("Sequence with zero match states!")
  65. elif match_states != self.nr_match_states:
  66. raise A3MFormatError(
  67. ("Sequence with diverging number "
  68. "of match states ({} vs. {})!").format(
  69. match_states,
  70. self.nr_match_states
  71. )
  72. )
  73. def check_ss_conf(self, sequence):
  74. count_match_states = sum((c in self.VALID_SS_CONF_STATES
  75. or c in self.VALID_GAP_STATES)
  76. for c in sequence)
  77. self.check_match_states(count_match_states)
  78. invalid_states = set(sequence) - self.VALID_SS_CONF_STATES
  79. invalid_states -= self.VALID_GAP_STATES
  80. if len(invalid_states):
  81. raise A3MFormatError(
  82. ("Undefined character(s) '{}' in predicted "
  83. "secondary structure confidence!").format(invalid_states))
  84. else:
  85. return True
  86. def check_ss_pred(self, sequence):
  87. count_match_states = sum((c in self.VALID_SS_STATES
  88. or c in self.VALID_GAP_STATES)
  89. for c in sequence)
  90. self.check_match_states(count_match_states)
  91. invalid_states = set(sequence) - self.VALID_SS_STATES
  92. invalid_states -= self.VALID_GAP_STATES
  93. if len(invalid_states):
  94. raise A3MFormatError(
  95. ("Undefined character(s) '{}' in predicted "
  96. "secondary structure!").format(invalid_states))
  97. else:
  98. return True
  99. def check_dssp(self, sequence):
  100. count_match_states = sum(
  101. (c in self.VALID_DSSP_STATES) for c in sequence)
  102. self.check_match_states(count_match_states)
  103. invalid_states = set(sequence) - self.VALID_DSSP_STATES
  104. if len(invalid_states):
  105. raise A3MFormatError(
  106. ("Undefined character(s) '{}' in "
  107. "dssp annotation!").format(invalid_states))
  108. else:
  109. return True
  110. def check_sequence(self, sequence):
  111. count_match_states = sum((c in self.VALID_MATCH_STATES
  112. or c in self.VALID_GAP_STATES)
  113. for c in sequence)
  114. self.check_match_states(count_match_states)
  115. invalid_states = set(sequence) - self.VALID_MATCH_STATES
  116. invalid_states -= self.VALID_GAP_STATES
  117. invalid_states -= self.VALID_INSERTION_STATES
  118. if len(invalid_states):
  119. raise A3MFormatError(
  120. ("Undefined character(s) '{}' in "
  121. "protein sequence!").format(invalid_states))
  122. else:
  123. return True
  124. def get_sub_sequence(self, sequence, limits):
  125. sub_sequence = []
  126. for (start, end) in limits:
  127. start_pos = 0
  128. pos = -1
  129. for i in range(len(sequence)):
  130. if (sequence[i] in self.VALID_MATCH_STATES or
  131. sequence[i] in self.VALID_GAP_STATES):
  132. pos += 1
  133. if pos + 1 == start:
  134. start_pos = i
  135. break
  136. end_pos = 0
  137. pos = -1
  138. for i in range(len(sequence)):
  139. if (sequence[i] in self.VALID_MATCH_STATES or
  140. sequence[i] in self.VALID_GAP_STATES):
  141. pos += 1
  142. if pos + 1 == end:
  143. end_pos = i
  144. break
  145. sub_sequence.append(sequence[start_pos:end_pos+1])
  146. return "".join(sub_sequence)
  147. def __str__(self):
  148. content = []
  149. if self.header:
  150. content.append(self.header)
  151. if self.consensus:
  152. content.append(self.consensus[0])
  153. content.append(self.consensus[1])
  154. for (header, sequence) in self.sequences:
  155. content.append(header)
  156. content.append(sequence)
  157. return "\n".join(content)
  158. def split_a3m(self, limits):
  159. new_a3m = A3M_Container()
  160. if self.consensus:
  161. new_consensus_sequence = self.get_sub_sequence(self.consensus[1],
  162. limits)
  163. new_a3m.consensus = (self.consensus[0], new_consensus_sequence)
  164. for (header, sequence) in self.sequences:
  165. new_sequence = self.get_sub_sequence(sequence, limits)
  166. new_a3m.sequences.append((header, new_sequence))
  167. return new_a3m
  168. def read_a3m(self, fh):
  169. lines = fh.readlines()
  170. self.read_a3m_from_lines(lines)
  171. fh.close()
  172. def read_a3m_from_lines(self, lines):
  173. sequence_header = None
  174. sequence = []
  175. is_first_line = True
  176. for line in lines:
  177. line = line.strip()
  178. if len(line) == 0:
  179. continue
  180. elif line[0] == "#":
  181. if is_first_line:
  182. self.header = line
  183. elif line[0] == ">":
  184. if sequence_header:
  185. self.check_and_add_sequence(sequence_header,
  186. "".join(sequence))
  187. sequence = []
  188. sequence_header = line.rstrip()
  189. else:
  190. sequence.append(line.strip().strip("\x00"))
  191. is_first_line = False
  192. if sequence_header:
  193. self.check_and_add_sequence(sequence_header, "".join(sequence))