123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- #!/usr/bin/env python
- class A3MFormatError(Exception):
- def __init__(self, value):
- self.value = "ERROR: "+value
- def __str__(self):
- return repr(self.value)
- class A3M_Container:
- RESIDUES = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- VALID_MATCH_STATES = set(RESIDUES)
- VALID_INSERTION_STATES = set(RESIDUES.lower())
- VALID_GAP_STATES = set("-.")
- VALID_SS_CONF_STATES = set("0123456789")
- VALID_SS_STATES = set("ECH")
- VALID_DSSP_STATES = set("CHBEGITS-")
- def __init__(self):
- self.header = None
- self.annotations = dict()
- self.consensus = None
- self.sequences = []
- self.nr_match_states = None
- @property
- def number_sequences(self):
- """get the current number of protein sequences"""
- return len(self.sequences)
- def check_and_add_sequence(self, header, sequence):
- try:
- if (not self.check_and_add_annotation(header, sequence) and
- not self.check_and_add_consensus(header, sequence)):
- self.check_sequence(sequence)
- self.sequences.append((header, sequence))
- except A3MFormatError as e:
- raise e
- def check_and_add_consensus(self, header, sequence):
- header_name = header[1:].split()[0]
- if header_name.endswith("_consensus"):
- if self.consensus:
- raise A3MFormatError("Multiple definitions of consensus!")
- else:
- self.check_sequence(sequence)
- self.consensus = (header, sequence)
- return True
- else:
- return False
- def check_and_add_annotation(self, header, sequence):
- annotation_classes = [
- ("ss_conf", self.check_ss_conf),
- ("ss_pred", self.check_ss_pred),
- ("ss_dssp", self.check_dssp)
- ]
- for (annotation_name, check) in annotation_classes:
- if(header[1:].startswith(annotation_name)):
- if(annotation_name in self.annotations):
- raise A3MFormatError(
- "Multiple definitions of {}!".format(annotation_name)
- )
- elif check(sequence):
- self.annotations[annotation_name] = sequence
- return True
- return False
- def check_match_states(self, match_states):
- if not self.nr_match_states:
- self.nr_match_states = match_states
- if match_states == 0:
- raise A3MFormatError("Sequence with zero match states!")
- elif match_states != self.nr_match_states:
- raise A3MFormatError(
- ("Sequence with diverging number "
- "of match states ({} vs. {})!").format(
- match_states,
- self.nr_match_states
- )
- )
- def check_ss_conf(self, sequence):
- count_match_states = sum((c in self.VALID_SS_CONF_STATES
- or c in self.VALID_GAP_STATES)
- for c in sequence)
- self.check_match_states(count_match_states)
- invalid_states = set(sequence) - self.VALID_SS_CONF_STATES
- invalid_states -= self.VALID_GAP_STATES
- if len(invalid_states):
- raise A3MFormatError(
- ("Undefined character(s) '{}' in predicted "
- "secondary structure confidence!").format(invalid_states))
- else:
- return True
- def check_ss_pred(self, sequence):
- count_match_states = sum((c in self.VALID_SS_STATES
- or c in self.VALID_GAP_STATES)
- for c in sequence)
- self.check_match_states(count_match_states)
- invalid_states = set(sequence) - self.VALID_SS_STATES
- invalid_states -= self.VALID_GAP_STATES
- if len(invalid_states):
- raise A3MFormatError(
- ("Undefined character(s) '{}' in predicted "
- "secondary structure!").format(invalid_states))
- else:
- return True
- def check_dssp(self, sequence):
- count_match_states = sum(
- (c in self.VALID_DSSP_STATES) for c in sequence)
- self.check_match_states(count_match_states)
- invalid_states = set(sequence) - self.VALID_DSSP_STATES
- if len(invalid_states):
- raise A3MFormatError(
- ("Undefined character(s) '{}' in "
- "dssp annotation!").format(invalid_states))
- else:
- return True
- def check_sequence(self, sequence):
- count_match_states = sum((c in self.VALID_MATCH_STATES
- or c in self.VALID_GAP_STATES)
- for c in sequence)
- self.check_match_states(count_match_states)
- invalid_states = set(sequence) - self.VALID_MATCH_STATES
- invalid_states -= self.VALID_GAP_STATES
- invalid_states -= self.VALID_INSERTION_STATES
- if len(invalid_states):
- raise A3MFormatError(
- ("Undefined character(s) '{}' in "
- "protein sequence!").format(invalid_states))
- else:
- return True
- def get_sub_sequence(self, sequence, limits):
- sub_sequence = []
- for (start, end) in limits:
- start_pos = 0
- pos = -1
- for i in range(len(sequence)):
- if (sequence[i] in self.VALID_MATCH_STATES or
- sequence[i] in self.VALID_GAP_STATES):
- pos += 1
- if pos + 1 == start:
- start_pos = i
- break
- end_pos = 0
- pos = -1
- for i in range(len(sequence)):
- if (sequence[i] in self.VALID_MATCH_STATES or
- sequence[i] in self.VALID_GAP_STATES):
- pos += 1
- if pos + 1 == end:
- end_pos = i
- break
- sub_sequence.append(sequence[start_pos:end_pos+1])
- return "".join(sub_sequence)
- def __str__(self):
- content = []
- if self.header:
- content.append(self.header)
- if self.consensus:
- content.append(self.consensus[0])
- content.append(self.consensus[1])
- for (header, sequence) in self.sequences:
- content.append(header)
- content.append(sequence)
- return "\n".join(content)
- def split_a3m(self, limits):
- new_a3m = A3M_Container()
- if self.consensus:
- new_consensus_sequence = self.get_sub_sequence(self.consensus[1],
- limits)
- new_a3m.consensus = (self.consensus[0], new_consensus_sequence)
- for (header, sequence) in self.sequences:
- new_sequence = self.get_sub_sequence(sequence, limits)
- new_a3m.sequences.append((header, new_sequence))
- return new_a3m
- def read_a3m(self, fh):
- lines = fh.readlines()
- self.read_a3m_from_lines(lines)
- fh.close()
- def read_a3m_from_lines(self, lines):
- sequence_header = None
- sequence = []
- is_first_line = True
- for line in lines:
- line = line.strip()
- if len(line) == 0:
- continue
- elif line[0] == "#":
- if is_first_line:
- self.header = line
- elif line[0] == ">":
- if sequence_header:
- self.check_and_add_sequence(sequence_header,
- "".join(sequence))
- sequence = []
- sequence_header = line.rstrip()
- else:
- sequence.append(line.strip().strip("\x00"))
- is_first_line = False
- if sequence_header:
- self.check_and_add_sequence(sequence_header, "".join(sequence))
|