Hi Violeta,
OK, I think I get it now: if the therapeutic area is measurement or phenotype, you want a more granular parent term if possible, and you want these values for a larger number of associations. Is this correct?
For such a use case I would highly recommend using our flat files on the FTP (associations, diseases) rather than the front end or the GraphQL API. (The ontology widget on the front end uses a JSON file to load all EFO terms.)
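As a minimal setup sketch (my addition, assuming the diseases and associationByOverallDirect Parquet datasets have already been downloaded from the FTP to local disk; the app name and master settings are placeholders):

from pyspark.sql import SparkSession

# Local Spark session used to read the downloaded flat files:
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('resolve-phenotype-measurement-terms')
    .getOrCreate()
)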
So if I understand the problem correctly, the logic would flow like this:
- Get all the EFO terms that lie directly under phenotype and measurement.
- Get all associations of interest.
- Join associations with diseases, exploding the ancestors column.
- Join the exploded ancestors with the more granular terms.
- Aggregate the associations.
# pyspark.sql column functions are referenced as `f` below:
from pyspark.sql import functions as f

therapeutic_area_to_resolve = [
    'EFO_0001444',  # Measurement
    'EFO_0000651'   # Phenotype
]
gene_of_interest = 'ENSG00000115977'
# Loading disease dataset:
diseases = spark.read.parquet('/Users/dsuveges/project_data/diseases/')
# We are looking for the least granular terms under phenotype and measurement,
# i.e. terms whose direct parents include measurement or phenotype:
resolved_phenotypes_measurements = (
    diseases
    .select(
        f.col('id').alias('resolvedDiseaseId'),
        f.col('name').alias('resolvedDiseaseName'),
        f.explode('therapeuticAreas').alias('therapeuticArea'),
        'parents', 'ancestors'
    )
    # Get all terms that are directly under phenotypes and measurements:
    .filter(
        f.col('therapeuticArea').isin(therapeutic_area_to_resolve) &
        (
            f.array_contains(f.col('parents'), therapeutic_area_to_resolve[0]) |
            f.array_contains(f.col('parents'), therapeutic_area_to_resolve[1])
        )
    )
    .select('resolvedDiseaseId', 'resolvedDiseaseName')
    # .show(truncate=False)
    .persist()
)
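To glance at which terms got resolved before joining (equivalent to un-commenting the .show call above):

# Quick sanity check of the terms sitting directly under measurement / phenotype:
resolved_phenotypes_measurements.show(10, truncate=False)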
# Load associations:
associations = (
    spark.read.parquet('/Users/dsuveges/project_data/associationByOverallDirect/')
    .select('diseaseId', 'targetId', 'score')
    # Filter for the gene of interest:
    .filter(f.col('targetId') == gene_of_interest)
)
resolved_associations = (
    associations
    # Joining associations with diseases where the ancestors column is exploded:
    .join(
        diseases.select(
            f.col('id').alias('diseaseId'),
            f.col('name').alias('diseaseName'),
            f.explode(f.col('ancestors')).alias('resolvedDiseaseId'),
            f.col('therapeuticAreas')
        ),
        on='diseaseId', how='left'
    )
    # Joining with the resolved disease dataset:
    .join(resolved_phenotypes_measurements, on='resolvedDiseaseId', how='left')
    .drop('resolvedDiseaseId')
    # Aggregating associations:
    .groupBy(['diseaseId', 'targetId'])
    .agg(
        f.first('score').alias('score'),
        f.first('diseaseName').alias('diseaseName'),
        f.first('therapeuticAreas').alias('therapeuticAreas'),
        f.collect_set('resolvedDiseaseName')
    )
    .persist()
)
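The per-row view below comes from a vertical show; this call is just an inspection sketch (my addition), and the EFO term in the filter is simply the one from the example output:

# Display one aggregated row vertically for easier reading:
(
    resolved_associations
    .filter(f.col('diseaseId') == 'EFO_0004736')
    .show(truncate=False, vertical=True)
)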
Which gives rows like this:
diseaseId                        | EFO_0004736
targetId                         | ENSG00000115977
score                            | 0.021152588627945577
diseaseName                      | aspartate aminotransferase measurement
therapeuticAreas                 | [EFO_0001444]
collect_set(resolvedDiseaseName) | [liver enzyme measurement, protein measurement]
Please let me know if I misunderstood anything.