Nov 28, 2016

Optimizing Your Spark Cluster for Various Scenarios

Given a defined scenario for your system, how can you meet requirements?

Asking for more machines, memory and cores is always a valid idea, but your CFO may not agree and may decline your request.

"Know Your Data"
As Demi Ben Ari says: "know your data" is the answer.
You must understand your business needs to get more from your cluster.
By understanding your needs you will be able to run more tasks faster.

Moreover, if you use cloud computing resources such as AWS, a better understanding will let you select the right instance types and get more for the same money (or even for less, as we will see next).

What can be controlled?
In this post, I will focus on YARN based clusters, but the concepts can be applied to Mesos and standalone clusters as well. More information can be found in the Cloudera post.

How Many Nodes Do I Have?
Take a look at the number of servers installed w/ NodeManagers. This is the number of nodes you can use (in this case we actually have 4, although there are a total of 8 YARN clients installed).
A node is an actual physical or virtual server (depending on your environment).

Define YARN Limitations:
yarn.nodemanager.resource.memory-mb: 
Total memory on a single node used by all executors 
(NodeMemInGB - 1)*1024

yarn.nodemanager.resource.cpu-vcores:
Total cores on a single node used by all executors
(NodeCores - 1)


Define Spark Executors Balancing
--executor-cores / spark.executor.cores: 
The number of cores per executor (number of tasks executor can run).

--executor-memory / spark.executor.memory: 
The executor memory (the practical upper limit is 64GB due to GC).
spark.yarn.executor.memoryOverhead is added on top of it (defaults to max(384MB, 0.07 * spark.executor.memory)).

The application master (which hosts the driver in cluster mode) also requires 1GB and 1 core by default, and therefore we'll always subtract a single core and 1GB in our calculations. These can be controlled by the following parameters:
  --driver-memory

  --driver-cores

--num-executors / spark.executor.instances: 
The number of executors for a single job.
You can avoid setting it by using spark.dynamicAllocation.enabled.
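To make the memoryOverhead rule concrete, here is a minimal Python sketch of how much memory YARN has to reserve per executor and how many executors then fit on a node. The function names are hypothetical, and the sketch ignores YARN's rounding of container requests to its scheduler allocation increment.

```python
def executor_container_mb(executor_memory_mb, overhead_fraction=0.07, overhead_floor_mb=384):
    """Memory reserved for one executor: spark.executor.memory plus memoryOverhead."""
    # Default overhead as quoted above: max(384, 0.07 * spark.executor.memory)
    overhead_mb = max(overhead_floor_mb, int(overhead_fraction * executor_memory_mb))
    return executor_memory_mb + overhead_mb


def executors_per_node(node_yarn_memory_mb, node_yarn_vcores, executor_memory_mb, executor_cores):
    """How many executors fit on one node, limited by both memory and vCores."""
    by_memory = node_yarn_memory_mb // executor_container_mb(executor_memory_mb)
    by_cores = node_yarn_vcores // executor_cores
    return min(by_memory, by_cores)
```

For small executors the 384MB floor dominates the 7% rule, so a 4GB executor actually reserves 4,480MB.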

The massive interactive small data scenario
This is not a common scenario, but some people may want to run a large number of tasks on relatively small data.
This might not be the best practice for Big Data, but it gives us a good understanding of Spark cluster sizing based on this edge case.

We'll start w/ a default of 4 m4.xlarge nodes, each w/ 4 vCores and 16GB (on demand cost of $0.239/hour, or $717/month for the 4 nodes)

1. First we will maximize the resources for any given node:
yarn.nodemanager.resource.memory-mb: (16-1)*1024 = 15,360MB
Memory Before and After

yarn.nodemanager.resource.cpu-vcores: (4-1) = 3 vCores

CPU Before and after

Containers Before and After
2. Then we will maximize the number of executors by minimizing the number of cores per executor to 1
spark.executor.cores: 1
spark.executor.memory: (16-1)*(1-0.07)/3 = 4.65, rounded down to 4GB
spark.executor.instances: 3/1*4nodes = 12 executors
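The two steps above can be sketched as a small Python helper. The function name and the returned keys are hypothetical; the formulas are the ones used in this post (reserve 1GB and 1 core per node, leave 7% for memoryOverhead, round the executor memory down to whole GB).

```python
import math


def size_cluster(nodes, node_mem_gb, node_cores, executor_cores=1, overhead_fraction=0.07):
    """Apply the post's sizing rules and return the resulting settings."""
    yarn_memory_mb = (node_mem_gb - 1) * 1024          # yarn.nodemanager.resource.memory-mb
    yarn_vcores = node_cores - 1                       # yarn.nodemanager.resource.cpu-vcores
    executors_per_node = yarn_vcores // executor_cores
    executor_memory_gb = math.floor(
        (node_mem_gb - 1) * (1 - overhead_fraction) / executors_per_node
    )
    return {
        "yarn.nodemanager.resource.memory-mb": yarn_memory_mb,
        "yarn.nodemanager.resource.cpu-vcores": yarn_vcores,
        "spark.executor.cores": executor_cores,
        "spark.executor.memory_gb": executor_memory_gb,
        "spark.executor.instances": executors_per_node * nodes,
    }
```

Calling size_cluster(4, 16, 4) reproduces the numbers of this scenario: 15,360MB, 3 vCores, 4GB per executor and 12 executors.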

The Results
After finishing the configuration, we enabled 12 tasks that run 24x7 using a manual scheduler written in our lab.
As you can see, the numbers actually fit our design: CPU jumps and hits 100%, while the cluster remains stable and does not crash.


The cluster utilization jumps after configuring 12 executors and running 8 tasks every minute


The cluster utilization hits 100% CPU w/ 11 tasks every minute


Optimizing the Instance Types
Since our bottleneck is CPU vCores, knowing our data tells us to look for another type of AWS instance. A c3.2xlarge or c4.2xlarge is a good match, as the C3/C4 series is optimized to provide more CPU and cores for less money (a c4.2xlarge instance has 8 vCores and 15GB at $0.419/hour, so for $629/month we'll get 2 c4.2xlarge nodes).

1. First we will maximize the resources for any given node:
yarn.nodemanager.resource.memory-mb: (15-1)*1024 = 14,336MB
yarn.nodemanager.resource.cpu-vcores: (8-1) = 7 vCores

2. Then we will maximize the number of executors by minimizing the number of cores per executor to 1
spark.executor.cores: 1
spark.executor.memory: (15-1)*(1-0.07)/7 = 1.86, rounded down to 1GB
spark.executor.instances: 7/1*2nodes = 14 executors

Bottom Line
With a little tweaking, we could get 17% more executors for 12% less money, or a bottom line of 33% more value for money!
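The percentages can be checked with a few lines of Python. This is a sketch under one assumption that is not stated in the post: a month is counted as 750 hours, which is the value that reproduces the $717 and $629 monthly figures from the hourly prices. The function names are hypothetical.

```python
HOURS_PER_MONTH = 750  # assumption: the value that matches the monthly prices quoted above


def monthly_cost(hourly_price, nodes, hours=HOURS_PER_MONTH):
    """On demand cost of a group of identical nodes for one month."""
    return hourly_price * hours * nodes


def compare(base_executors, base_cost, new_executors, new_cost):
    """Relative change in executors, cost and executors per dollar."""
    return {
        "more_executors": new_executors / base_executors - 1,
        "less_money": 1 - new_cost / base_cost,
        "more_value_per_dollar": (new_executors / new_cost) / (base_executors / base_cost) - 1,
    }


m4_cost = monthly_cost(0.239, 4)   # 4 x m4.xlarge, 12 executors
c4_cost = monthly_cost(0.419, 2)   # 2 x c4.2xlarge, 14 executors
summary = compare(12, m4_cost, 14, c4_cost)
```

The summary values come out at roughly 0.17, 0.12 and 0.33, which match the 17%, 12% and 33% above.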

Keep Performing,
Moshe Kaplan

P.S. You should explore more ways to save money on this kind of architecture by utilizing:
 - Reserved instances
 - Spot instances

Nov 13, 2016

Creating LVM disk for MongoDB: Right or Not?

One of the benefits of using replicated environments is the option of having large high IOPS disks using commodity disks, without the need to create complex HA disk based solutions like RAID 10.

We will demonstrate this method by creating an LVM based disk array.

Creating the LVM Disk Array
Based on an environment w/ two disks:
/dev/xvdb
/dev/xvdc

0. Set up prerequisites
sudo apt-get -y install lvm2 xfsprogs
1. Create physical volume for the two devices
sudo pvcreate /dev/xvdb /dev/xvdc
2. Create a Volume Group 
sudo vgcreate vg_data /dev/xvdb /dev/xvdc
3. Create new logical volume from all the space on the disks:
sudo lvcreate -l 100%FREE -n data vg_data
4. Create xfs file system
sudo mkfs.xfs -f /dev/vg_data/data
5. Mount the disk
sudo mkdir -p /var/lib/mongodb
echo "/dev/vg_data/data /var/lib/mongodb xfs defaults 0 2" | sudo tee -a /etc/fstab && sudo mount -a
sudo chown -R mongodb:mongodb /var/lib/mongodb
Note: This method may be effective only when you have a large number of disks (>3) and relatively high disk space utilization. Otherwise, you may see high IOPS utilization on only part of the disks, as the iostat output below shows (nearly all writes hit xvdb):
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           3.77    0.00    0.42    0.14    0.02   95.65
Device:            tps    kB_read/s    kB_wrtn/s    kB_read    kB_wrtn
xvda              0.00         0.00         0.00          0          0
xvdb            176.00         0.00     21294.00          0      42588
xvdc              8.00         0.00        16.00          0         32

dm-0            170.00         0.00     20560.00          0      41120
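To put a number on the imbalance, here is a minimal Python sketch that reads iostat device lines in the column layout shown above and returns each disk's share of the write throughput. The function name is hypothetical, and the column position (kB_wrtn/s as the fourth field) is an assumption tied to this particular iostat layout.

```python
def write_share(iostat_lines, devices):
    """Return each device's fraction of the total kB_wrtn/s among the given devices."""
    rates = {}
    for line in iostat_lines:
        fields = line.split()
        if fields and fields[0] in devices:
            rates[fields[0]] = float(fields[3])  # Device, tps, kB_read/s, kB_wrtn/s, ...
    total = sum(rates.values())
    return {dev: (rate / total if total else 0.0) for dev, rate in rates.items()}


sample = [
    "xvdb            176.00         0.00     21294.00          0      42588",
    "xvdc              8.00         0.00        16.00          0         32",
]
shares = write_share(sample, {"xvdb", "xvdc"})
```

With the sample above, xvdb carries more than 99% of the writes.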

Bottom Line
Choosing LVM can be a great solution to support your big data installation. However, you should test your case to verify it is the right solution for your needs.

Keep Performing,
Moshe Kaplan

Sep 6, 2016

Ruby is Dead! (You need to take care of its memory issues)

One of the common problems with Ruby is its memory usage, just like JVM based languages (yes Java, I'm talking about you). Actually, Ruby, just like Java, relies on a garbage collector. This GC, if not tuned correctly, can cause "stop the world" scenarios (as we can see in the attached diagram). In these cases, the application actually stops responding while the GC consumes the server's whole CPU, and in some cases it even causes a server reboot.
Unicorn and Ruby memory leak causes server downtime
What can be done?

1. Reboot your Ruby Periodically

If you are using the popular Unicorn web server, memory issues may be even more severe, as Unicorn forks. When forking, each worker starts with the whole memory footprint of the parent (shared through Copy on Write, or CoW, until pages are modified). Therefore, you may use the "Unicorn Worker Killer" gem, which monitors your server and gracefully restarts a specific worker when its memory reaches a configured limit or after it has served a specific number of requests. Since the gem randomizes the limits, it ensures with very high probability that the workers will not all restart together and the server will continue serving.

Installing the killer (add to your Gemfile):
gem 'unicorn-worker-killer'

Add to config.ru above "require ::File.expand_path('../config/environment', __FILE__)"
# Unicorn self-process killer
require 'unicorn/worker_killer'

And decide your worker graceful reboot method: 
# Max requests per worker
use Unicorn::WorkerKiller::MaxRequests, 3072, 4096
# Max memory size (RSS) per worker
use Unicorn::WorkerKiller::Oom, (192*(1024**2)), (256*(1024**2))
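Why pass two numbers to each rule? The idea is that every worker gets its own limit somewhere between the minimum and the maximum, so the workers do not all restart at the same moment. The Python sketch below only illustrates that idea with hypothetical function names; it is not the gem's actual code.

```python
import random


def pick_limit(min_limit, max_limit, rng):
    """Pick one worker's restart threshold between the two configured bounds (inclusive)."""
    return rng.randint(min_limit, max_limit)


def restart_points(workers, min_requests=3072, max_requests=4096, seed=None):
    """Thresholds for a pool of workers; a seed makes the illustration repeatable."""
    rng = random.Random(seed)
    return [pick_limit(min_requests, max_requests, rng) for _ in range(workers)]
```

With a window of about 1,000 requests between the bounds, it is unlikely that several workers end up with exactly the same threshold.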

2. Exploit Copy on Write
If you are using Ruby 2.X, you can exploit CoW by better configuring Unicorn.
config/unicorn.rb (a detailed explanation of the settings can be found at sirupsen):
 - worker_processes: 1x your cores
 - timeout: worker request timeout, should be 15 to 30 sec max
 - preload_app: enables CoW, but requires managing connect/disconnect on fork


3. Take care of GC configuration
You can find a detailed discussion in Collective Idea's post.

Bottom Line
Dynamic languages have their downsides, yet w/ the right design you can keep them up and running.

Keep Performing,

Jun 27, 2016

Neo4j Cluster Performance Tuning

Neo4j is one of the leading graph databases these days, and it is very popular in recommendation systems, fraud detection and social network scenarios.

While the single instance (that is included in the community edition) performs very well (usually w/ under 10ms response time), you may face challenges in cluster mode.

Why Should You Expect Performance Degradation in a Neo4j Cluster?
Two simple reasons:
  • A Neo4j cluster is a Master-Slave cluster w/ an auto failover method (much like MongoDB). However, unlike MongoDB, primary node detection is done by a server side load balancer and not by the client's driver.
  • Cluster replication is synchronous by default, unlike MongoDB's async default behavior.
How Much Will it Cost Us?
  • The various nodes of the cluster should be behind a LB. If you select AWS ELB, it will cost you 7 to 30ms according to our measurements below. The ELB latency increases as the request and response become larger (see details at the bottom). Note: implementing a MongoDB like driver could be a great improvement that saves this latency and minimizes system cost. It's also a great idea for a side project!
  • The nodes behind the ELB replicate changes from the master to the slaves. The level of synchronization is controlled by the ha.tx_push_factor parameter w/ a default value of 1. This parameter controls the number of slaves that should receive the commit before answering the client. By setting it to 0, you avoid synchronization and get a result similar to a single node. Changing the factor will save 70ms on average (and much more at peak time), and will leave us w/ an average of 40ms per query (inc. ELB cost).
You can find the differences below, where in the tested environment a community edition instance was replaced by a 3 node cluster behind an ELB:
  • In the left section you can see an average of 9ms in the initial state (single community edition instance)
  • In the middle you can see a fluctuating response time of 40ms (reads) to 300ms (writes) for a 3 node cluster behind an ELB w/ the ha.tx_push_factor parameter at its default value of 1.
  • In the right section you can see a steady 40ms for both reads and writes for a 3 node cluster behind an ELB w/ the ha.tx_push_factor parameter set to 0 (async replication).
Neo4j performance as measured by DataDog client side metrics
Bottom Line
HA has its costs. A better implementation of the load balancing and the right selection of synchronization model can help you gain the needed performance.

Keep Performing,


Measure from Inside the Server:
> mytime="$(time ( curl http://localhost:7474/db/manage/server/ha/master ) 2>&1 1>/dev/null )"
> echo "$mytime"
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    581      0 --:--:-- --:--:-- --:--:--   666

real    0m0.006s
user    0m0.005s
sys     0m0.000s

Measure from the Application Server Direct to Master:

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    988      0 --:--:-- --:--:-- --:--:--  1333

real    0m0.006s
user    0m0.000s
sys     0m0.000s

Measure from the Application Server to Master through the ELB:

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    417      0 --:--:-- --:--:-- --:--:--   444

real    0m0.015s
user    0m0.000s
sys     0m0.000s
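A single curl run is a noisy sample. The following Python sketch repeats the request and reports the median and maximum latency, so the direct and ELB paths can be compared on more than one measurement. The function names are hypothetical; the URL in the usage comment is the one used in the curl command above, and you would substitute your own ELB address for the second path.

```python
import statistics
import time
import urllib.request


def fetch(url, timeout=5.0):
    """Issue one GET request and read the whole response body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        resp.read()


def measure_ms(call, samples=20):
    """Run call() several times and summarize the wall clock latency in milliseconds."""
    timings = []
    for _ in range(samples):
        start = time.perf_counter()
        call()
        timings.append((time.perf_counter() - start) * 1000.0)
    return {"median_ms": statistics.median(timings), "max_ms": max(timings)}


# Usage (run from the application server):
# print(measure_ms(lambda: fetch("http://localhost:7474/db/manage/server/ha/master")))
```

Unlike the curl timings above, these numbers do not include process start-up time, so expect them to be somewhat lower.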

Some More Measures to Gain Data to Explore the Neo4j Performance:
  • Enable the slow query log and filter by server time
  • Install monitoring such as Datadog at the application level

Mar 2, 2016

Providing MongoDB User Granular Access to User Cluster

Unlike a single instance MongoDB setup or even a ReplicaSet one, when it gets to a Sharded installation, things may get tougher.

For example, if you gave a user read permissions in order to use MongoChef (a highly recommended MongoDB client), then on a clustered installation you should grant the user some more permissions (in this case the inprog permission) to avoid the "not authorized to run inprog" error when running db.currentOp().

Actually it is pretty simple, but it is also a good example for a secured environment management:

Providing inprog Permissions

1. Get to the admin database
use admin; 

2. Authorize as a permitted user
db.auth("admin","admin_password");

3. Create a new role that will have permissions to manage the processes
db.createRole(
{
role: "manageOpRole",
privileges: [
{
resource: { cluster: true },
actions: [ "killop", "inprog" ]
},
{
resource: { db: "", collection: "" },
actions: [ "killCursors" ]
}
],
roles: []
}
);

4. Provide the permissions to the user:
db.grantRolesToUser(
"reading",
[
      { role: "manageOpRole", db: "admin" }
    ]
);

5. Authenticate as the reading user
db.auth("reading","reading_password");

6. Verify things actually work (our definition of done):
db.currentOp()
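If you prefer to script these steps, the same role and grant can be expressed as database command documents. The Python sketch below only builds the documents; the function names are hypothetical, and sending them is left to your driver (for example, assuming PyMongo, by passing each document to the admin database's command() method while authenticated as a permitted user).

```python
def create_role_command(role_name="manageOpRole"):
    """Command document equivalent to the db.createRole() call in step 3."""
    return {
        "createRole": role_name,  # the command name must be the first key
        "privileges": [
            {"resource": {"cluster": True}, "actions": ["killop", "inprog"]},
            {"resource": {"db": "", "collection": ""}, "actions": ["killCursors"]},
        ],
        "roles": [],
    }


def grant_role_command(user="reading", role_name="manageOpRole", role_db="admin"):
    """Command document equivalent to the db.grantRolesToUser() call in step 4."""
    return {"grantRolesToUser": user, "roles": [{"role": role_name, "db": role_db}]}
```

Keeping the documents in functions makes it easy to review the exact privileges before they are applied to a cluster.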

Bottom Line
Simple, tested and secured like we always love our environments!

Keep Performing,
Moshe Kaplan

Feb 25, 2016

Spark, Python and Windows

It may not be common, but if you are a Windows guy who wants a quick start on the hottest Big Data platform around, you will find this tutorial relevant:

Get the Needed Prerequisite Software
git (for building Spark)
http://git-scm.com/download/win
Install and verify git was added to your path

Python 3.5.1
https://www.python.org/ftp/python/3.5.1/python-3.5.1-amd64.exe
Or even better, try the Anaconda version

Define Java
Get Java 8 (Oracle JDK):
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Set the JAVA_HOME environment variable to c:\Program Files\Java\jdk1.8.0_73\ (don't use double quotes, even if you are used to them)

Prepare SBT
Get SBT 0.13.11 (for building Spark)
https://dl.bintray.com/sbt/native-packages/sbt/0.13.11.2/sbt-0.13.11.2.msi

The Spark build is huge, therefore we need to increase the memory limit of sbt from 256MB to something larger (2048MB for example, but I saw some cases where 6GB was needed). This can be done by modifying the sbt runner file (C:\Program Files (x86)\sbt\bin\sbt.bat):
"%_JAVACMD%" %_JAVA_OPTS% %SBT_OPTS% -Xmx2048m -Xms2048m -cp "%SBT_HOME%sbt-launch.jar" xsbt.boot.Boot %*

Or better, change these settings at C:\Program Files (x86)\sbt\conf\sbtconfig.txt

Build Spark
Download Spark 1.6 (No need to install now, see details below)
http://www.apache.org/dyn/closer.lua/spark/spark-1.6.0/spark-1.6.0.tgz
  1. Extract the source using 7zip (or anything else that can use tgz files) for example to C:\Spark
  2. Build spark:
    1. cd C:\Spark
    2. "c:\Program Files (x86)\sbt\bin\sbt.bat" package
    3. "c:\Program Files (x86)\sbt\bin\sbt.bat" assembly
Get Winutils
Create Hadoop folder (for example c:\hadoop)
Define HADOOP_HOME environment variable (c:\hadoop)
Download to c:\hadoop\bin the winutils.exe file
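Before starting pyspark, it may be worth verifying the environment prepared so far. This is a minimal Python sketch with a hypothetical function name; it only checks the items this tutorial sets up (JAVA_HOME without double quotes, HADOOP_HOME, and winutils.exe under its bin folder).

```python
import os


def check_spark_env(env=None):
    """Return a list of problems found in the environment (empty when all looks fine)."""
    env = os.environ if env is None else env
    problems = []

    java_home = env.get("JAVA_HOME", "")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif '"' in java_home:
        problems.append("JAVA_HOME contains double quotes")
    elif not os.path.isdir(java_home):
        problems.append("JAVA_HOME does not point to an existing folder")

    hadoop_home = env.get("HADOOP_HOME", "")
    if not hadoop_home:
        problems.append("HADOOP_HOME is not set")
    elif not os.path.isfile(os.path.join(hadoop_home, "bin", "winutils.exe")):
        problems.append("winutils.exe was not found under HADOOP_HOME\\bin")

    return problems
```

Run check_spark_env() in a Python prompt and fix whatever it reports before moving on.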

Run It Like a Pro
Create a test file c:\spark\verify.txt and enter the following text into it: this is a trial
Run pyspark:
> c:\spark\bin\pyspark
In pyspark enter the following code:
text_file = sc.textFile("file:///c:/spark/verify.txt")
text_file.collect()
The file should be printed

Keep Performing,
Moshe Kaplan
