#! /usr/bin/env python """IO module for sequence operations $Id: SeqIO.py,v 1.1 2004/10/10 02:53:09 nghoffma Exp $""" import string, Seq, cPickle, sys, re, types, Patterns, random, traceback, os from copy import deepcopy from Utility import inFile from Dictionaries import * #HISTORY: # #01Sep10 wr started complete reimplementation of SeqIO to improve handling; retain # all speed improving implementations from original SequenceIO; adapt all functions # to Seq instead of Sequence # #debugging # #this global variable defines the default verbosity #level of all functions in this module from 0 (silent) to #3 (ridiculously verbose) # 010920 NH did some general debugging, added readDnapars function, modified readFasta # to return a dictionary of sequence objects # 010916 NH readFasta: sequence name keys for dictionary output are always lower case # 011017 NH seqToFasta: added argument to replace a set of characters # 011023 NH modified degap to return string on request # 011031 NH debugged degap and toSeq # 011203 WR added BegEnd, writeTable, guessFrame # 020103 NH added tabulate (now returns a dictinary), consensus (uses new tabulate), # encode, encodeSeqList, readFasta no longer demands type argument # 020222 NH readFasta now assumes 'filename ' is a string (name of a file), # then checks is it is an open file object, and exits if neither is true # 020226 NH added readFastaList() # 020228 NH add seqToPhylip, writePhylip # 020610 NH add seqToNexus # 020614 NH add type argument to seqToMSF debugLevel = 0 class NoSeqsFoundError(Exception): pass def invertDict( dict, v=debugLevel ): """ return a dictionary with values and keys switched; be aware that only one of duplicate values will be retained in the inverted dictionary dict dictionary [v] verbosity""" #get list of items in form [(key,value),...] items = dict.items() newdict = {} for i in items: newdict[i[1]] = i[0] if v > 0: print '\n\n*\n*%30s\n*' % 'SeqIO.invertDict' print "entries in (original|inverted) dictionary: (%4i|%4i)" % ( len(dict),len(newdict) ) return newdict def breakLines( s, n, v=debugLevel ): """ introduce linebreaks every |n| characters in the string |s| and returns it [v] verbosity""" brokenStr = '' for i in range( 0, len(s), n): brokenStr = brokenStr + s[i:i+n]+ '\n' if v > 0: print '\n\n*\n*%30s\n*' % 'SeqIO.breakLines' print "length of (original|broken) string: (%4i|%4i)" % ( len(s),len(brokenString) ) nrBreaks = len(brokenStr.count('\n')) print "of these, %3 are linebreaks and %3 are non-linebreaks" % ( nrBreaks, len(brokenString)-nrBreaks ) return brokenStr def wrap(text, width, output='string'): """Output is a string or a list of lines""" text = ' '.join(text.split()) + ' ' lines = [] i = 0 while i != -1: i = text.rfind(' ',0,width) if i == -1: i = max([-1, text.find(' ')]) line = text[:i] text = text[i+1:] lines.append(line) if output == 'list': return [x for x in lines if x.strip()] elif output == 'string': return '\n'.join(lines).strip() def removeWhitespace( s, v=debugLevel ): """ s: string [v] verbosity returns the string |s| with all whitespace characters removed""" removed = ''.join( s.split() ) if v > 0: print '\n\n*\n*%30s\n*' % 'SeqIO.removeWhitespace' print "length of (original|noWhiteSpace) string: (%4i|%4i)" % ( len(s),len(brokenString) ) return removed removeTrailingCommaRexp = re.compile(r',+$') def removeTrailingComma(s, v=debugLevel): return removeTrailingCommaRexp.sub('', s) def removeAllButAlpha(instr): return Patterns.allButAlphaRexp.sub('', instr) def gapstrip( seq , gapchars = Patterns.singleGapsReo, v=debugLevel ): """ seq: string or sequence object [gapchars]: [Patterns.singleGapsReo] or any compiled regular expression; Patterns.singleGapsReo matches r'[-.~]' [v] verbosity Removes characters in |gapchar| expression from |s| and returns a new degapped sequence object or string (depending on object passed).""" #make a slice copy of s to allow handling of strings and sequence #objects s = seq[:] #replace all occurances of the object in the string s if v > 0: print '\n\n*\n*%30s\n*' % 'SeqIO.degap' nr = len( gapchars.findall( s ) ) print 'found %3i gap characters in sequence' % nr s = gapchars.sub( "", s ) if type(seq) == types.StringType: return s else: #make sequence object seqObj = Seq.Seq( name=seq.getName(), seq=s, hea=seq.getHea() ) return seqObj def block(str, blocksize, output='string', rexp=r'\S'): """Returns a string containing space-delimited substrings (or list) of str of length blocksize. Output can be 'string' or 'list'.""" if blocksize < 1: sys.exit('block(): blocksize must be >= 1') elif blocksize > len(str): return str expStr = rexp * blocksize #all nonwhitespace chars exp = re.compile(expStr) lis = exp.findall(str) if len(str) <= blocksize: remainder = (blocksize - len(str))*-1 else: remainder = len(str)%blocksize*-1 if remainder == 0: end = [] else: end = [str[remainder:]] if output == 'string': return string.join(lis + end, ' ') elif output == 'list': return lis + end def seqToFasta( seq, linelength = 60, degap = 0, acc=0, hea=0, begEnd=None, triplets=0, v=debugLevel, replace = {} ): """ returns string in fasta format with hard line breaks every |linelength| charcters seq: sequence object [linelength]: defines length line [degap]: {0|1} if 1, removes gap characters from string [acc]: {0|1} printing of accession number [hea]: {0|1} printing of header [begEnd] 2-tuple containing 0-based beginning and end indices of sequence [triplets] {0|1} truncate sequences, if necessary, to contain a multiple of three characters (codons); this has to be a two tuple if given [v] verbosity """ # put header line together; don't modify name fastaString = ">" + seq.getName() # if acc: # fastaString = fastaString + " Acc:: " + seq.getAcc() # if hea: # fastaString = fastaString + " Hea:: " + seq.getHea() # 030324 NH got rid of Acc:: and Hea:: tags if acc: fastaString = fastaString + ' ' + seq.getAcc() if hea: fastaString = fastaString + ' ' + seq.getHea() fastaString = fastaString + '\n' #if only complete codons are to be written, truncate sequence if neccessary if triplets and len(seq)%3 != 0: seq = seq[0:-(len(seq)%3)] #if begEnd tuple is given, truncate sequence if begEnd != None: seq = seq[begEnd[0]:begEnd[1]] if degap: seq = gapstrip(seq, v=v) # put hard linebreaks in sequence string and append to fastaString seq = breakLines( seq, linelength, v=v ) # 011017 NH: replace a set of characters in the sequence string if replace != {}: for key in replace.keys(): seq = string.replace(seq, key, replace[key]) # add to header line fastaString = fastaString + seq return fastaString def toSeq( str, reo = Patterns.fastaPatternReo, degap=0, v=debugLevel ): """convert a string containing n sequences in fasta format to a list of sequence objects str: lines of a fasta file containing one or more complete sequences reo: [Patterns.fastaPatternReo] or any other valid compiled regular expression with the groups , , and (in that order) and used to extract sequences from |str|. alternatively, expressions with fields and only can be used [degap] {0|1} remove all gap characters [v]: verbosity * return: list of sequence objects * everything of the header line will be in hea field, except for name * only sequences with non-empty sequence fields are included * all whitespace is removed from sequences and names * sequences are made all upper case""" #find all maches seqs = reo.findall( str ) if v > 0: print '\n\n\t*\n\t*%30s\n\t*' % 'SeqIO.toSeq' print '\tIdentifying sequences using the following pattern:\n\t', reo print "\tFound %4i sequences in |str|" % len( seqs ) #generate sequence objects from the list of named group tuples data = [] #if there are three groups, assume they are in order name, hea, seq try: if len( seqs[0] ) == 3: for (SeqName, SeqHea, seqStr) in seqs: SeqName = removeWhitespace( SeqName ) # trailing commas from Pearson format sequence names (ie from sequencher) SeqName = removeTrailingComma( SeqName ) if degap: seqStr = gapstrip( seqStr ) seqStr = removeWhitespace( seqStr ) if v > 1: print "\tname:%-20s first 10 nts: %10s last 10 nts: %10s" (SeqName, Seq[0:10], Seq[-11:] ) #only sequences with non-empty sequence field are included in returned list if seqStr != "": data.append( Seq.Seq( name=SeqName, hea=SeqHea, seq=seqStr.upper() ) ) elif len( seqs[0] ) == 2: for (SeqName, seqStr) in seqs: SeqName = removeWhitespace( SeqName ) # trailing commas from Pearson format sequence names (ie from sequencher) SeqName = removeTrailingComma( SeqName ) if degap: seqStr = gapstrip( seqStr ) seqStr = removeWhitespace( seqStr ) if v > 1: print "\tname:%-20s first 10 nts: %10s last 10 nts: %10s" (SeqName, seqStr[0:10], seqStr[-11:] ) #only sequences with non-empty sequence field are included in returned list if seqStr != "": data.append( Seq.Seq( name=SeqName, seq=seqStr.upper() ) ) except IndexError: raise NoSeqsFoundError #, 'Error in SeqIO.toSeq(): No sequences found!' return data def readFastaList( fastaList, degap=0, v=debugLevel, output='list' ): """Invokes readFasta on a list of filenames or file objects. Returns a concatenated list or dictionary of sequence objects.""" if v > 1: print 'SeqIO.readFastaList: reading the following list of files:' print fastaList if output == 'list': fastaOut = [] elif output == 'dict': fastaOut = {} for f in fastaList: thisFasta = readFasta( f, degap, v, output ) if output == 'list': fastaOut = fastaOut + thisFasta elif output == 'dict': fastaOut.update(thisFasta) return fastaOut def readFasta( filename='', degap=0, v=debugLevel, output='list' ): """ Read a fasta file and return a list of sequence objects filename: filename (if none is given assume standard input) [degap] {0|1} remove all gap characters [v] verbosity [output] { 'list' | 'dict' }; defaults to list""" #[010827]: open fasta file and read lines in increments of 50 sequences # 020222 NH: got rid of 'type' argument - this is goanna cause lots of errors in # other programs! #print 'set v = 3 in SeqIO, line 257' #v = 3 #access file if type(filename) == types.StringType: fileLabel = filename try: fastaFile = open(filename, 'r' ) except IOError, msg: sys.exit('SeqIO.readFasta: error reading %s\n%s' % (filename,msg) ) elif type(filename) == types.FileType: fileLabel = filename.name #filename is not a string; is it an open file? if filename.closed: sys.exit('SeqIO.readFasta: %s is a closed file' % filename) else: fastaFile = filename else: sys.exit('SeqIO.readFasta: %s must either be a string or an open file' % filename) #verbosity if v > 0: print '\n\n*\n*%30s\n*' % 'SeqIO.readFasta' if type(fastaFile) == types.FileType: print 'Reading from file object %s' % fastaFile else: print 'Reading from file', filename seqList = [] #read lines in increments of 50 sequences buffer = [] numberSeq = 0 while 1: line = fastaFile.readline() try: if line.strip()[0] == '#': continue #ignores comment lines except IndexError: pass if line == '' or line == '\n': #end of file reached; find last sequence objects try: seqList = seqList + toSeq( '\n'.join( buffer ), degap=degap, v=v ) except NoSeqsFoundError :#, msg: sys.exit( "No sequences were found in the file '%s'" % fileLabel) break # finding a '>' at the beginning of a sequence elif line.strip()[0] == '>': numberSeq = numberSeq + 1 if v > 2: print line.strip() #add line to the file buffer buffer.append( line ) #if 50 sequences were read in, make sequence objects, reset buffer and numberSeq if numberSeq == 51: if v > 1: print "\tRead in 50 sequences" try: seqList = seqList + toSeq( '\n'.join( buffer ), degap=degap, v=v ) except NoSeqsFoundError :#, msg: sys.exit( "No sequences were found in the file '%s'" % s) buffer = [buffer[-1]] #this contains the first line of the next sequence numberSeq = 1 #close streams if filename != '': fastaFile.close() if v > 0: print 'Found %4s sequences in fasta file' % len( seqList ) if output == 'list': return seqList elif output == 'dict': dictOut = {} for seq in seqList: dictOut[string.lower(seq.getName())] = seq return dictOut class EMBLFormatError(TypeError): pass def writeEMBLLine(key, val, linelength ): lmar = 5 fstr = '%-' + `lmar` + 's%s' strList = [] strLen = linelength-lmar if not val.strip(): strList.append( fstr %(key, ' ') ) else: # word wrap val valLines = wrap(val, strLen, output='list') for line in valLines: strList.append( fstr %(key, line) ) return os.linesep.join(strList) def writeEMBLStr( seqList, linelength=60, insert_blanklines=0 ): l = linelength i = insert_blanklines return os.linesep.join([writeSingleEMBLStr(s,l,i).strip() for s in seqList]) def writeEMBLSeq(seqStr, linelength): lmar = 5 seqDelimiter = '//' linesep = os.linesep seqStr = removeAllButAlpha(seqStr) strList = [] strList.append('SQ sequence length %s;' % len(seqStr)) strLen = linelength-lmar for i in range( 0, len(seqStr), strLen): strList.append( ' '*lmar + seqStr[i:i+strLen] ) return linesep.join(strList) + linesep + seqDelimiter def writeSingleEMBLStr( seqObj, linelength, insert_blanklines ): """Returns a string with mappings of Seq object characteristics as defined in readSingleEMBLStr. Call this function from writeEMBLStr. """ nameKey = 'ID' accKey = 'AC' heaKey = 'DE' seqKey = 'SQ' commentKey = 'XX' seqDelimiter = '//' strList = [] # name is required strList.append( writeEMBLLine(nameKey, seqObj.getName(), linelength )) # accession and description are optional if seqObj.getAcc(): strList.append( writeEMBLLine(accKey, seqObj.getAcc(), linelength )) if seqObj.getHea(): strList.append( writeEMBLLine(heaKey, seqObj.getHea(), linelength )) # iterate over other keys in data for k,v in seqObj.getData().items(): strList.append( writeEMBLLine(k[:2].upper(), v, linelength )) # sequence is required strList.append( writeEMBLSeq( seqObj.getSeq(), linelength ) ) if insert_blanklines: entrysep = os.linesep + commentKey + os.linesep else: entrysep = os.linesep return entrysep.join(strList) def readEMBLStr( strin ): """Reads data corresponding to one or more sequences in EMBL format. Return a list of Seq objects""" seqDelimiter = '//' strList = strin.split(seqDelimiter) seqList = [] for s in strList: if not s.strip(): continue s = s.strip() + os.linesep + seqDelimiter seqList.append( readSingleEMBLStr(s) ) return seqList def readEMBLLine( line ): """Returns key, val where key is always an uppercase, two-letter key (len(key)!=2 results in EMBLFormatError). key is the first word on the line; val may be an empty string.""" splitline = line.split(None,1) try: key, val = splitline except ValueError: key = splitline[0] val = '' if len(key) != 2: raise EMBLFormatError, 'the key %s is not 2 characters'%key return key.upper(),val # define these rexp externally to avoid multiple re calls wsAndDigitsRexp = re.compile(r'[\d\s]+') def readSingleEMBLStr( strin ): """input is a single sequence in EMBL format (expects terminal //); returns a single Seq object. Mappings from EMBL two-letter line identifiers to Seq fields are as follows: ID name (sequence name as one word) AC acc (an arbitrary accession number) DE hea (an arbitrary header string) SQ seq (DNA or AA sequence string) all other lines identified by two-letter keys will be parsed and placed in the Seq.__data dict Parsing does not strictly adhere to the 5-space gutter on the left; for example, the following lines result in the same output: cl a client CL a client """ nameKey = 'ID' accKey = 'AC' heaKey = 'DE' seqKey = 'SQ' commentKey = 'XX' seqDelimiter = '//' namedKeys = [nameKey, accKey, heaKey, seqKey] requiredKeys = [nameKey, seqKey, seqDelimiter] # look for some key features defining EMBL format k = '' try: for k in requiredKeys: strin.index(k) except ValueError: raise EMBLFormatError, 'Formatted sequence is missing the "%s" key' % k lines = strin.strip().splitlines() lines.pop(-1) # consume final // currentKey = '' IDcount = 0 allData = {} while 1: line = lines.pop(0) if not line.strip: continue key, val = readEMBLLine(line) if key == commentKey: continue if key == nameKey: IDcount += 1 if key and key != currentKey: currentKey = key allData[currentKey] = allData.get(currentKey,'') + ' ' + val if currentKey == seqKey: break if IDcount > 1: raise EMBLFormatError, 'More than one line begins with %s ' % nameKey # remaining lines represent sequence info seqStr = wsAndDigitsRexp.sub('', ''.join(lines) ) if seqStr == '': raise EMBLFormatError, 'No sequence string found in this record' # initialize Seq object try: name = allData[nameKey].split()[0] except (IndexError, KeyError): raise EMBLFormatError, 'No ID key (name not specified)' acc = allData.get(accKey,'').strip() hea = allData.get(heaKey,'').strip() for key in namedKeys: try: del allData[key] except KeyError: pass data = {} for key, val in allData.items(): data[key] = val.strip() # def __init__( self, name, seq, type='', acc='',hea='' ): return Seq.Seq(name, seqStr, type='', acc=acc, hea=hea, data=data) def readSelex( filename ): """SYNTAX: readSelex( filename ) filename: file in selex format (filename or open file object ) returns a list of Seq objects.""" #access file closefile =0 if type(filename) == types.StringType: fileLabel = filename closefile = 1 try: selexFile = open(filename, 'r' ) except IOError, msg: sys.exit('SeqIO.readFasta: error reading %s\n%s' % (filename,msg) ) elif type(filename) == types.FileType: fileLabel = filename.name #filename is not a string; is it an open file? if filename.closed: sys.exit('SeqIO.readFasta: %s is a closed file' % filename) else: selexFile = filename else: sys.exit('SeqIO.readSelex: %s must either be a string or an open file' % filename) #collect names and sequence strings stringDict = {} namelist = [] # collect names to permit output of seqs in original order for line in selexFile.readlines(): if line.strip() == '' or line.strip()[0] == '#' or len(line.split()) < 2: continue mo = Patterns.selexPatternReo.match(line) if mo == None: continue d = mo.groupdict() name = d['name'] stringDict[name] = stringDict.get(name, '') + removeWhitespace(d['seq']) if name not in namelist: namelist.append(name) # make a list of sequence objects gapchars = Patterns.singleGapsReo seqList = [] for name in namelist: seqString = stringDict[name].upper() seqString = gapchars.sub( '-', seqString ) #replace all gap chars with '-' seqList.append( Seq.Seq( name=name, seq=seqString ) ) if closefile: selexFile.close() return seqList def interleave(file, seqList, width=50, blocksize=10, number=1, hea=0, gapchar='', v=1): """Creates interleaved alignment from a list of sequence objects file an open file object blocksize 0 for no blocks number places numbers at top of alignment if 1 gapchar all gap chars [.-~] will be replaced with this char. set gapchar='' for no replacement""" blanklines = 1 #find longest sequence and name longest = len(seqList[0]) namelen = 0 for seq in seqList: if not hea: namelen = max( [ namelen, len( seq.getName() ) ] ) else: namelen = max( [ namelen, len( seq.getName() + ' ' + seq.getHea() ) ] ) longest = max([len(seq), longest]) # formattingString = "%"+`namelen`+"s %s" formattingString = "%"+`namelen`+"s %s\n" # re for gap char gapcharExp = re.compile(r'[-.~]') #write sequence objects to screen start, stop = 0, width positionCounter = 1 while 1: if number: w = min([positionCounter + width - 1,longest]) - positionCounter if blocksize: w = w + int(w/blocksize) - 1 #fstr = ' '*(namelen + 3) + '%s%' + `w` + 's' fstr = ' '*(namelen + 3) + '%s%' + `w` + 's\n' file.write( fstr % (positionCounter, min([positionCounter + width - 1,longest])) ) for i in range(len(seqList)): thisLine = seqList[i][start:stop] if blocksize: thisLine = block(thisLine,blocksize) if gapchar != '': thisLine = gapcharExp.sub(gapchar, thisLine) if not hea: file.write( formattingString % (seqList[i].getName(), thisLine) ) else: file.write( formattingString % (seqList[i].getName() + ' ' + seqList[i].getHea(), thisLine) ) start, stop = stop, stop + width positionCounter += width file.write( blanklines*'\n' ) if start > longest: break # t.msf MSF: 297 Type: N April 2, 2002 16:55 Check: 1054 .. # # Name: 143h175_2 Len: 297 Check: 9178 Weight: 1.00 # Name: 143h703_2 Len: 297 Check: 8697 Weight: 1.00 # # #// # # 1 50 #143h175_2 CCTCAGATCA CTCTTTGGCA ACGACCCCTC GTCACAATAA AGATAGGGGG #143h703_2 CCTCAGATCA CTCTTTGGCA ACGACCCCTC GTCACAATAA AGATAGGGGG def getSeqNames(seqlist, lower=1): """Get a list of names from a list of Seq objects. lower=1 forces names to lower case""" namelist = [] for seq in seqlist: name = seq.getName() if lower: name = name.lower() namelist.append(name) return namelist def seqToMSF(file, seqList, type = 'n'): """Creates a MSF-format alignment from a list of sequence objects. file is an open file object. Type = {'n' | 'p'}""" # type reverts to if type.upper() not in ['N','P']: sys.stderr.write("*"*50 + '\nSeqIO.seqToMSF(): type "%s" not recognized - reverting to type=N\n' + "*"*50 + '\n') type = 'N' alignLength = len(seqList[0]) file.write( 'MSF: %i Type: %s Check: 1 ..\n\n' % (alignLength, type.upper()) ) # write header block for seq in seqList: # '-'s were replaced by '.' in interleave tup = (seq.getName(), alignLength, gcg_checksum(seq.getSeq().replace('-','.'))) file.write( 'Name: %-16s Len:%6i Check: %4i Weight: 1.00\n' % tup ) file.write( '\n//\n\n' ) interleave(file, seqList, width=50, blocksize=10, number=1, hea=0, gapchar='.', v=1) def seqToNexus(file, seqList, fixnames = 1): """Writes a list of Seq objects to nexus format to open file object file. Assumes that all sequences are the same length. fixnames, if 1, replaces all '-' characters with '_' (underscore), and all multple underscore characters with a single underscore.""" outstr = """ #NEXUS BEGIN DATA; DIMENSIONS NTAX=%i NCHAR=%i; FORMAT DATATYPE=DNA INTERLEAVE MISSING=-; MATRIX """ if fixnames: r = re.compile('[-_\W]+') # finds -,_ and non-alphanumeric chars for i in range(len(seqList)): seq = seqList[i] seqList[i].setName(r.sub('_', seqList[i].getName())) file.write(outstr % (len(seqList), len(seqList[0]) )) #interleave(file, seqList, width=50, blocksize=10, number=1, hea=0, v=1): interleave(file, seqList, width=50, blocksize=10, number=0, hea=0, gapchar='-', v=0) file.write('\n;\nEND;\n') def gcg_checksum(seq): """Calculates GCG checksum for a sequence. seq is a string or sequence object. returns an int""" if type(seq) != types.StringType: seq = seq.getSeq() seq = seq.upper() check = 0 count = 0 for c in seq: count += 1 check += count * ord(c) #ASCII int value of c if count == 57: count = 0 return check % 10000 def readDnapars(filename='', v = 0): """Reads in the output file from dnapars (phylip v3.6) and returns [dictOut, tofromlist]. Note that dnapars treats all ambiguity characters as (and replaces them with) 'N'. This function only works with dnapars files contining a single tree!""" #open file or standard input try: fastaFile = open( filename, 'r' ) except IOError, excpt: #check if standard input contains data fastaFile = sys.stdin #verbosity if v > 1: print '\n\n*\n*%30s\n*' % 'SeqIO.readFasta' if filename == '': print 'Reading from sys.stdin' else: print 'Reading from file', filename seqDict = {} fromtolist = [] #start looking at the file keepGoing = 1 if v > 1: print 'starting to read in lines...' while 1: line = fastaFile.readline() if line == '': break if v > 0 and not keepGoing: print '.', if line.find('State at upper node') != -1: keepGoing = 0 continue if keepGoing: continue fromSeq = line[0:4].strip() toSeq = line[4:19].strip() step = line[19:27].strip() seqLine = line[27:].strip() if v > 3: print line, print fromSeq + '|' + toSeq + '|' + step + '|' + seqLine, '\n' if toSeq != '': seqDict[toSeq] = seqDict.get(toSeq, '') + seqLine if fromSeq != '' and (fromSeq, toSeq) not in fromtolist: fromtolist.append( (fromSeq, toSeq) ) fastaFile.close() nameList = seqDict.keys() nameList.sort() dictOut = {} seqLengths = [] for name in nameList: #do some sequence QC here dictOut[name] = Seq.Seq( name, 'nucl', removeWhitespace(seqDict[name]) , hea='from dnapars') seqLengths.append( len(string.strip(dictOut[name].getSeq())) ) if v > 2: print '*'*30 print "original sequence:" print seqDict[name] print print "sequence object" print seqToFasta(dictOut[name]) if max(seqLengths) != min(seqLengths): print "SeqIO.readDnapars() error: not all sequences are the same length in " + filename for seqName in dictOut.keys(): print seqName, len(dictOut[seqName]) sys.exit() return dictOut, fromtolist def testReadDnapars(): parsfile = sys.argv[1] tfafile = sys.argv[2] verbosity = int(sys.argv[3]) print 'reading', sys.argv[1] parserDict, fromToList = readDnapars(parsfile, v=verbosity ) print 'reading', sys.argv[2] fastaDict = readFasta(tfafile) for seqName in fastaDict.keys(): if fastaDict[seqName] == parserDict[seqName]: print tfaname, seqName,'same' else: print seqName, 'different' print seqToFasta( fastaDict[seqName], linelength = 60, degap = 0, acc=0, hea=0, begEnd=None, triplets=0, v=0 ) print seqToFasta( parserDict[seqName], linelength = 60, degap = 0, acc=0, hea=1, begEnd=None, triplets=0, v=0 ) import types dictkeys = parserDict.keys() dictkeys.sort() for seqName in dictkeys: try: seqName = eval(seqName) except NameError: pass if type(seqName) == types.IntType: print seqToFasta( parserDict[`seqName`], linelength = 60, degap = 0, acc=0, hea=1, begEnd=None, triplets=0, v=0 ) #for pair in fromToList: #print pair okchar = re.compile(r"[\n\ _\-ACGTN\d]") def seqToPhylip( seq, name='', superclean=0, v=debugLevel ): """Writes a string in phylip format from a sequence object, seq. name is used instead of the name associated with seq if supplied. Mercilessly truncates names to 10 characters.""" if name == '': name = seq.getName() seqStr = seq.getSeq() str = '%-10s%s' % (name[:10], seqStr) # debug strange behavior of dnadist: # temporary measure to make seq string super clean (slow!!!) if superclean: seq_list = list(str) print seq.getName(), ok = list('ACGTN\n -1234567890S') for i in range(len(seq_list)): #if okchar.search(c) == None: if seq_list[i].upper() not in ok: print '[%s]' % seq_list[i], seq_list[i] = 'N' print '' str = string.join(seq_list,'') return breakLines(str, 60) def writePhylip( seqList, rename=0, superclean=0, v=debugLevel ): """Creates a string representing a phylip2 format sequence alignment from a list of sequences. If renum=1, sequentially renames sequences s1, s2 ... sN (useful for long names).""" str = '' # species, characters str = str + '%s %s\n' % (len(seqList), len(seqList[0])) for i in range(len(seqList)): if rename: thisName = 's' + `i+1` else: thisName = seqList[i].getName() str = str + seqToPhylip( seqList[i], name=thisName, superclean=superclean) badchar = re.compile(r"") return str def BegEnd( sequenceObj, v=debugLevel ): """Determines the beginning and end position of a sequence in an aligned fasta file relative to the alignment assuming the beginning is padded with gap characters""" #obtain sequence seq = sequenceObj.getSeq() if v>0: print '.', #find beginning of sequence relative to alignment numbering #all sequences start with a number of gaps, followed by the #gapped sequence and end with a character (last of sequence) #the span from first to last character is used to determine #the position of the sequence in the alignment it was taken #from reo = re.compile( r'\A(?P[-.~]*)(?P[a-z].*[a-z])[-.~]*\Z', re.I ) found = reo.search( seq ) return found.span('seq') def writeTable( NestedList, filename='writeTableOut.txt', presicion=2, digits=10, delim ='\t', pad=0, padchar=0,v=debugLevel): """A nested list containing numbers is written as an ascii matrix corrsponding to a file with the default filename writeTableOut.txt * presicion: number of digits after period [default 2] * digits: number of total digits [default 10] * delim: field delimiter [default tab] * pad: pad all rows to length of row 1 with padchar * verbose: verbose mode [default 1] added 020104 NH: if both presicion=0 and digits=0, uses %s instead of '%digits.presicionf' (%s should work with both ints and chars) also, will write to an existing file object if supplied in filename this function will write tables with rows containing different numbers of columns""" #making formatting strings if presicion == 0 and digits == 0: formatString = '%s' + delim formatMinusDelim = '%s' else: formatString = '%' + `digits`+'.' + `presicion` + 'f' + delim formatMinusDelim = '%' + `digits`+'.' + `presicion` + 'f' # is filename an open file object? close = 1 try: filename.isatty() except ValueError: print '-'*60 traceback.print_stack(file=sys.stdout) sys.exit( 'SeqIO.writeTable was passed a closed file object\n' + '-'*60) except AttributeError: # filename isn't a file object #open file for writing try: matFile = open( filename, 'w' ) except IOError, excpt: sys.exit( excpt ) else: matFile = filename close = 0 # don't close if passed an open file object ### 010709 NEW IMPLEMENTATION if v > 0: print "SeqIO.writeTable:\n\t Number of rows: ", len( NestedList ), "number of columns in first row:", len( NestedList[0] ) print "\tDelimiter", delim, "Presicion:", presicion, "Digits:", digits if pad: padlen = len(NestedList[0]) if type(NestedList[0]) in [types.ListType, types.TupleType] : for i in NestedList: temp = len(i) * formatString # added 021218 to make all columns same length if pad and len(i) < padlen: difference = padlen-len(i) temp += (difference*formatString) % ((padchar,)*difference) temp = temp.strip() matFile.write( temp % tuple( i ) + '\n' ) else: temp = ( len( NestedList ) - 1) * formatString + formatMinusDelim matFile.write( temp % tuple( NestedList ) + '\n' ) if close: matFile.close() def guessFrame(sequenceString, ignoreEndStop = 1, stopPercent = 3.0, debug = 0): """translates a dna sequence string in all three frames and returns int correctionFactor. Adding correctionFactor to the starting index will result in an in-frame translation""" if ignoreEndStop: frame0 = Seq.translate(sequenceString)[0:-1] frame1 = Seq.translate(sequenceString[1:])[0:-1] frame2 = Seq.translate(sequenceString[2:])[0:-1] else: frame0 = Seq.translate(sequenceString) frame1 = Seq.translate(sequenceString[1:]) frame2 = Seq.translate(sequenceString[2:]) if debug: print 'frame0', frame0, '\n' print 'frame1', frame1, '\n' print 'frame2', frame2, '\n' stops = [string.count(frame0, '*'), string.count(frame1, '*'), string.count(frame2, '*') ] if debug: print 'stops:', stops, 'min:', min(stops) if stops.count(0) > 1: if debug: print "There is more than one frame with zero stops!" correctionFactor = None elif stops.count(min(stops)) > 1: if debug: print "There is more than one frame with the same minimum number of stops!" correctionFactor = None elif min(stops) > stopPercent/100.0 * float(len(frame0))/stopPercent: if debug: print "More than",stopPercent, "% of positions are stops in all frames!" correctionFactor = None elif stops.index(min(stops)) == 0: correctionFactor = 0 elif stops.index(min(stops)) == 1: correctionFactor = 1 elif stops.index(min(stops)) == 2: correctionFactor = 2 return correctionFactor def guessType(seq, minfreq = .10): """ Guess if a sequence is nucleotide or protein based in character composition. Returns 'nucl' or 'prot', list of base compositions """ seqStr = seq[:].upper() seqLen = float(len(seqStr)) compList = [] for base in 'A C G T'.split(): tally = string.count(seqStr, base) compList.append( float(tally)/seqLen ) # are any of a,c,g,t represented at a frequency < minfreq? obsMin = min(compList) ans = 'nucl' if obsMin < minfreq: ans = 'prot' return ans, compList def codonShuffle( seq, name='', dict = backTransDictNoAmb ): """ Randomizes the codon usage of a nucleotide sequence while maintaining the amino acid sequence. Input is either a sequence object or a string; return type is determined by the input type.""" pep = Seq.translate( seq ) #make a slice copy: this will allow the function to work with #strings as well as with sequence objects s = string.upper( pep[:] ) warn = '' if s.count('*') > 0: warn = "Warning: the translation of this sequence contains %s stops" % ( s.count('*'),) codonlist = [] for aa in pep: thislist = list(backTransDictNoAmb.get( aa, ('NNN',))) random.shuffle( thislist ) codonlist.append( thislist[0] ) shuffledSeq = string.join(codonlist, '')[:len(seq)] #make sequence object if type( seq ) == types.StringType: return shuffledSeq else: if name == '': name = seq.getName() seqObj = Seq.Seq( name=name, seq=shuffledSeq, hea="codon shuffled " + warn ) return seqObj def encode( seq, encodingDict, unknown, v=debugLevel ): """ returns a list in which each element represents an encoding of a position of the given sequence string or sequence object; the encoding is directed by the given dictionary. the dictionary should have the form dict = { aa1: encoding1, aa2: encoding2,... }; if dict = 'aa', the neural net aa encoding is chosen if dict = 'nucl', the standard nucleotide encoding without ambiguities is used if dict = 'anucl', the standard nucleotide encoding with ambiguities is used any symbols not found in the dictionary are encoded as specified by the argument 'unknown' updated for SeqIO by NH 020103 """ encodedSeq = [] #find out which dictionary to use (determines encoding) if v > 2: print "SeqIO.encode was supplied an encodingDict of type %s" % type(encodingDict) if type(encodingDict) == types.DictType: dict = encodingDict else: try: dict = numDict[encodingDict] except KeyError: Sys.exit( 'SeqIO.encode: invalid encoding dictionary' ) # if seq is a string, make it upper case try: seq = seq.upper() except AttributeError: pass #encode for i in seq: #encode all positions according to the dictionary encodedSeq.append( dict.get( i, unknown ) ) return encodedSeq def encodeSeqList( seqList, dict, v=debugLevel, pad=0, unknown = -1 ): """ encodes all sequence objects in seqList using the given dictionary or one of the buit-in dictionaries: * dict: {'aa'|'nucl'|'anucl'} 'aa': the neural net aa encoding 'nucl': standard nucleotide encoding 'anucl': standard nucleotide encoding with ambiguities * verbose: {0|1}: prints out running commentary * pad: {0|1}: add gap characters to end of sequences to equalize length (for alignments) the encoded sequences are returned as a nested list of integers updated for SeqIO by NH 020103""" encoded = [] if pad: maxLength = 0 for i in seqList: maxLength = max( [maxLength, len(i)] ) if v > 0: print "\nSequenceIO.encodeSeqList: encoding sequences", for seq in seqList: seqString = str(seq) if pad: #pad end with gaps to maxLength seqString = seqString + (maxLength - len(seq))*'-' encoded.append( encode( seqString, dict, unknown ) ) return encoded def testEncodeList(filename, dict = 'aa'): """an alignment of sequences supplied as the first argument is encoded according to the dict specified in arg 2""" seqList = readFasta( filename ) enc = encodeSeqList( seqList, dict, v=debugLevel, pad=0, unknown = -1 ) for eSeq in enc: print eSeq def tabulate( seqList, v=debugLevel ): """calculate the abundance of each character in the columns of aligned sequences; tallies are returned as a list of dictionaries indexed by position (position 1 = index 0). Keys of dictionaries are characters appearing in each position (so dictionaries are of variable length). seqList: a list of sequence objects returns a list of dictionaries corresponding to each position""" # get length of alignment maxLength = 0 for seq in seqList: maxLength = max( [maxLength, len(seq)] ) # initialize a data structure for storing the tallies dictList = [] for i in range(maxLength): dictList.append({}) if v > 0: print 'Tabulating sequences' for seq in seqList: if v > 1: print '.', #make sure seq is padded seqString = str(seq) thisLen = len(seq) if thisLen < maxLength: seqString = seqString + '-'*(maxLength - thisLen) # increment the dictionaries for this sequence # dict[char] = dict.get(char, 0) + 1 # dict <--> dictList[i] # char <--> seqString[i] for i in range(maxLength): dictList[i][seqString[i]] = dictList[i].get(seqString[i], 0) + 1 if v > 1: print ' ' return dictList def consensus( dict, countGaps=0, plu=2, v=debugLevel ): """Given a dictionary from tabulate representing character frequencies at a single position, returns the most common char at that position subject to the rules below. countGaps { 0 | 1 } plu plurality for calling a consensus character Special cases: 1) The most abundant character at a position is a gap if countGaps=0, uses the next most common character or 'x' if all chars are gaps if countGaps=1, puts a gap character ('-') at this position""" gapChars = ['-', '.', '~'] if len(dict) == 1: thisChar = dict.keys()[0] if thisChar not in gapChars: return thisChar elif thisChar in gapChars: if not countGaps: return 'X' else: return '-' if not countGaps: for c in gapChars: try: del dict[c] except KeyError: pass if len(dict) == 1: return dict.keys()[0] # gap was one of only two chars rdict = invertDict(dict) vals = dict.values() vals.sort() vals.reverse() #largest value is first majority = vals[0] second = vals[1] if majority-second < plu: return 'X' return rdict[majority] def consFromList(seqList, countGaps=0, plu=2): """Returns a sequence object representing the consensus of seqList""" positionDictList = tabulate(seqList) consString = '' for d in positionDictList: consString += consensus(d, countGaps=0, plu=plu) return Seq.Seq( name='CON', seq=consString.upper() ) def diff(seq,templateseq, simchar=' '): """Compares seq and templateseq (can be Seq objects or strings) and returns an object is of same type as seq in which characters in seq that are identical at that position to templateseq are replaced with simchar. Return object is the length of the shorter of seq and templateseq""" seqstr = seq[:].upper() templatestr = templateseq[:].upper() newstr = '' for i in range(min([len(seqstr), len(templatestr)])): if seqstr[i] == templatestr[i]: newstr += simchar else: newstr += seqstr[i] if type(seq) == types.StringType: return newstr else: outseq = seq.clone() outseq.setSeq(newstr) return outseq def testTabulate( filename ): seqList = readFasta( filename ) dictList = tabulate( seqList, v = 3 ) firstStr = seqList[0].getSeq() for i in range(len(firstStr)): print i + 1, '\t', firstStr[i], '\t' , dictList[i], consensus( dictList[i], countGaps=0, plu=600 ) #From To Any Steps? State at upper node # # 1 CCCATTAGTC CTATTGAAAC TGTACCAGTA AAATTAAAGC # 1 u28651000 no CCCATTAGTC CTATTGAAAC TGTACCAGTA AAATTAAAGC # 1 u28652 no CCCATTAGTC CTATTGAAAC TGTACCAGTA AAATTAAAGC # 1 2 no CCCATTAGTC CTATTGAAAC TGTACCAGTA AAATTAAAGC # 13 u53871 no CCCATTAGCC CTATTGAGAC TGTACCAGTA AAATTAAAGC # 10 14 no CCCATTAGCC CTATTGAGAC TGTACCAGTA AAATTAAAGC # 14 m69212 yes CCCATTAGCC CCATTGAGAC TGTACCAGTA AAATTAAAGC # 14 m69213 no CCCATTAGCC CTATTGAGAC TGTACCAGTA AAATTAAAGC # 18 19 maybe ATGGAAAAGG AAGGAAAAAT TTCAAAAATT GGGCCTGAAA # 19 u16773 yes ATGGAGAAGG AAGGAAAAAT TTCAAAAATT GGACCTGAAA # 19 u16775 no ATGGAAAAGG AAGGAAAAAT TTCAAAAATT GGGCCTGAAA # 5 20 no ATGGAAAAGG AAGGGAAAAT TTCAAAAATT GGGCCTGAAA def describe(): """ SeqIO.py Module containing functions for reading and writing sequence information in various formats, translation of nucleotide sequences. Lots of useful functions in here! """ print describe.__doc__ if __name__ == '__main__': describe()