Some Details About Our Process
We run a Scala (2.11) based job on a Spark 2.2.0/EMR 5.9.0 cluster with 64 r3.xlarge nodes.
The job analyzes several data sources, each a few hundred GB in size (and growing), using the DataFrame API, and writes its output to S3 in ORC format.
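For a bit more context, the skeleton of such a job looks roughly like the sketch below. The bucket paths, column names and the analysis itself are illustrative placeholders, not our exact job code; the point is that everything goes through the DataFrame API and ends in an ORC write to S3.

import org.apache.spark.sql.SparkSession

object AnalyticsJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("analytics-job")
      .getOrCreate()

    // Hypothetical inputs: each source is a few hundred GB sitting on S3
    val events = spark.read.orc("s3://my-bucket/input/events/")
    val users  = spark.read.parquet("s3://my-bucket/input/users/")

    // The actual analysis is DataFrame-API based (joins, aggregations, etc.)
    val result = events.join(users, Seq("user_id"))
      .groupBy("user_id")
      .count()

    // Output is written back to S3 as ORC
    result.write.mode("overwrite").orc("s3://my-bucket/output/report/")

    spark.stop()
  }
}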
How Did We Recover?
Analyzing the logs of the crashed cluster revealed the following error:
WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Setting spark.yarn.executor.memoryOverhead to 2500 (the maximum for the r3.xlarge instance type we used) did not make a major change:
spark-submit --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=2500 ...
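If you prefer keeping this setting in code rather than on the command line, the same property can, as far as I understand, also be set on the SparkConf before the SparkSession is created, since executor containers are requested only after that point. A minimal sketch under that assumption, not our exact job code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// The value is in MB; on YARN each executor container is sized as
// spark.executor.memory + spark.yarn.executor.memoryOverhead.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "2500")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()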
We raised the bar by disabling the virtual and physical memory checks and increasing the virtual-to-physical memory ratio to 4 (this is done in Step 1: Software and Steps of the EMR cluster creation, by setting the following value under Edit software settings):
[
  {
    "classification": "spark",
    "properties": { "maximizeResourceAllocation": "true" }
  },
  {
    "classification": "yarn-site",
    "properties": {
      "yarn.nodemanager.vmem-pmem-ratio": "4",
      "yarn.nodemanager.pmem-check-enabled": "false",
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]
However, this only did the magic until we hit the next limit (Spark tasks were probably killed when they tried to overuse the physical memory), with the following error:
ExecutorLostFailure (executor exited caused by one of the running tasks) Reason: Container marked as failed: container_ on host:. Exit status: -100. Diagnostics: Container released on a *lost* node
This one was solved by increasing the number of DataFrame partitions (in this case from 1024 to 2048), which reduced the memory needed per partition.
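Concretely, that can be done by repartitioning the DataFrame explicitly before the expensive stage. A minimal sketch (df and the target count of 2048 are placeholders, not our exact job code):

// Spread the same data across more, smaller partitions,
// so each task keeps less data in memory at a time.
val repartitioned = df.repartition(2048)
repartitioned.write.mode("overwrite").orc("s3://my-bucket/output/report/")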
Note: if you want to change the DataFrame default partition count (200), use the following:
setConf("spark.sql.shuffle.partitions", partitions.toString)
setConf("spark.default.parallelism", partitions.toString)
If you want to take another look at the default partitioning and how to automate choosing the numbers, take a look at Romi Kuntsman's lecture.
Right now, we are running full power ahead; when we hit the next limit, this post may be worth an update.
Bottom Line
As Spark heavily utilizes cluster RAM to maximize speed, it is highly important to monitor it and to verify that your cluster settings and partitioning strategy keep up with your growing data.
Keep Performing,
Moshe Kaplan