123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- #!/usr/bin/env python
- """
- Created in Jun 2016
- @author: Harald Voehringer
- """
- from itertools import groupby
- from collections import namedtuple, defaultdict
- import textwrap
- DEBUG = False
- FASTA = namedtuple('FASTA', ['header', 'seq'])
- PDBDATA = namedtuple('PDBDATA', ['entry', 'res', 'rfr', 'comp', 'met'])
- def as_pairs(fasta):
- """ Reads in fasta headers as defined """
- for header, group in groupby(fasta, lambda x: x[0] == '>'):
- if header:
- line = next(group)
- identifier = line.split(' ')[0].split('>')[1]
- header_line = str(line.strip())
- else:
- sequence = ''.join(line.strip() for line in group).upper()
- data = FASTA(header_line, sequence)
- yield identifier, data
-
- def read_fasta(fasta_file):
- """ Reads in fasta sequences with help of the as_pairs function."""
- fasta_dic = dict()
- duplicate = 0
-
- with open(fasta_file) as fasta:
- for title, sequence in as_pairs(fasta):
- if title not in fasta_dic:
- fasta_dic[title] = sequence
- else:
- duplicate +=1
-
- if duplicate != 0:
- print ('! Warning found duplicate {num} fasta.'.format(
- num = duplicate))
-
- return fasta_dic
- def read_fasta_annotations(fname):
- """ Reads the information about resolution, R-free and completness which
- be outputed by cif2fasta."""
- annotations = dict()
- with open(fname) as fh:
- for line in fh:
- if len(line) > 0 and line[0] == '#':
- continue
- identifier, res, r_free, comp, method = line.strip().split('\t')
- try:
- res = float(res)
- except ValueError:
- res = None
- try:
- r_free = float(r_free)
- except ValueError:
- r_free = None
- try:
- comp = float(comp)
- except ValueError:
- comp = None
- annotations[identifier] = PDBDATA(identifier, res, r_free, comp, method)
- return annotations
- def read_cluster(cluster_file):
- """ Reads in the clusters (this is the output of MMseqs). """
- cluster = defaultdict(set)
- with open(cluster_file) as fh:
- for line in fh:
- exemplar, node = line.split()
-
- if node in cluster[exemplar]:
- raise Exception('! Please check the clustering procedure: {n} was found twice in cluster {e}.'.format(
- n = node,
- e = exemplar))
- else:
- cluster[exemplar].add(node)
- return cluster
- def read_pdblist(in_file):
- pdb_list = set()
-
- with open(in_file) as fh:
- for line in fh:
- if line.startswith('#'):
- continue
- strip_line = line.strip()
- pdb_code = strip_line.split('_')[0]
- # this is a very very basic check
- if len(pdb_code) != 4:
- print ('! Warning: {line} seems to be an incorrect identifer. Skipping it.'.format(
- line = strip_line))
- continue
-
- pdb = strip_line.upper()
- pdb_list.add(pdb)
- return pdb_list
- def select_sequences(clusters, annotations):
- selected_sequences = set()
- for idx, cluster in enumerate(clusters):
-
- nodes = [ annotations[entry] for entry in clusters[cluster] ]
-
- if DEBUG:
- print ('Processing Cluster {c} ({i}): {m}'.format(
- c = cluster,
- i = idx,
- m = ', '.join([x.entry for x in nodes])))
- # select the best entries of the cluster
- best_res = float('inf')
- best_entry_res = None
- best_rfr = float('inf')
- best_entry_rfr = None
- best_comp = -float('inf')
- best_entry_comp = None
- # iterate through each entry in nodes while selecting the representative sequence
- for node in nodes:
-
- if (node.res is not None) and (node.res < best_res):
- best_res = node.res
- best_entry_res = node.entry
- if (node.rfr is not None) and (node.rfr < best_rfr):
- best_rfr = node.rfr
- best_entry_rfr = node.entry
- if (node.comp is not None) and (node.comp > best_comp):
- best_comp = node.comp
- best_entry_comp = node.entry
- if best_entry_res is not None:
- selected_sequences.add(best_entry_res)
-
- if DEBUG:
- print (' - Selected {n} (best resolution = {r}).'.format(
- n = best_entry_res,
- r = best_res))
- if best_entry_rfr is not None:
- selected_sequences.add(best_entry_rfr)
-
- if DEBUG:
- print (' - Selected {n} (best R-free = {r}).'.format(
- n = best_entry_rfr,
- r = best_rfr))
-
- if best_entry_comp is not None:
- selected_sequences.add(best_entry_comp)
-
- if DEBUG:
- print (' - Selected {n} (best completness = {r}).'.format(
- n = best_entry_comp,
- r = best_comp))
- if best_entry_res == None and best_entry_rfr == None and best_entry_comp == None:
- print ('! Warning: Did not find any representative entry for cluster {c}.'.format(
- c = cluster))
-
- return selected_sequences
- def write_sequences(out_file, fasta_db, selected_sequences):
- """ Writes selected sequences to a fasta file."""
- with open(out_file, 'w') as fh:
- for seq in selected_sequences:
-
- fasta_entry = '{h}\n{seq}\n'.format(
- h = fasta_db[seq].header,
- seq = '\n'.join(textwrap.wrap(fasta_db[seq].seq, 80)))
-
- fh.write(fasta_entry)
- def arg():
- import argparse
- description = """
- pdbfilter.py selects from sequence clusters (determined by MMSeqs) the sequences
- which have the best resolution, R-free factor and/or completness and writes them to a fasta file.
- """.replace('\n', '')
-
- epilog = '2016 Harald Voehringer'
- # Initiate a ArgumentParser Class
- parser = argparse.ArgumentParser(description = description, epilog = epilog)
-
- # Call add_options to the parser
- parser.add_argument('fasta', help = 'input fasta file (created by cif2fasta.py)', metavar = 'FILE')
- parser.add_argument('cluster', help = 'sequence clusters (MMseqs)', metavar = 'FILE')
- parser.add_argument('annotations', help = 'annotations file (created by cif2fasta using the -p flag, contains information about resolution, R-free and completness of sequences).', metavar = 'FILE')
- parser.add_argument('out_file', help = 'output fasta file', metavar = 'FILE')
- parser.add_argument('-i', '--include', help = 'include PDB chains', metavar = 'FILE')
- parser.add_argument('-r', '--remove', help = 'exclude PDB chains', metavar = 'FILE')
- parser.add_argument('-v', '--verbose', action = 'store_true', help = 'verbose mode')
- return parser
- def main():
- import sys
- parser = arg()
- args = parser.parse_args(sys.argv[1:])
- global DEBUG
-
- if args.verbose:
- DEBUG = True
- fasta = read_fasta(args.fasta)
- clu70 = read_cluster(args.cluster)
- annot = read_fasta_annotations(args.annotations)
- print ("Found {i} clusters.".format(
- i = len(clu70.keys())))
- # choose representative sequences from clusters
- selection = select_sequences(clu70, annot)
- # make sure that pdbs specified in the argument are included
- if args.include:
- to_include = read_pdblist(args.include)
- for pdb in to_include:
- if pdb in fasta.keys():
- if pdb not in selection:
- if DEBUG:
- print ('Adding {p}.'.format(
- p = pdb))
- selection.add(pdb)
- else:
- print ('! Warning: {p} was not found in input fasta.'.format(
- p = pdb))
- # removes entries
- if args.remove:
- to_remove = read_pdblist(args.remove)
- for pdb in to_remove:
- if pdb in selection:
- if DEBUG:
- print ('Removing {p}.'.format(
- p = pdb))
- selection.remove(pdb)
-
- # write them to file
- write_sequences(args.out_file, fasta, selection)
- if __name__ == "__main__":
- main()
|