# Source code for galaxy_utils.sequence.fasta

#Dan Blankenberg

class fastaSequence(object):
    """Plain record holding one FASTA entry: an identifier and its sequence text."""

    def __init__(self):
        """Create an empty record with no identifier and an empty sequence."""
        self.identifier = None
        # Raw sequence string; intended to contain no whitespace.
        self.sequence = ''

    def __len__(self):
        """Return the number of characters currently stored in ``sequence``."""
        sequence_text = self.sequence
        return len(sequence_text)

    def __str__(self):
        """Return the identifier and the sequence, each terminated by a newline."""
        fields = (self.identifier, self.sequence)
        return "%s\n%s\n" % fields
class fastaReader(object):
    """Sequential reader that parses fastaSequence records from a file handle.

    The handle must provide ``readline()``, ``tell()`` and ``seek()``: the
    reader detects the end of a record by reading the next header line and
    then rewinds so that header is seen again by the following call.
    """

    def __init__(self, fh):
        """Wrap the already-open file handle *fh*."""
        self.file = fh

    def close(self):
        """Close the underlying handle and return the result of its ``close()``."""
        return self.file.close()

    def next(self):
        """Read and return the next record as a fastaSequence.

        Lines starting with ``#`` that precede the header are skipped.

        Raises:
            StopIteration: when no further record is available.
            AssertionError: when the next non-comment line is not a ``>`` header.
        """
        line = self.file.readline()
        # Remove header comment lines.
        while line and line.startswith('#'):
            line = self.file.readline()
        if not line:
            raise StopIteration
        # Raised explicitly (rather than via ``assert``) so the check still
        # runs under ``python -O``; the exception type is unchanged.
        if not line.startswith('>'):
            raise AssertionError("FASTA headers must start with >")
        rval = fastaSequence()
        rval.identifier = line.strip()

        # 454 qual test data that was used has decimal scores that don't have
        # trailing spaces, so sequences are built in a less-than-ideal way:
        # once any line contains a space, that line and every later line gets
        # a single trailing space appended so values stay separated.
        chunks = []
        space_delimited = False
        offset = self.file.tell()
        while True:
            line = self.file.readline()
            if not line or line.startswith('>'):
                if line:
                    # Rewind so the next call re-reads this header. This
                    # causes id lines to be read twice; caching the header
                    # could avoid the re-read.
                    self.file.seek(offset)
                break
            line = line.rstrip()
            if space_delimited or ' ' in line:
                space_delimited = True
                chunks.append(line + ' ')
            else:
                chunks.append(line)
            offset = self.file.tell()
        rval.sequence = ''.join(chunks)
        return rval

    # Python 3 iterator protocol; ``next`` is kept for existing callers.
    __next__ = next

    def __iter__(self):
        """Yield records until the input is exhausted."""
        while True:
            try:
                record = self.next()
            except StopIteration:
                # A StopIteration escaping a generator body becomes a
                # RuntimeError under PEP 479, so finish the generator cleanly.
                return
            yield record
class fastaNamedReader(object):
    """Look up FASTA records by identifier from a seekable file handle.

    Records are read through a fastaReader. While scanning forward for a
    requested identifier, the start offset of every non-matching record is
    remembered in ``offset_dict`` so that a later request for it can seek
    straight back to it.
    """

    # ``basestring`` exists only on Python 2; fall back to ``str`` so that
    # get() does not fail with NameError on Python 3.
    try:
        _string_types = (basestring,)  # noqa: F821
    except NameError:
        _string_types = (str,)

    def __init__(self, fh):
        """Wrap the already-open, seekable file handle *fh*."""
        self.file = fh
        self.reader = fastaReader(self.file)
        # Maps identifier -> list of file offsets of skipped, unused records.
        self.offset_dict = {}
        # Set once a forward scan in get() has hit end of file.
        self.eof = False

    def close(self):
        """Close the underlying handle and return the result of its ``close()``."""
        return self.file.close()

    def get(self, sequence_id):
        """Return the record whose identifier equals *sequence_id*, or None.

        *sequence_id* may be a string or any object exposing an
        ``identifier`` attribute. A record found through a remembered offset
        is removed from ``offset_dict``, so each stored record is handed out
        once. Returns None when the input is exhausted without a match.
        """
        if not isinstance(sequence_id, self._string_types):
            sequence_id = sequence_id.identifier
        rval = None
        if sequence_id in self.offset_dict:
            initial_offset = self.file.tell()
            seq_offset = self.offset_dict[sequence_id].pop(0)
            if not self.offset_dict[sequence_id]:
                del self.offset_dict[sequence_id]
            self.file.seek(seq_offset)
            try:
                rval = self.reader.next()
            finally:
                # Always restore the forward-scan position, even on error.
                self.file.seek(initial_offset)
        else:
            while True:
                offset = self.file.tell()
                try:
                    fasta_seq = self.reader.next()
                except StopIteration:
                    self.eof = True
                    break  # eof, id not found, will return None
                if fasta_seq.identifier == sequence_id:
                    rval = fasta_seq
                    break
                # Remember where this skipped record starts for later lookups.
                self.offset_dict.setdefault(fasta_seq.identifier, []).append(offset)
        return rval

    def has_data(self):
        """Describe any records that were never returned by get().

        Returns a human-readable string, or an empty string (falsy) when no
        data remains. The file position is left unchanged.
        """
        eof = self.eof
        count = 0
        rval = ''
        if self.offset_dict:
            count = sum(len(offsets) for offsets in self.offset_dict.values())
        if not eof:
            # Probe for one more record, then rewind to where we started.
            offset = self.file.tell()
            try:
                self.reader.next()
            except StopIteration:
                eof = True
            self.file.seek(offset)
        if count:
            rval = "There were %i known sequences not utilized. " % count
        if not eof:
            rval = "%s%s" % (rval, "An additional unknown number of sequences exist in the input that were not utilized.")
        return rval
class fastaWriter(object):
    """Writes records in FASTA format to an already-open file handle."""

    def __init__(self, fh):
        """Remember the output handle *fh*."""
        self.file = fh

    def write(self, fastq_read):
        """Write one record as a ``>`` header line followed by a sequence line.

        The first character of ``fastq_read.identifier`` is dropped and
        replaced by ``>``. The sequence is written as-is, so it will include
        the color space adapter base if applicable.
        """
        header_text = fastq_read.identifier[1:]
        sequence_text = fastq_read.sequence
        record = ">%s\n%s\n" % (header_text, sequence_text)
        self.file.write(record)

    def close(self):
        """Close the underlying handle and return the result of its ``close()``."""
        outcome = self.file.close()
        return outcome