#!/usr/bin/env python """ Created in Jun 2016 @author: Harald Voehringer """ from itertools import groupby from collections import namedtuple, defaultdict import textwrap DEBUG = False FASTA = namedtuple('FASTA', ['header', 'seq']) PDBDATA = namedtuple('PDBDATA', ['entry', 'res', 'rfr', 'comp', 'met']) def as_pairs(fasta): """ Reads in fasta headers as defined """ for header, group in groupby(fasta, lambda x: x[0] == '>'): if header: line = next(group) identifier = line.split(' ')[0].split('>')[1] header_line = str(line.strip()) else: sequence = ''.join(line.strip() for line in group).upper() data = FASTA(header_line, sequence) yield identifier, data def read_fasta(fasta_file): """ Reads in fasta sequences with help of the as_pairs function.""" fasta_dic = dict() duplicate = 0 with open(fasta_file) as fasta: for title, sequence in as_pairs(fasta): if title not in fasta_dic: fasta_dic[title] = sequence else: duplicate +=1 if duplicate != 0: print ('! Warning found duplicate {num} fasta.'.format( num = duplicate)) return fasta_dic def read_fasta_annotations(fname): """ Reads the information about resolution, R-free and completness which be outputed by cif2fasta.""" annotations = dict() with open(fname) as fh: for line in fh: if len(line) > 0 and line[0] == '#': continue identifier, res, r_free, comp, method = line.strip().split('\t') try: res = float(res) except ValueError: res = None try: r_free = float(r_free) except ValueError: r_free = None try: comp = float(comp) except ValueError: comp = None annotations[identifier] = PDBDATA(identifier, res, r_free, comp, method) return annotations def read_cluster(cluster_file): """ Reads in the clusters (this is the output of MMseqs). """ cluster = defaultdict(set) with open(cluster_file) as fh: for line in fh: exemplar, node = line.split() if node in cluster[exemplar]: raise Exception('! Please check the clustering procedure: {n} was found twice in cluster {e}.'.format( n = node, e = exemplar)) else: cluster[exemplar].add(node) return cluster def read_pdblist(in_file): pdb_list = set() with open(in_file) as fh: for line in fh: if line.startswith('#'): continue strip_line = line.strip() pdb_code = strip_line.split('_')[0] # this is a very very basic check if len(pdb_code) != 4: print ('! Warning: {line} seems to be an incorrect identifer. Skipping it.'.format( line = strip_line)) continue pdb = strip_line.upper() pdb_list.add(pdb) return pdb_list def select_sequences(clusters, annotations): selected_sequences = set() for idx, cluster in enumerate(clusters): nodes = [ annotations[entry] for entry in clusters[cluster] ] if DEBUG: print ('Processing Cluster {c} ({i}): {m}'.format( c = cluster, i = idx, m = ', '.join([x.entry for x in nodes]))) # select the best entries of the cluster best_res = float('inf') best_entry_res = None best_rfr = float('inf') best_entry_rfr = None best_comp = -float('inf') best_entry_comp = None # iterate through each entry in nodes while selecting the representative sequence for node in nodes: if (node.res is not None) and (node.res < best_res): best_res = node.res best_entry_res = node.entry if (node.rfr is not None) and (node.rfr < best_rfr): best_rfr = node.rfr best_entry_rfr = node.entry if (node.comp is not None) and (node.comp > best_comp): best_comp = node.comp best_entry_comp = node.entry if best_entry_res is not None: selected_sequences.add(best_entry_res) if DEBUG: print (' - Selected {n} (best resolution = {r}).'.format( n = best_entry_res, r = best_res)) if best_entry_rfr is not None: selected_sequences.add(best_entry_rfr) if DEBUG: print (' - Selected {n} (best R-free = {r}).'.format( n = best_entry_rfr, r = best_rfr)) if best_entry_comp is not None: selected_sequences.add(best_entry_comp) if DEBUG: print (' - Selected {n} (best completness = {r}).'.format( n = best_entry_comp, r = best_comp)) if best_entry_res == None and best_entry_rfr == None and best_entry_comp == None: print ('! Warning: Did not find any representative entry for cluster {c}.'.format( c = cluster)) return selected_sequences def write_sequences(out_file, fasta_db, selected_sequences): """ Writes selected sequences to a fasta file.""" with open(out_file, 'w') as fh: for seq in selected_sequences: fasta_entry = '{h}\n{seq}\n'.format( h = fasta_db[seq].header, seq = '\n'.join(textwrap.wrap(fasta_db[seq].seq, 80))) fh.write(fasta_entry) def arg(): import argparse description = """ pdbfilter.py selects from sequence clusters (determined by MMSeqs) the sequences which have the best resolution, R-free factor and/or completness and writes them to a fasta file. """.replace('\n', '') epilog = '2016 Harald Voehringer' # Initiate a ArgumentParser Class parser = argparse.ArgumentParser(description = description, epilog = epilog) # Call add_options to the parser parser.add_argument('fasta', help = 'input fasta file (created by cif2fasta.py)', metavar = 'FILE') parser.add_argument('cluster', help = 'sequence clusters (MMseqs)', metavar = 'FILE') parser.add_argument('annotations', help = 'annotations file (created by cif2fasta using the -p flag, contains information about resolution, R-free and completness of sequences).', metavar = 'FILE') parser.add_argument('out_file', help = 'output fasta file', metavar = 'FILE') parser.add_argument('-i', '--include', help = 'include PDB chains', metavar = 'FILE') parser.add_argument('-r', '--remove', help = 'exclude PDB chains', metavar = 'FILE') parser.add_argument('-v', '--verbose', action = 'store_true', help = 'verbose mode') return parser def main(): import sys parser = arg() args = parser.parse_args(sys.argv[1:]) global DEBUG if args.verbose: DEBUG = True fasta = read_fasta(args.fasta) clu70 = read_cluster(args.cluster) annot = read_fasta_annotations(args.annotations) print ("Found {i} clusters.".format( i = len(clu70.keys()))) # choose representative sequences from clusters selection = select_sequences(clu70, annot) # make sure that pdbs specified in the argument are included if args.include: to_include = read_pdblist(args.include) for pdb in to_include: if pdb in fasta.keys(): if pdb not in selection: if DEBUG: print ('Adding {p}.'.format( p = pdb)) selection.add(pdb) else: print ('! Warning: {p} was not found in input fasta.'.format( p = pdb)) # removes entries if args.remove: to_remove = read_pdblist(args.remove) for pdb in to_remove: if pdb in selection: if DEBUG: print ('Removing {p}.'.format( p = pdb)) selection.remove(pdb) # write them to file write_sequences(args.out_file, fasta, selection) if __name__ == "__main__": main()