Cannot reproduce Python code from 'Accessing and querying datasets'

Dear all,

I tried to reproduce the Python code provided at the following link: Download datasets - Open Targets Platform Documentation.

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# path to ClinVar (EVA) evidence dataset 
# directory stored on your local machine
evidencePath = "local directory path - e.g. /User/downloads/sourceId=eva"

# establish spark connection
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# read evidence dataset
evd = spark.read.parquet(evidencePath)

# Browse the evidence schema
evd.printSchema()

# select fields of interest
evdSelect = (evd
 .select("targetId",
         "diseaseId",
         "variantRsId",
         "studyId",
         F.explode("clinicalSignificances").alias("cs"),
         "confidence")
 )
evdSelect.show()

# Convert to a Pandas Dataframe
evdSelect.toPandas()

However, I got the following output:

2021-11-28 15:36:34 WARN  Utils:66 - Your hostname, mac503381.local resolves to a loopback address: 127.0.0.1; using 192.168.113.132 instead (on interface en1)
2021-11-28 15:36:34 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2021-11-28 15:36:36 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Py4JJavaError                             Traceback (most recent call last)
~/opt/anaconda3/envs/my-rdkit-env/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/opt/anaconda3/envs/my-rdkit-env/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o27.parquet.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/arr4007/Documents/Projects/Alex/COVPDB/Scripts/local directory path - e.g. /User/downloads/sourceId=eva;
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:643)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
~/Documents/Projects/Alex/COVPDB/Scripts/Pathology_classificator.py in <module>
     63 
     64 # read evidence dataset
---> 65 evd = spark.read.parquet(evidencePath)
     66 
     67 # Browse the evidence schema

~/opt/anaconda3/envs/my-rdkit-env/lib/python3.7/site-packages/pyspark/sql/readwriter.py in parquet(self, *paths)
    314         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    315         """
--> 316         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    317 
    318     @ignore_unicode_prefix

~/opt/anaconda3/envs/my-rdkit-env/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

~/opt/anaconda3/envs/my-rdkit-env/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Path does not exist: file:/Users/arr4007/Documents/Projects/Alex/COVPDB/Scripts/local directory path - e.g. /User/downloads/sourceId=eva;'

Any idea what it might be?

Hello @arr88! :wave:

Welcome to the Open Targets Community! :tada:

After reviewing your script, I can see that the exception is raised because it cannot find the files at the path you provided:

The script is looking for the following path:

/Users/arr4007/Documents/Projects/Alex/COVPDB/Scripts/local directory path - e.g. /User/downloads/sourceId=eva;

Did you download the files into another directory on your local machine?

If so, you can update the evidencePath variable with the correct location:

evidencePath = "local directory path - e.g. /User/downloads/sourceId=eva"

Once updated, the script should work!

Good luck! :slight_smile:

~ Andrew

Hello @ahercules,
Thanks for your help! :blush:
I edited the variable so that it points to the directory where the .parquet files are stored and re-ran the script.

This is the code:

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# path to ClinVar (EVA) evidence dataset 
# directory stored on your local machine
evidencePath = "/Users/arr4007/Downloads/diseases"

# establish spark connection
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# read evidence dataset
evd = spark.read.parquet(evidencePath)

# Browse the evidence schema
evd.printSchema()

# select fields of interest
evdSelect = (evd
 .select("targetId",
         "diseaseId",
         "variantRsId",
         "studyId",
         F.explode("clinicalSignificances").alias("cs"),
         "confidence")
 )
evdSelect.show()

# Convert to a Pandas Dataframe
evdSelect.toPandas()

Then I got the following error:

AnalysisException                         Traceback (most recent call last)
~/Documents/Projects/Alex/COVPDB/Scripts/Uniprot_searcher.py in <module>
     69          "studyId",
     70          F.explode("clinicalSignificances").alias("cs"),
---> 71          "confidence")
     72  )
     73 evdSelect.show()

~/opt/anaconda3/envs/Bioservices/lib/python3.7/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1318         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1319         """
-> 1320         jdf = self._jdf.select(self._jcols(*cols))
   1321         return DataFrame(jdf, self.sql_ctx)
   1322 

~/opt/anaconda3/envs/Bioservices/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

~/opt/anaconda3/envs/Bioservices/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`targetId`' given input columns: [ontology, dbXRefs, parents, children, code, therapeuticAreas, indirectLocationIds, id, obsoleteTerms, description, sko, descendants, name, directLocationIds, ancestors, synonyms];;\n'Project ['targetId, 'diseaseId, 'variantRsId, 'studyId, explode('clinicalSignificances) AS cs#205, 'confidence]\n+- Relation[id#173,code#174,dbXRefs#175,description#176,name#177,directLocationIds#178,obsoleteTerms#179,parents#180,sko#181,synonyms#182,ancestors#183,descendants#184,children#185,therapeuticAreas#186,indirectLocationIds#187,ontology#188] parquet\n"

I assume the error occurs because ‘targetId’ is not an accepted input column? That would mean I should pick one of the other suggested fields instead, e.g. parents, code, id…
Bests
Atilio

Hi @arr88!

Correct - the targetId field is not present in the diseases dataset that you are parsing.

If you run the following script, you will see the schema for a given dataset and that will let you know what fields are available:

# import relevant libraries
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

# create Spark session
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# set location of diseases dataset (Parquet format)
disease_data_path = "/Desktop/platform-data-analysis/data/diseases"

# read diseases dataset
disease_data = spark.read.parquet(disease_data_path)

# print diseases dataset schema
disease_data.printSchema()

For example, the schema for the diseases dataset is below:

root
 |-- id: string (nullable = true)
 |-- code: string (nullable = true)
 |-- dbXRefs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- directLocationIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- obsoleteTerms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- parents: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sko: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- synonyms: struct (nullable = true)
 |    |-- hasBroadSynonym: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- hasExactSynonym: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- hasNarrowSynonym: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- hasRelatedSynonym: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- ancestors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- descendants: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- therapeuticAreas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- indirectLocationIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ontology: struct (nullable = true)
 |    |-- isTherapeuticArea: boolean (nullable = true)
 |    |-- leaf: boolean (nullable = true)
 |    |-- sources: struct (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- name: string (nullable = true)

After reviewing the schema and the fields that are available, you can then query for specific fields and export to a Pandas dataframe:

# generate subset of diseases dataset with relevant fields
disease_data_subset = (disease_data.select(F.col("id").alias("disease_id"), "name", "description"))

# convert to Pandas dataframe
disease_df = disease_data_subset.toPandas()

# print first 5 rows of disease dataframe
disease_df.head(5)

Thanks again for your prompt help! :blush:
I can now access the schema root and dig even further.

Under which key are the disease counts and the phenotypes associated with a target stored?

E.g. CDK7 (UniProt: P50613) has 134 associated pathologies.
It would be interesting to use this information to rationalize/predict/organize the selection of my 350,000 potential targets in my case study :thinking:

PS: Note that I downloaded the Target - Disease evidence dataset from the Open Targets Platform.

PPS: It may even be that I picked the wrong one…

@arr88, if you want to find the diseases and phenotypes associated with CDK7, you will need to use one of our associations datasets.

For example, if you use our “Associations - direct (overall score)” dataset, you can find diseases and phenotypes associated with CDK7 and combine with our “Disease/Phenotype” dataset to get key annotation information (e.g. disease name).

# import relevant libraries
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

# create Spark session
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# set location of associations dataset (Parquet format)
associations_data_path = "/Desktop/platform-data-analysis/data/associationByOverallDirect"

# read associations dataset
associations_data = spark.read.parquet(associations_data_path)

# print associations dataset schema
# associations_data.printSchema()

# create subset with relevant fields
associations_data_subset = (associations_data.select("targetId", "diseaseId", F.col("score").alias("overallAssociationScore")))

# set location of diseases dataset (Parquet format)
disease_data_path = "/Desktop/platform-data-analysis/data/diseases"

# read diseases dataset
disease_data = spark.read.parquet(disease_data_path)

# print diseases dataset schema
# disease_data.printSchema()

# create subset with relevant fields
disease_data_subset = (disease_data.select(F.col("id").alias("diseaseId"), F.col("name").alias("diseaseLabel")))

# merge associations and diseases data
output = (associations_data_subset
              .join(disease_data_subset, on="diseaseId", how="inner")
         )

# show output of merged data
# output.show()

# convert output to pandas dataframe
output_df = output.toPandas()

# filter dataframe for CDK7 (ENSG00000134058)
output_df = output_df[output_df["targetId"] == "ENSG00000134058"]

# print length of filtered dataframe
print(len(output_df))

# export dataframe to CSV
output_df.to_csv("CDK7_associated_diseases.csv")

To learn more about our associations, including the differences between direct and indirect associations, please read our associations documentation.
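For instance, if you also want indirect associations, the same script should work by pointing it at the indirect dataset instead - this is a sketch assuming the "Associations - indirect (overall score)" dataset was downloaded alongside the direct one (the directory name below is illustrative and may differ between releases):

# set location of the indirect associations dataset (Parquet format)
associations_data_path = "/Desktop/platform-data-analysis/data/associationByOverallIndirect"

# read the indirect associations dataset; the rest of the script above is unchanged
associations_data = spark.read.parquet(associations_data_path)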

Hello @ahercules,
Thanks for the suggestion of how to improve the script.
I rewrote it to simplify access to the data. The only thing I did beforehand was merge the JSON files with jq. The result is the same using only two modules - maybe it will be useful to other users. The lengths of the three datasets match the counts given for release 21.11 (Release notes - Open Targets Platform Documentation), except for Associations - direct (overall score).

Bests
Atilio

import os
import pandas as pd

associations_data_path = "/Users/atilioreyesromero/Desktop/DATA/associationByOverallDirect"
df_association = pd.read_json(os.path.join(associations_data_path, "associations.json"))
# print(len(df_association.index))
# 2139835

disease_data_path = "/Users/atilioreyesromero/Desktop/DATA/diseases"
df_diseases = pd.read_json(os.path.join(disease_data_path, "diseases.json"))
# print(len(df_diseases.index))
# 18706

drugs_data_path = "/Users/atilioreyesromero/Desktop/DATA/molecule"
df_drugs = pd.read_json(os.path.join(drugs_data_path, "molecules.json"))
# print(len(df_drugs.index))
#12594

# filter dataframe for CDK7 (ENSG00000134058)
print((df_association["targetId"] == "ENSG00000134058").sum())
#134
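
In case it helps others, the jq merge could probably also be skipped by reading the part files directly with pandas - a rough, untested sketch, assuming the downloads are newline-delimited JSON part files named part-*.json:

import glob
import os

import pandas as pd

associations_data_path = "/Users/atilioreyesromero/Desktop/DATA/associationByOverallDirect"

# read each newline-delimited JSON part file and concatenate into a single dataframe
part_files = sorted(glob.glob(os.path.join(associations_data_path, "part-*.json")))
df_association = pd.concat(
    (pd.read_json(p, lines=True) for p in part_files),
    ignore_index=True,
)
# print(len(df_association.index))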

Thanks for sharing @arr88! :slight_smile: