Source code for geneplot

#!/usr/bin/env python

import os
import gffutils
import subprocess
from reportlab.lib import colors
from reportlab.lib.units import cm
from reportlab.lib.colors import Color
from Bio.Graphics import GenomeDiagram
from Bio.SeqFeature import SeqFeature, FeatureLocation
from Bio.Graphics.GenomeDiagram import CrossLink
import matplotlib.pyplot as pl
import logging


logfile = 'geneplot.all.err.log'
galllog = open(logfile, 'w'); galllog.close()
logging.basicConfig(filename=logfile, level=logging.DEBUG, filemode='a', force=True,
                format='%(asctime)s %(levelname)s %(name)s  %(funcName)s  %(module)s %(message)s')

logger=logging.getLogger(__name__)
logger.info('module initialized...'); print('--', logger)


[docs]def createGFFdb(gff3file): ''' Creates a sqlite3 database of a GFF3 file with the gffutils Pyton package. :param gff3file: path to the GFF3 file :type gff3file: str ''' gffutils.create_db(gff3file, dbfn=gff3file + '.db', force=True, keep_order=True, merge_strategy='merge', sort_attribute_values=True) if os.path.isfile(gff3file + '.db') == True: print(os.path.basename(gff3file) + '.db', 'has been created')
[docs]class genome(object): ''' Instantiates a genome object with genome-associated as paths pointing to the data source and set as class attributes. Data include the GFF3 file of genome annotation (positional), InterproScan output of protein domains identified on protein-coding genes (keyword), and directory with VCF files of polymorphisms (keyword). :param gff3file: path to the GFF3 file :type gff3file: str :param iprfile: path to the InterproScan's output file :type iprfile: str :param vcffiles: path to the VCF files directory :type vcffiles: str ''' _snpimpact = {'LOW':Color(0,150/255,50/255), 'MODERATE':Color(204/255,153/255,0/255), 'MODIFIER':Color(255/255,0/255,255/255), 'HIGH':Color(255/255,0,0), 'NOANN':Color(0,0,0)} def __init__(self, gff3file, iprfile=None, vcffiles=None): print('Initializing genome class...') genome.gff3file = gff3file genome.iprfile = iprfile genome.vcffiles = vcffiles logger.info(' genome class initialized')
[docs] class gene(object): ''' Instantiates a gene object with the method plot() to represent the intron/exon structure of the gene from a GFF3 file, the protein domain topology from InterproScan's output, and single nucleotide polymofphisms (SNPs) from VCF files. :param mRNAid: gene identifier (ID) according to the GFF3 file annotations. :type mRNAid: str :param proteinid: protein identifier (ID) from the InterproScan output :type proteinid: str :param description: user-defined description of the gene :type description: str ''' def __init__(self, mRNAid, proteinid=None, description=None): print('Initializing gene class...') self.id = mRNAid self.proteinid = proteinid self.gff3file = genome.gff3file self.iprfile = genome.iprfile self.vcffiles = genome.vcffiles self.description = description self.db = gffutils.FeatureDB(genome.gff3file + '.db', keep_order=True); db = self.db assert db[mRNAid].featuretype == 'mRNA', 'only mRNA features are supported for \ this version of the library' self.chrom = db[mRNAid].chrom self.start = db[mRNAid].start self.end = db[mRNAid].end self.strand = db[mRNAid].strand logger.info(self.id + ' gene class initialized')
[docs] def transcriptpos_to_genomepos(self): ''' Calculates genome coordinates for every nucleotide position of the transcript according to the GFF3 and FASTA files provided as input during the instantiation of the gene class. ''' logger.info('starting...') db = self.db try: mrna = self.id coors = [] lencdsaccum = 0 for cds in db.children(db[mrna], featuretype='CDS', order_by='start'): lencds = cds.end + 1 - cds.start coors.append(list(range(cds.start, cds.end+1))) lencdsaccum += lencds coors = sum(coors, []) if db[mrna].strand == '-': coors.reverse() dcoors = {} for pos in range(lencdsaccum): dcoors[pos+1] = coors[pos] self.dcoors = dcoors return dcoors except Exception as e: logger.error(e) raise
[docs] def proteindoms(self, iprfile, proteinid): ''' Builds a dictionary from the InterproScan file provided as input of the class "gene". :param iprfile: path to the InterproScan's output file :type iprfile: str :param proteinid: protein identifier (ID) from the InterproScan output :type proteinid: str ''' logger.info('starting...') try: mrna = self.id f = open(iprfile, 'r') doms = {} for i in f: if i.split('\t')[0] == proteinid: doms.setdefault(i.split('\t')[3], []).append((i.split('\t')[4], i.split('\t')[5], int(i.split('\t')[6]), int(i.split('\t')[7]))) f.close() # print(len(doms), 'domain types found') print(', '.join(doms.keys())) self.doms = doms return doms except Exception as e: logger.error(e) raise
[docs] def getsnppos(self, sp, vcffiles, onlycoding=True): ''' Selects SNP data overlapping with genome coordinates of the gene ID (class object) from a VCF file whose sample ID matches the "sp" parameter of the function. SNP annotation by SNPEff is retrieved from the VCF file. If absent, de novo annotation of selected SNPs is performed. :param sp: Species ID to select SNP data from the VCF file :type sp: str :param vcffiles: path to the VCF files directory :type vcffiles: str :param onlycoding: to plot only SNPs located on coding areas of the gene :type onlycoding: boolean ''' logger.info('starting...') try: mrna = self.id db = self.db phvcfs = vcffiles galllog = open(logfile, 'a') fsp = 'file not found' for ffile in os.listdir(phvcfs): if ffile.endswith('.vcf'): logger.debug(ffile) p0 = subprocess.run(['vcf-query -l '+os.path.join(phvcfs, ffile)], shell=True, stdout=subprocess.PIPE, stderr=galllog) sample = p0.stdout.decode().strip() if sp == sample: print('getting SNPs from...', self.id, sample, ffile) fsp = ffile dcds, varssall, dmrna, dvarss = {}, [], {'snps':{}, 'posgposp':{}}, {} lencdsaccum = 0 # SNPs on coding areas if onlycoding == True: print('plotting only SNPs on coding areas...') for cds in db.children(db[mrna], featuretype='CDS', order_by='start'): lencds = cds.end + 1 - cds.start p1 = subprocess.run(['vcftools --vcf ' + os.path.join(phvcfs, fsp) +\ ' --chr '+db[mrna].chrom+' --from-bp '+\ str(cds.start)+' --to-bp '+str(cds.end)+\ ' --recode --recode-INFO-all --stdout'], shell=True, stdout=subprocess.PIPE, stderr=galllog) for var in p1.stdout.decode().split('\n')[:-1]: if var[0] != '#': if len(var.split('\t')[3]) == 1 and len(var.split('\t')[4]) == 1: varssall.append(var) if var.split('\t')[9].split(':')[0] == '0/1': dcds[int(var.split('\t')[1])-cds.start+1+lencdsaccum] = \ var.split('\t')[3] + '/' + var.split('\t')[4] dvarss[var.split('\t')[0]+':'+var.split('\t')[1]] = \ var.split('\t')[3] + var.split('\t')[4] dmrna['posgposp'][var.split('\t')[0]+':'+var.split('\t')[1]] = \ int(var.split('\t')[1])-cds.start+1+lencdsaccum if var.split('\t')[9].split(':')[0] == '1/1': dcds[int(var.split('\t')[1])-cds.start+1+lencdsaccum] = \ var.split('\t')[4] dmrna['posgposp'][var.split('\t')[0]+':'+var.split('\t')[1]] = \ int(var.split('\t')[1])-cds.start+1+lencdsaccum dvarss[var.split('\t')[0]+':'+var.split('\t')[1]] = \ var.split('\t')[4] + var.split('\t')[4] lencdsaccum += lencds # all SNPs else: p2 = subprocess.run(['vcftools --vcf ' + os.path.join(phvcfs, fsp) +\ ' --chr '+db[mrna].chrom+' --from-bp '+\ str(self.start)+' --to-bp '+str(self.end) +\ ' --recode --recode-INFO-all --stdout'], shell=True, stdout=subprocess.PIPE, stderr=galllog) for var in p2.stdout.decode().split('\n')[:-1]: if var[0] != '#': if len(var.split('\t')[3]) == 1 and len(var.split('\t')[4]) == 1: varssall.append(var) if var.split('\t')[9].split(':')[0] == '0/1': dvarss[var.split('\t')[0]+':'+var.split('\t')[1]] = \ var.split('\t')[3] + var.split('\t')[4] if var.split('\t')[9].split(':')[0] == '1/1': dvarss[var.split('\t')[0]+':'+var.split('\t')[1]] = \ var.split('\t')[4] + var.split('\t')[4] varssall = list(set(varssall)) # # SNPEff annotation dsnpsann = {} headerann = 'Allele | Annotation | Annotation_Impact | Gene_Name | Gene_ID | \ Feature_Type | Feature_ID | Transcript_BioType | Rank | HGVS.c | HGVS.p | \ cDNA.pos / cDNA.length | CDS.pos / CDS.length | AA.pos / AA.length | Distance |\ ERRORS / WARNINGS / INFO' def getann(item): dsnpsann = {} for i in item: if '#' not in i: for item in i.split('\t')[7].split(';'): if item.startswith('ANN'): ann = item[4:] dtmpann = {} for field in range(len(headerann.split('|'))): dtmpann[headerann.split('|')[field].strip()] = \ ann.split('|')[field] dsnpsann[':'.join(i.split('\t')[:2])] = dtmpann return dsnpsann dsnpsann = getann(varssall) # if len(dsnpsann) == 0: pass # dmrna['len'] = lencdsaccum if db[mrna].strand == '-': dtmp = {} for item in dcds: dtmp[dmrna['len']-item+1] = dcds[item] # dcds = dtmp dtmp = {} for item in dmrna['posgposp']: dtmp[item] = dmrna['len']-dmrna['posgposp'][item]+1 # dmrna['posgposp'] = dtmp del(dtmp) galllog.close() self.dcds = dcds self.varssall = varssall self.dmrna = dmrna self.dsnpsann = dsnpsann self.dvarss = dvarss return dsnpsann, dvarss except (TypeError): pass except Exception as e: logger.error(e) raise
[docs] def plot(self, domtype='Pfam', sp=None, onlycoding=True): ''' Plots features of the gene ID (class object) previously generated by the functions of the class, including exon and UTR features (the latter only if present in the GFF3 file), Interpro protein domains and SNPs. SNP data are labelled with the genotype according to the VCF file information, and colored based on SNPEff impact, i.e. LOW: green, MODERATE: amber, MODIFIER: pink, HIGH: red. A PNG image is generated. :param domtype: protein domain type (as specified in the InterproScan output) to be plotted (Pfam by default). :type domtype: str :param sp: Species ID to select SNP data from the VCF file. :type sp: str :param onlycoding: to plot only SNPs located on coding areas of the gene :type onlycoding: boolean ''' logger.info('starting...') try: mrna = self.id dcoors = self._transcriptpos_to_genomepos() doms = self._proteindoms(self.iprfile, self.proteinid) if \ (self.iprfile is not None and self.proteinid is not None) else {} dsnpsann, dvarss = self._getsnppos(sp, self.vcffiles, onlycoding) if \ self.vcffiles is not None else ({}, {}) db = self.db domsgncoors = [] ccolor = '#c5cc78' gdd = GenomeDiagram.Diagram('gene') sscale = 1 if self.end-self.start > 40000: sscale = 0 gdt_features = gdd.new_track(0, greytrack=True, name=mrna, greytrack_labels=1, greytrack_font_color='black', greytrack_fontsize=20, greytrack_font_rotation=90, scale=sscale, scale_largetick_interval=5000, scale_smalltick_interval=500, scale_fontsize=10, scale_fontangle=0) # gds_features = gdt_features.new_set() # for ft in db.children(mrna): if 'UTR' in ft.featuretype: ccolor = '#a6a6a6' feature = SeqFeature(FeatureLocation(ft.start, ft.end), strand=int(ft.strand+'1')) null = gds_features.add_feature(feature, name=ft.id, label=False, color=ccolor, sigil='BIGARROW', label_size=0, arrowshaft_height=1.0, arrowhead_length=0.1) # #protein domain independent gdt_features = gdd.new_track(1, greytrack=True, name='Dom', greytrack_labels=1, greytrack_font_color='black', greytrack_fontsize=30, height=0.3, scale=0, scale_largetick_interval=1000, scale_smalltick_interval=100) # gds_features = gdt_features.new_set() # cmap = pl.cm.get_cmap('tab20') # sort doms by size and define ldoms ldoms = [] for dom in doms.get(domtype, []): ldoms.append((dom[3]-dom[2],) + dom) ldoms = [dom[1:] for dom in sorted(ldoms, reverse=True)] # use ldoms from now on for indx,dom in enumerate(ldoms): pdcoors = sorted((dcoors[dom[2]*3-2], dcoors[dom[3]*3])) domsgncoors.append((dom[0], dom[1], pdcoors[0], pdcoors[1])) domcolor = Color(cmap(indx)[0], cmap(indx)[1], cmap(indx)[2], cmap(indx)[3]) #feature for domain name feature = SeqFeature(FeatureLocation(pdcoors[0], pdcoors[1]), strand=0) null = gds_features.add_feature(feature, name=dom[0], label=True, color='white', sigil='BOX', label_size=30, border=False, label_angle=25, label_position='start', label_color=domcolor) # for pos in range(pdcoors[0], pdcoors[1]): if pos in dcoors.values(): feature = SeqFeature(FeatureLocation(pos, pos+1), strand=0) null = gds_features.add_feature(feature, name='', label=False, color=domcolor, sigil='BOX', label_size=0) if pos not in dcoors.values(): feature = SeqFeature(FeatureLocation(pos, pos+1), strand=0) null = gds_features.add_feature(feature, name='.', label=True, color='white', border=False, sigil='BOX', label_size=10, label_strand=-1, label_color=domcolor) # #SNP positions snpimpact = genome._snpimpact gdt_features = gdd.new_track(2, greytrack=True, name='SNPs', greytrack_labels=1, greytrack_font_color='black', greytrack_fontsize=30, height=0.3, scale=0, scale_largetick_interval=1000, scale_smalltick_interval=100) # gds_features = gdt_features.new_set() # for snp in dvarss: snpcolor = snpimpact[dsnpsann.get(snp, {'Annotation_Impact': 'NOANN'}) ['Annotation_Impact']] feature = SeqFeature(FeatureLocation(int(snp.split(':')[1])-1, int(snp.split(':')[1])+1), strand=0) null = gds_features.add_feature(feature, name=dvarss[snp], label=True, color=snpcolor, sigil='OCTO', label_size=20, label_angle=90, label_position='middle', label_color=snpcolor) # gdd.draw(format='linear', pagesize=(100*cm,20*cm), fragments=1, start=db[mrna].start-100, end=db[mrna].end) # gdd.write('geneplot.' + str(self.id).replace(':', '_') + '.png', "png") # self.domsgncoors = domsgncoors except Exception as e: logger.error(e) raise