Issue getting target-associated information via pyspark

Hello,
I tried to download the dataset and select the data of interest via pyspark. Here is my code:

# create Spark session
spark = (
    SparkSession.builder
    .master('local[*]')
    .config('spark.executor.memory', '2g')
    .config('spark.driver.memory', '1g')
    .config('spark.executor.cores', '4')
    .config('spark.cores.max', '8')
    .config('spark.default.parallelism', '16')
    .config('spark.logConf', True)
    .config('spark.network.timeout', 300)
    .getOrCreate()
)

# set location of dataset (Parquet format)
drug_data_path = "Data/opentarget/molecule"
targets_data_path = "Data/opentarget/targets"
disease_data_path = "Data/opentarget/diseases"
evidence_data_path = "Data/opentarget/evidence"
associations_direct_data_path = "Data/opentarget/associationByOverallDirect"
associations_indirect_data_path = "Data/opentarget/associationByOverallIndirect"

# read dataset
drug_data = spark.read.parquet(drug_data_path)
targets_data = spark.read.parquet(targets_data_path)
disease_data = spark.read.parquet(disease_data_path)
evidence_data = spark.read.parquet(evidence_data_path)
associations_direct_data = spark.read.parquet(associations_direct_data_path)
associations_indirect_data = spark.read.parquet(associations_indirect_data_path)


# create subsets with relevant fields
drug_data_subset = drug_data.select(
    "drugType",
    F.col("id").alias("drugId"),
    F.col("name").alias("drugName"))

targets_data_subset = targets_data.select(
    F.col("id").alias("targetId"),
    F.col("tractability.modality"),
    F.col("pathways.pathway"))

disease_data_subset = disease_data.select(
    F.col("id").alias("diseaseId"),
    F.col("name").alias("diseaseLabel"),
    F.col("directLocationIds"),
    F.col("therapeuticAreas"))

evidence_data_subset = evidence_data.select(
    "diseaseId", "drugId", "literature",
    F.col("clinicalPhase").alias("Max clinical phase"),
    F.col("datatypeId").alias("Datatype"),
    F.col("resourceScore").alias("Resource Score"))

associations_direct_data_subset = associations_direct_data.select(
    "targetId", "diseaseId",
    F.col("score").alias("overallAssociationScore"))

associations_indirect_data_subset = associations_indirect_data.select(
    "targetId", "diseaseId",
    F.col("score").alias("overallAssociationScore"))



# merge associations, evidence, disease, and drug data
output_direct = (associations_direct_data_subset
                 .join(evidence_data_subset, on="diseaseId", how="inner")
                 .join(disease_data_subset, on="diseaseId", how="inner")
                 .join(drug_data_subset, on="drugId", how="full"))



query_id = 'ENSG00000105974'
query_result = output_direct.filter(output_direct.targetId == query_id)
query_result_df = query_result.toPandas()

Everything works fine until the merge of the data; that step confuses me. Could someone help me out?

I tried the following code, and it works fine:

output_direct = (associations_direct_data_subset
                 .join(disease_data_subset, on="diseaseId", how="inner")
                 .join(targets_data_subset, on="targetId", how="inner"))

Maybe the error is associated with the evidence dataset? Here is the error message:

Py4JJavaError: An error occurred while calling o141.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 7.0 failed 1 times, most recent failure: Lost task 13.0 in stage 7.0 (TID 35) (192.168.1.118 executor driver): java.io.FileNotFoundException: /tmp/blockmgr-7458735d-1fb4-4367-a76d-4b611854f637/19/temp_shuffle_f9cfc1ff-e579-4041-90af-3bbd97a7d55f (Too many open files)
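
An aside on this particular failure: “Too many open files” is raised while Spark writes temporary shuffle files. A possible mitigation for a local run, offered as an assumption rather than a confirmed fix, is to lower the number of shuffle partitions (and/or raise the OS open-file limit with ulimit -n before launching Python):

# Fewer shuffle partitions means fewer simultaneous temporary shuffle
# files; the default of 200 can exceed a laptop's open-file limit when
# several wide joins run at once.
spark.conf.set("spark.sql.shuffle.partitions", "16")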

Nice integration of datasets.

I think the join of evidence + associations using only “diseaseId”, without any prior aggregation or distinct, might be resulting in a significantly large dataset, probably in the order of billions of records. That’s OK for Spark if this is what you are after, but the moment you try to export it to pandas, it struggles to fit that much data in memory.
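
For example, a minimal sketch along those lines, reusing the subset names from the snippet above (the deduplication keys and the switch to a left join for drugs are assumptions about the intended result):

query_id = "ENSG00000105974"

# Filter down to the single target *before* joining, and deduplicate the
# evidence keys first, so the join no longer multiplies rows across the
# whole evidence table.
output_direct_small = (
    associations_direct_data_subset
    .filter(F.col("targetId") == query_id)
    .join(evidence_data_subset.dropDuplicates(["diseaseId", "drugId"]),
          on="diseaseId", how="inner")
    .join(disease_data_subset, on="diseaseId", how="inner")
    .join(drug_data_subset, on="drugId", how="left"))

query_result_df = output_direct_small.toPandas()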

I can only guess what you are trying to do, but I would suggest checking whether the datasets you are trying to join really contain the information you are trying to get.
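
A quick way to sanity-check, as a sketch:

# Print the row count and schema of each side of the join before running
# it, to estimate how large the joined result can get.
for name, df in [("associations", associations_direct_data_subset),
                 ("evidence", evidence_data_subset)]:
    print(name, df.count())
    df.printSchema()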

It seems to be just like you said: maybe the evidence dataset is too big. I even tried to add filter conditions as follows:

evidence_data_subset = (evidence_data
                        .select(
                          "datasourceId", "targetId", "diseaseId",
                          "drugId", "literature", "studyId",
                          F.col("reactionId").alias("reaction type"),
                          F.col("reactionName").alias("Mechanism of reaction"),
                          F.col("clinicalPhase").alias("Max clinical phase"),
                          F.col("datatypeId").alias("Datatype"),
                          F.col("resourceScore").alias("Resource Score"))
                        .dropna(how="any", subset=["drugId", "Resource Score"])
                        .dropDuplicates(["targetId", "diseaseId", "drugId"])
                        )

evidence_query = (evidence_data_subset
                  .filter(
                      (evidence_data_subset.targetId == "ENSG00000105974") &
                      (evidence_data_subset.diseaseId == "EFO_0000589")
                  ))
evidence_query_df = evidence_query.toPandas()

The same error appeared, which confuses me. How can I get all the evidence for a given (targetId & diseaseId) pair from the downloaded data, just like the website search results? The evidence count does not seem too big (see the sketch after the output below):


a = associations_data_subset.filter(
    (associations_data_subset.targetId == "ENSG00000105974") &
    (associations_data_subset.diseaseId == "EFO_0000589")
)

a.show()

+---------------+-----------+-------------+------------------+-----------------------+
|       targetId|  diseaseId|evidenceCount|Direct association|overallAssociationScore|
+---------------+-----------+-------------+------------------+-----------------------+
|ENSG00000105974|EFO_0000589|            6|                 1|   0.016912944654587342|
|ENSG00000105974|EFO_0000589|          285|                 0|     0.6873630862076273|
+---------------+-----------+-------------+------------------+-----------------------+
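
One way to do that, as a minimal sketch assuming the paths and column names from the snippets above: filter the evidence by both keys right after reading, before any join or collect, so only the matching rows ever reach the driver.

evidence_pair = (
    spark.read.parquet(evidence_data_path)
    # Filtering immediately after the read lets Spark push the predicates
    # down to the Parquet scan, so the full dataset is never materialised.
    .filter((F.col("targetId") == "ENSG00000105974") &
            (F.col("diseaseId") == "EFO_0000589")))

evidence_pair_df = evidence_pair.toPandas()  # small result, safe to collect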


Could you paste a reproducible example?

Here is my code. I want to use target_id and disease_id to find some evidence.

from traceback import print_exc

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


def get_evidence(target_id, disease_id):

    spark = (
        SparkSession.builder
        .master('local[*]')
        .getOrCreate()
    )

    evidence = None  # returned unchanged if an exception occurs below

    try:
        # set location of dataset (Parquet format)
        drug_data_path = "Data/opentarget/molecule"
        evidence_data_path = "Data/opentarget/evidence/sourceId=chembl"

        # read dataset
        drug_data = spark.read.parquet(drug_data_path)
        evidence_data = spark.read.parquet(evidence_data_path)

        # create subset with relevant fields
        drug_data_subset = (drug_data
                            .select(
                                'drugType', 'hasBeenWithdrawn', 'withdrawnNotice',
                                F.col('id').alias('drugId'),
                                F.col('name').alias('drugName'))
                            )

        evidence_data_subset = (evidence_data
                                .select(
                                    'datasourceId', 'targetId', 'diseaseId',
                                    'drugId', 'studyId', 'literature',
                                    F.col('reactionId').alias('reaction type'),
                                    F.col('reactionName').alias('Mechanism of reaction'),
                                    F.col('clinicalPhase').alias('Max clinical phase'),
                                    F.col('datatypeId').alias('Datatype'),
                                    F.col('resourceScore').alias('Resource Score'))
                                )

        # merge drug and evidence data
        evidence_info = (evidence_data_subset
                         .join(drug_data_subset, on='drugId', how='leftouter')
                         )

        evidence = (evidence_info
                    .filter(
                        (evidence_info.targetId == target_id) &
                        (evidence_info.diseaseId == disease_id)
                    ).toPandas())

    except Exception as e:
        print("Exception in Disease Association function, pt2:")
        print(e)
        print_exc()
    return evidence


target_id = "ENSG00000105974"
disease_id = "EFO_0000589"
get_evidence(target_id, disease_id)

But the error appeared. And the association dataset shows that the evidence count for this (targetId & diseaseId) pair is 285, which does not seem too big:

Py4JJavaError: An error occurred while calling o141.collectToPython.


If you haven’t been able to resolve this, could you provide the full stack trace?

My working theory would be that there isn’t enough memory in the driver for the toPandas call, as the documentation carries a warning that “This method should only be used if the resulting Pandas’s DataFrame is expected to be small, as all the data is loaded into the driver’s memory.”
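
In the meantime, a defensive pattern, sketched here with the evidence_info name from the snippet above: check the size of the result before collecting it, and cap what reaches the driver while debugging.

# Count first: if this number is large, toPandas() will likely exhaust
# the driver's memory.
print(evidence_info.count())

# While debugging, collect only a bounded sample.
evidence_sample_df = evidence_info.limit(1000).toPandas()

If the full result really is needed in pandas, raising spark.driver.memory when building the session is another option.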


Maybe you can put all the code in a GitHub repository; then Jarrod can test the full code.

Hi @recherHE,

thank you for the snippet above! I have been able to run it without any errors; however, the resulting dataframe is empty because we don’t have any ChEMBL evidence relating ENSG00000105974 and EFO_0000589.

Empty DataFrame
Columns: [drugId, datasourceId, targetId, diseaseId, studyId, literature, reaction type, Mechanism of reaction, Max clinical phase, Datatype, Resource Score, drugType, hasBeenWithdrawn, withdrawnNotice, drugName]
Index: []

This is actually consistent with the data in the Platform (ENSG00000105974/EFO_0000589).
Have you been able to solve it? Please let us know if we used to have ChEMBL evidence for this association so that we can report it back to their team.

Thanks for your question!
Best,
Irene