Jul 21, 2017

Cassandra Design Best Practices

Cassandra is a great NoSQL product. It provides near-real-time performance for the queries it was designed for, and enables high availability w/ linear scale growth, as it uses the eventually consistent paradigm.
 In this post we will focus on some best practices for this great product:

 How many nodes do you need? 
  1. The number of nodes should be odd in order to support votes during downtime or a network cut.
  2. The minimal number should be 5, as a lower number (3) will result in high stress on the machines during node failure: the replication factor is 2 in this case, and each node will have to read 50% of the data and write 50% of the data. When you select replication factor 3, each node will need to read 15% of the data and write 15% of the data. Therefore, recovery will be much faster, and chances are higher that performance and availability will not be affected.
C*, like any other data store, loves fast disks (SSD), despite its SSTables and INSERT-only architecture, and as much memory as your data.
In particular, your nodes should have 32GB to 512GB RAM each (and not less than 8GB in production and 4GB in development). This is a common issue since C* is coded in Java. For small machines you should avoid G1 and keep w/ CMS.

JVM heap size should be at most 8GB, to avoid overly long "stop the world" pauses during GC.
If you feel the default heap size (max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB))) does not fit your needs, try to set it between 1/4 and 1/2 of your RAM, but not more than 8GB.
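
To make the default formula above concrete, here is a minimal Python sketch that evaluates it for a few RAM sizes. The function name is hypothetical and is only meant to illustrate the arithmetic; it is not part of any C* tooling.

```python
def default_max_heap_mb(ram_mb: int) -> int:
    """Evaluate max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB)) in megabytes."""
    half_capped = min(ram_mb // 2, 1024)         # half of RAM, capped at 1GB
    quarter_capped = min(ram_mb // 4, 8 * 1024)  # quarter of RAM, capped at 8GB
    return max(half_capped, quarter_capped)


if __name__ == "__main__":
    for ram_gb in (2, 4, 8, 32, 64):
        heap_mb = default_max_heap_mb(ram_gb * 1024)
        print(f"{ram_gb:>3}GB RAM -> {heap_mb}MB default heap")
```

Notice that from 32GB of RAM and up the result stays at 8GB, which matches the recommended upper limit.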

C* is also CPU intensive and 16 cores are recommended (and not less than 2 cores for development).

Repair and Repair Strategy
nodetool repair is probably one of the most common tasks on a C* cluster.
  1. You can run it on a single node or on a whole cluster.
  2. Repair should run before reaching gc_grace_seconds (default 10 days), after which tombstones are removed.
  3. You should run it during off-peak hours (probably during the weekend) if you keep w/ the default gc_grace_seconds.
  4. You can take this number down, but it will affect your backup and recovery strategy (see details about recovery from failure using hints).
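
As a quick sanity check for your schedule, the sketch below compares a planned repair interval against gc_grace_seconds. It assumes the default of 10 days mentioned above; the function name is hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DEFAULT_GC_GRACE_SECONDS = 10 * SECONDS_PER_DAY  # 10 days = 864000 seconds


def repair_interval_is_safe(interval_days: float,
                            gc_grace_seconds: int = DEFAULT_GC_GRACE_SECONDS) -> bool:
    """True when every node is repaired before tombstones become removable."""
    return interval_days * SECONDS_PER_DAY < gc_grace_seconds


if __name__ == "__main__":
    for days in (7, 10, 14):
        print(f"repair every {days} days: safe={repair_interval_is_safe(days)}")
```

A weekly (weekend) repair fits inside the default window, while a biweekly one does not.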
You can optimize the repair process by using the following flags:
  1. -seq: repair token after token: slower and safer
  2. -local: run only on the local data center, to avoid taking down both data centers at once
  3. -par (--parallel): fastest mode: run on all data centers in parallel
  4. -j: the number of parallel jobs on a node (1-4). Using more threads will stress the nodes, but will help end the task faster.
We recommend selecting your strategy based on the height of your peaks and the sensitivity of your data. If your system has the same level of traffic 24/7, consider doing things slowly and sequentially.
The higher your peaks, the more stress you can put on your system during off-peak hours.


There are several backup strategies you can have:
  1. Utilize your storage/cloud storage snapshot capabilities. 
  2. Use the C* nodetool snapshot command. This one is very similar to your storage capabilities, but enables backing up only the data and not the whole machine.
  3. Use C* incremental backup, which enables point-in-time recovery. This is not a daily process, but it requires copying and managing small files all the time.
  4. Mix C* snapshots and incremental backups to minimize the time of recovery while keeping the point-in-time recovery option.
  5. Snapshots and commit log: supports point-in-time recovery, but the recovery process is complex, as you need to replay the commit log.
We recommend using the daily snapshot if your data is not critical and you want to minimize your Ops costs, or the mix of C* snapshots and incremental backups when you must have point-in-time recovery.

Monitoring
There are several approaches you can take:
  1. Commercial software:
    1. DataStax OpsCenter: like almost every other OSS vendor, DataStax, which provides the commercial version of C*, offers a paid management and monitoring solution
  2. Commercial services, including:
    1. NewRelic: provides a C* plugin as part of its platform
    2. DataDog: w/ a nice hint on what should be monitored
  3. Open source w/ common integrations:
    1. Graphite, Grafana or Prometheus: 3 tools that can work together or apart, and integrate w/ time series and relevant metrics
    2. Old-style Nagios and Zabbix, which provide community plugins
If you choose a DIY solution, there are some hints that you can find in the commercial products and services, and also in the following resources:
  1. Basic monitoring thresholds
  2. Nagios out of the box plugins that thresholds can be extracted from
For example:
  1. Heap usage: 85% (warning), 95% (error)
  2. GC ConcurrentMarkSweep: 9 (warning), 15 (error)
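
For a DIY setup, the heap thresholds above could be turned into a simple check like the following sketch. The function and constant names are hypothetical; how you collect the heap numbers (JMX, nodetool info, an agent) is up to your environment.

```python
HEAP_WARNING_PCT = 85.0  # warning threshold from the list above
HEAP_ERROR_PCT = 95.0    # error threshold from the list above


def heap_alert_level(used_mb: float, max_mb: float) -> str:
    """Map heap usage to 'ok', 'warning' or 'error'."""
    if max_mb <= 0:
        raise ValueError("max_mb must be positive")
    used_pct = 100.0 * used_mb / max_mb
    if used_pct >= HEAP_ERROR_PCT:
        return "error"
    if used_pct >= HEAP_WARNING_PCT:
        return "warning"
    return "ok"


if __name__ == "__main__":
    for used in (4096, 7000, 7900):
        print(f"{used}MB of 8192MB: {heap_alert_level(used, 8192)}")
```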
Our recommendation is to start, when possible, w/ an existing service/product, get experience w/ the metrics that are relevant for your environment, and, if needed, implement your own setup based on them.

Lightweight transactions are meant to enable use cases that require sequencing (or some type of transaction) in an eventually consistent environment.
Yet notice that this is a minimal solution, aimed at serializing tasks in a single table.

We believe that this is a good solution, but if your data requires a consistent solution, you should avoid an eventually consistent solution and look for a SQL solution (w/ native transactions) or a NoSQL solution like MongoDB.

C* Internals
Want to know more? Use the following videos or get the O'Reilly book.

Bottom Line
C* is indeed a great product. However, it is definitely not an entry-level solution for data storage, and managing it requires skills and expertise.

Keep Performing,
Moshe Kaplan

Jun 21, 2017

Installing Elastic Stack in a Single Click using Docker Compose

The Elastic team did great work packaging the various Elastic Stack components into Docker images.
Yet, this manual will help you boot the entire stack, including Logstash and Kibana, in a single click using Docker Compose.

1. Run the following command on your host machine
sudo sysctl -w vm.max_map_count=262144

2. Install Docker Compose
On Ubuntu, for example, use:
sudo apt-get -y install docker-compose

3. Create a docker-compose.yml according to the following link:
Copy the docker-compose.yml to your machine

4. Start the cluster
sudo docker-compose up

5. Verify the cluster using the password changeme
curl -u elastic http://127.0.0.1:9200/_cat/health
Enter host password for user 'elastic':
1498046576 12:02:56 docker-cluster green 2 2 12 6 0 0 0 0 - 100.0%
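
If you want to script this check, the following Python sketch parses a _cat/health line like the one above into named fields. It assumes the default column order of the header-less output in this version; the function name is hypothetical.

```python
from typing import Dict

# Assumed default column order of GET /_cat/health (no ?v header line).
HEALTH_COLUMNS = [
    "epoch", "timestamp", "cluster", "status", "node.total", "node.data",
    "shards", "pri", "relo", "init", "unassign", "pending_tasks",
    "max_task_wait_time", "active_shards_percent",
]


def parse_cat_health(line: str) -> Dict[str, str]:
    """Split one _cat/health line into a column-name -> value mapping."""
    values = line.split()
    if len(values) != len(HEALTH_COLUMNS):
        raise ValueError(f"expected {len(HEALTH_COLUMNS)} columns, got {len(values)}")
    return dict(zip(HEALTH_COLUMNS, values))


if __name__ == "__main__":
    sample = "1498046576 12:02:56 docker-cluster green 2 2 12 6 0 0 0 0 - 100.0%"
    health = parse_cat_health(sample)
    print(health["cluster"], health["status"], health["node.total"])
```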

6. And Kibana using 

Bottom Line
Docker changes the DevOps world as we know it, and complex tasks that took hours can be done in a few clicks.

Keep Performing,

Mar 26, 2017

Docker is Amazing. Here are 10 tips how to use it!

Docker is Amazing!
If you are not sure yet why you should use it, think of it as a Digital Ocean tutorial packed into a simple file that you can run over and over:

Step 1: Create a Dockerfile
A Dockerfile includes the list of commands you are going to perform to create your image.
The most common commands in this file:

FROM and MAINTAINER at the top, to select the baseline image and the one to blame for things:
FROM ubuntu:14.04
MAINTAINER Moshe Kaplan

RUN: run shell command:
RUN apt-get update

ADD: add a file from local directory to target in the docker image
ADD unicorn.rb /app/config/unicorn.rb

ENV: add an environment variable
ENV RAILS_ENV staging

Step 2: Make sure you have a forever command in the Dockerfile
A Docker container stops as soon as its last command stops running. If you don't have a service or an ever-running daemon (and you are not going to use this image as a base image), add an infinite loop:
CMD while true; do sleep 1000; done

Step 3: Install docker on target machine
Well, this is what you have Chef/Ansible/Puppet for:
sudo yum -y install docker && sudo service docker start

or on Ubuntu:
sudo apt-get update &&\
sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D &&\
sudo apt-add-repository 'deb https://apt.dockerproject.org/repo ubuntu-xenial main' &&\
sudo apt-get update &&\
apt-cache policy docker-engine &&\
sudo apt-get install -y docker-engine

sudo systemctl status docker

Step 4: Build and Tag an Image
sudo docker build -t my_company/my_app .
sudo docker rm my_app

Use the -p flag to map internal port to actual external port while running the docker container
sudo docker create -p 80:80 --name my_app my_company/my_app

Step 5: Run and inspect your image
Start the container
sudo docker start my_app
sudo docker run -d -v /home/ubuntu/java:/home/ubuntu/java bat/spark

Show the running containers
sudo docker ps -a

Show the logs
sudo docker logs my_app

Connect to a running container
sudo docker exec -it my_app /bin/bash

Stop a running container
sudo docker stop my_app

Step 6: Perform tasks on github
If you are going to get code from github and deliver a bundled application use the following:
sudo vi ~/.ssh/id_rsa
sudo chmod 600 ~/.ssh/id_rsa
git clone git@github.com:company/repository.git
cd repository
bundle exec rake assets:precompile

Step 7: Working w/ AWS ECR
AWS ECR is a repository solution that stores built images so they can be deployed by various solutions such as CodeBuild.

Set credentials
aws configure

Add to ECR
sudo $(aws ecr get-login --region us-east-1)
sudo docker build -t my_app .
sudo docker tag my_app:latest AWS_ID.dkr.ecr.us-east-1.amazonaws.com/my_app:latest
sudo docker push AWS_ID.dkr.ecr.us-east-1.amazonaws.com/my_app:latest

Step 8: Attaching Volumes
If you want to attach a volume from the underlying host (for example, for DB persistent data), you should define the mapped volume inside the image:

RUN mkdir -p /data
VOLUME ["/data"]


and map it when you run the container
sudo docker run -d -v /data:/data bat/spark


Please notice that you can also map a volume from another docker container, so you will be able to share data between containers (for example, unix sockets):
The source container
sudo docker create -v /dbdata --name dbstore training/postgres /bin/true
The target container
sudo docker run -d --volumes-from dbstore --name db1 training/postgres

Step 9: Adding hosts to /etc/hosts file

Just mention them in the run command:
sudo docker run -d --add-host=SERVER_NAME:127.0.0.1 bat/spark

Step 10: Debugging your application



When you modify files and need to debug, just restart the docker container and rerun the tasks:
sudo docker restart [container_id]

A few more tips:
No sudo
"Cannot connect to the Docker daemon. Is the docker daemon running on this host?"
You probably ran the command w/o sudo.

No Response
sudo docker does not respond, even for simple calls like sudo docker ps -a.
1. Take a look at /var/log/docker
2. If you find a message like the one below, delete the container directory by issuing:
sudo rm -rf /var/run/docker/libcontainerd/containerd/a7764ce62032af2fdb57d3016a68bd079590667ca07
time="2017-03-16T09:42:32.781797550Z" level=info msg="libcontainerd: new containerd process, pid: 2577"
time="2017-03-16T09:42:32.787797862Z" level=fatal msg="open /var/run/docker/libcontainerd/containerd/a7764ce62032af2fdb57d3016a68bd079590667ca073702a7b96ec2068ec7a73/state.json: no such file or directory"

Playing w/ dockers
sudo docker rm spark
sudo docker build -t bat/spark .
sudo docker create  --name spark bat/spark

sudo docker build -t bat/simulator .
sudo docker create  --name simulator bat/simulator
sudo docker run -d -v /home/ubuntu/java:/home/ubuntu/java bat/simulator
sudo docker logs peaceful_shirley

sudo docker build -t bat/processor .
sudo docker create  --name processor bat/processor
sudo docker run -d -v /home/ubuntu/java:/home/ubuntu/java bat/processor

sudo docker ps -a
sudo docker exec -it fervent_ardinghelli /bin/bash

Evaluate Failed Container
If your container fails to start, export its filesystem to a tar file and evaluate the relevant files. In this case we evaluate the unicorn log file:
mkdir content &&\
cd content &&\
sudo docker export [CONTAINER_ID] > contents.tar &&\
tar -xvf contents.tar

sudo more shared/log/unicorn.stderr.log

Clean Up Your Disk from Orphaned Volumes
You will probably see the disk filling up as you keep deploying, as old deploy volumes are no longer needed.
Therefore, on every deploy, run the following script to clean the disk:
sudo docker volume ls -qf dangling=true | sudo xargs -r docker volume rm


And another recommended cleanup:
Clean your old and unused images:
#delete old unused images
if [[ $(sudo docker images --filter "dangling=true" -q --no-trunc) ]]; then
   sudo docker rmi $(sudo docker images --filter "dangling=true" -q --no-trunc)
fi

If you get the following error:
/bin/sh: Text file busy
Consider adding the sync command between the commands in the same RUN:
RUN chmod 777 /app/scripts/build.code.sh; sync; /app/scripts/build.code.sh

Bottom Line
Docker is really fun, and takes the whole DevOps process to a new level!

Keep Performing,

Nov 28, 2016

Optimizing Your Spark Cluster for Various Scenarios

Given a defined scenario for your system, how can you meet requirements?

Asking for more machines, memory and cores is always a valid idea, but your CFO may not agree and may decline your request.

"Know Your Data"
As Demi Ben Ari says: "know your data" is the answer.
You must understand your business needs to get more from your cluster.
By understanding your needs you will be able to run more tasks faster.

Moreover, if you utilize cloud computing resources such as AWS, better understanding will let you select the right type of instances and get more for the same money (or even less, as we will see next).

What can be controlled?
In this post, I will focus on YARN-based clusters, but the concepts can be applied to Mesos and standalone clusters as well. Some more information can be found in the Cloudera post.

How Many Nodes Do I Have?
Take a look at the number of servers installed w/ NodeManagers. This is the number of nodes you can spin up (in this case we actually have 4, although there are a total of 8 YARN clients installed).
A node is an actual physical or virtual server (depending on your environment).

Define YARN Limitations:
yarn.nodemanager.resource.memory-mb: 
Total memory on a single node used by all executors 
(NodeMemInGB - 1)*1024

yarn.nodemanager.resource.cpu-vcores:
Total cores on a single node used by all executors
(NodeCores - 1)


Define Spark Executors Balancing
--executor-cores / spark.executor.cores: 
The number of cores per executor (the number of tasks an executor can run concurrently).

--executor-memory / spark.executor.memory: 
The executor memory (the actual upper limit is 64GB due to GC).
spark.yarn.executor.memoryOverhead is added on top of it (defaults to max(384, .07 * spark.executor.memory)).

The application master that runs the driver also requires, by default, 1GB and 1 core, and therefore we'll always subtract a single core and 1GB in our calculations. These can be controlled by the following parameters:
  --driver-memory

  --driver-cores

--num-executors / spark.executor.instances: 
The number of executors for a single job.
Setting it can be avoided by using spark.dynamicAllocation.enabled

The massive interactive small data scenario
This is not a common scenario, but some people may want to run a large number of tasks on relatively small data.
This might not be the best practice for Big Data, but it will give us a good understanding of Spark cluster sizing based on this edge case.

We'll start w/ a default of 4 m4.xlarge nodes, each w/ 4 vCores and 16GB (on-demand cost of $0.239/hour, or $717/month)

1. First we will maximize the resources for any given node:
yarn.nodemanager.resource.memory-mb: (16-1)*1024 = 15,360MB
Memory Before and After

yarn.nodemanager.resource.cpu-vcores: (4-1) = 3 vCores

CPU Before and after

Containers Before and After
2. Then we will maximize the number of executors by minimizing the number of cores per executor to 1
spark.executor.cores: 1
spark.executor.memory: (16-1)*(1-0.07)/3 = 4.65, rounded down to 4GB
spark.executor.instances: 3/1*4 nodes = 12 executors

The Results
After finishing the configuration, we enabled 12 tasks that run 24x7 using a manual scheduler written in our lab.
As you can see, the numbers actually fit our design: CPU jumps and hits 100%, while the cluster remains stable and does not crash.


The cluster utilization jumps after configuration to 12 executors and running 8 tasks every minute


The cluster utilization hits 100% CPU w/ 11 tasks every minute


Optimizing the Instance Types
Since our bottleneck is CPU vCores, by knowing our data we should seek another type of AWS instance. c4.2xlarge is an actual match, as the C3/C4 series is optimized to provide more CPU and cores for less money (a c4.2xlarge instance has 8 vCores and 15GB at $0.419/hour, so for $629/month we'll get 2 c4.2xlarge nodes).

1. First we will maximize the resources for any given node:
yarn.nodemanager.resource.memory-mb: (15-1)*1024 = 14,336MB
yarn.nodemanager.resource.cpu-vcores: (8-1) = 7 vCores

2. Then we will maximize the number of executors by minimizing the number of cores per executor to 1
spark.executor.cores: 1
spark.executor.memory: (15-1)*(1-0.07)/7 = 1.86, rounded down to 1GB
spark.executor.instances: 7/1*2 nodes = 14 executors
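
The two sizing exercises above follow the same recipe, so they can be sketched as one small Python function. This is only an illustration of the arithmetic in this post (1GB and 1 core reserved per node, 7% memory overhead, executor memory rounded down to whole GB); the names are hypothetical and not part of Spark or YARN.

```python
import math
from dataclasses import dataclass


@dataclass
class ExecutorPlan:
    yarn_memory_mb: int      # yarn.nodemanager.resource.memory-mb
    yarn_vcores: int         # yarn.nodemanager.resource.cpu-vcores
    executor_memory_gb: int  # spark.executor.memory
    executors: int           # spark.executor.instances


def plan_executors(nodes: int, node_mem_gb: int, node_cores: int,
                   cores_per_executor: int = 1,
                   overhead_fraction: float = 0.07) -> ExecutorPlan:
    """Reserve 1GB and 1 core per node, then split the rest between executors."""
    yarn_memory_gb = node_mem_gb - 1
    yarn_vcores = node_cores - 1
    executors_per_node = yarn_vcores // cores_per_executor
    executor_memory_gb = math.floor(
        yarn_memory_gb * (1 - overhead_fraction) / executors_per_node)
    return ExecutorPlan(yarn_memory_gb * 1024, yarn_vcores,
                        executor_memory_gb, executors_per_node * nodes)


if __name__ == "__main__":
    print("4 x m4.xlarge :", plan_executors(nodes=4, node_mem_gb=16, node_cores=4))
    print("2 x c4.2xlarge:", plan_executors(nodes=2, node_mem_gb=15, node_cores=8))
```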

Bottom Line
With a little tweaking, we could get 17% more executors for 12% less money, or a bottom line of 33% more value for money!
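
The percentages above can be reproduced w/ a few lines of Python, using the executor counts and monthly costs quoted in this post (the helper name is hypothetical):

```python
def percent_change(new: float, old: float) -> float:
    """Relative change from old to new, in percent."""
    return (new / old - 1) * 100


# Figures quoted in this post: 4 x m4.xlarge vs. 2 x c4.2xlarge.
OLD_EXECUTORS, OLD_MONTHLY_COST = 12, 717
NEW_EXECUTORS, NEW_MONTHLY_COST = 14, 629

executors_gain = percent_change(NEW_EXECUTORS, OLD_EXECUTORS)
cost_change = percent_change(NEW_MONTHLY_COST, OLD_MONTHLY_COST)
value_gain = percent_change(NEW_EXECUTORS / NEW_MONTHLY_COST,
                            OLD_EXECUTORS / OLD_MONTHLY_COST)

if __name__ == "__main__":
    print(f"executors: {executors_gain:+.0f}%")
    print(f"cost: {cost_change:+.0f}%")
    print(f"executors per dollar: {value_gain:+.0f}%")
```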

Keep Performing,
Moshe Kaplan

P.S. You should explore more ways to save money on this kind of architecture by utilizing:
 - Reserved instances
 - Spot instances

Nov 13, 2016

Creating LVM disk for MongoDB: Right or Not?

One of the benefits of using replicated environments is the option of having large, high-IOPS disks built from commodity disks, without the need to create complex HA disk-based solutions like RAID 10.

We will demonstrate this method by creating an LVM-based disk array.

Creating the LVM Disk Array
Based on environment w/ two disks:
/dev/xvdb
/dev/xvdc

0. Set up prerequisites
sudo apt-get -y install lvm2 xfsprogs
1. Create physical volume for the two devices
sudo pvcreate /dev/xvdb /dev/xvdc
2. Create a Volume Group 
sudo vgcreate vg_data /dev/xvdb /dev/xvdc
3. Create new logical volume from all the space on the disks:
sudo lvcreate -l 100%FREE -n data vg_data
4. Create xfs file system
sudo mkfs.xfs -f /dev/vg_data/data
5. Mount the disk
echo "/dev/vg_data/data /var/lib/mongodb xfs defaults 0 2" | sudo tee -a /etc/fstab && sudo mount -a
sudo chown -R mongodb:mongodb /var/lib/mongodb
Note: This method may be effective only when you have a large number of disks (>3) and relatively high disk space utilization. By default, lvcreate allocates a linear volume, so the first disk fills up before the next one is used. Otherwise, you may face high IOPS utilization on only part of the disks, as can be seen below:
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           3.77    0.00    0.42    0.14    0.02   95.65
Device:            tps    kB_read/s    kB_wrtn/s    kB_read    kB_wrtn
xvda              0.00         0.00         0.00          0          0
xvdb            176.00         0.00     21294.00          0      42588
xvdc              8.00         0.00        16.00          0         32

dm-0            170.00         0.00     20560.00          0      41120

Bottom Line
Choosing LVM can be a great solution to support your big data installation. However, you should test your case to verify it is the right solution for your needs.

Keep Performing,
Moshe Kaplan

Sep 6, 2016

Ruby is Dead! (You need to take care of its memory issues)

One of the common problems with Ruby is its memory usage, just like JVM-based languages (yes Java, I'm talking about you). Actually Ruby, just like Java, is based on a garbage collector. This GC, if not used correctly, can cause "stop the world" scenarios (as we can see in the attached diagram). In these cases, the process actually stops responding while the GC consumes the server's whole CPU, and in some cases this even causes a server reboot.
Unicorn and Ruby memory leak causes server downtime
What can be done?

1. Reboot your Ruby Periodically

If you are using the popular Unicorn web server, memory issues may be even more severe, as Unicorn forks. During forking, each worker gets the whole memory footprint of the parent (Copy on Write, or CoW). Therefore, you may use the "Unicorn Worker Killer" gem, which monitors your server and gracefully restarts a specific worker when its memory reaches a new high or after it has served a specific number of requests. Since the gem supports randomization, it assures w/ very high probability that the server will continue serving.

Installing the killer:
gem 'unicorn-worker-killer'

Add to config.ru above "require ::File.expand_path('../config/environment', __FILE__)"
# Unicorn self-process killer
require 'unicorn/worker_killer'

And decide on your worker graceful restart method:
# Max requests per worker
use Unicorn::WorkerKiller::MaxRequests, 3072, 4096
# Max memory size (RSS) per worker
use Unicorn::WorkerKiller::Oom, (192*(1024**2)), (256*(1024**2))
If you are using Ruby 2.X, you can exploit CoW by better configuring Unicorn.
config/unicorn.rb (see a detailed explanation of the settings at sirupsen):
 - worker_processes: 1x your cores
 - timeout: worker request timeout, should be 15 to 30 sec max
 - preload_app: enables CoW, but requires managing connect/disconnect on fork


3. Take care of GC configuration
You can take a look at a detailed discussion at collective idea's post.

Bottom Line
Dynamic languages have their downsides, yet w/ the right design you can keep them up and running.

Keep Performing,

Jun 27, 2016

Neo4j Cluster Performance Tuning

Neo4j is one of the leading graph databases these days, and it is very popular in recommendation systems, fraud detection and social network scenarios.

While the single instance (that is included in the community edition) performs very well (usually w/ under 10ms response time), you may face challenges in cluster mode.

Why Should You Expect Performance Degradation in a Neo4j Cluster?
Two simple reasons:
  • A Neo4j cluster is a Master-Slave cluster w/ an auto failover method (much like MongoDB). However, unlike MongoDB, primary node detection is done by a server-side load balancer and not by the client's driver.
  • Cluster replication is synchronous by default, unlike MongoDB's async default behavior.
How Much Will It Cost Us?
  • The various nodes of the cluster should be behind a LB. If you select AWS ELB, it will cost you 7 to 30ms according to our measurements below. The ELB latency increases as the request and response become larger (see details at the bottom). Note: implementing a MongoDB-like driver could be a great improvement that will save this latency and minimize system cost, and it's a great idea for a side project!
  • The nodes behind the ELB replicate changes from the master to the slaves. The level of synchronization is controlled by the ha.tx_push_factor parameter, w/ a default value of 1. This parameter controls the number of slaves that should receive the commit before answering the client. By setting it to 0, you avoid synchronization and get a result similar to a single node. Changing the factor will save 70ms on average (and much more at peak time), and will leave us w/ an average of 40ms per query (inc. ELB cost).
You can find the differences below, where in the tested environment a community edition instance was replaced by a 3-node cluster behind an ELB:
  • In the left section you can see an average of 9ms in the initial state (single community edition instance)
  • In the middle you can see a fluctuating response time of 40ms (reads) to 300ms (writes) for a 3-node cluster behind an ELB w/ the ha.tx_push_factor parameter at its default value of 1.
  • In the right section you can see a steady 40ms for both reads and writes for a 3-node cluster behind an ELB w/ the ha.tx_push_factor parameter set to 0 (async replication).
Neo4j performance as measured by DataDog client side metrics
Bottom Line
HA has some cost on its side. A better implementation of the load balancing and the right selection of the synchronization model can help you gain the needed performance.

Keep Performing,


Measure from Inside the Server:
> mytime="$(time ( curl http://localhost:7474/db/manage/server/ha/master ) 2>&1 1>/dev/null )"
> echo "$mytime"
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    581      0 --:--:-- --:--:-- --:--:--   666

real    0m0.006s
user    0m0.005s
sys     0m0.000s

Measure from the Application Server Direct to Master:

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    988      0 --:--:-- --:--:-- --:--:--  1333

real    0m0.006s
user    0m0.000s
sys     0m0.000s

Measure from the Application Server to Master through the ELB:

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0    417      0 --:--:-- --:--:-- --:--:--   444

real    0m0.015s
user    0m0.000s
sys     0m0.000s
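
To turn these measurements into a number, the following Python sketch extracts the "real" time from the output of time and computes the extra latency added by the ELB. The parsing assumes the bash time format shown above; the function name is hypothetical.

```python
def parse_real_seconds(time_output: str) -> float:
    """Extract the 'real' value (e.g. 'real    0m0.015s') as seconds."""
    for line in time_output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[0] == "real":
            minutes, seconds = parts[1].rstrip("s").split("m")
            return int(minutes) * 60 + float(seconds)
    raise ValueError("no 'real' line found")


# The 'real' lines measured above from the application server.
DIRECT_TO_MASTER = "real    0m0.006s\nuser    0m0.000s\nsys     0m0.000s"
THROUGH_ELB = "real    0m0.015s\nuser    0m0.000s\nsys     0m0.000s"

overhead_ms = (parse_real_seconds(THROUGH_ELB)
               - parse_real_seconds(DIRECT_TO_MASTER)) * 1000

if __name__ == "__main__":
    print(f"ELB overhead: {overhead_ms:.0f}ms")
```

For this small request the overhead is about 9ms, at the lower end of the 7 to 30ms range mentioned above.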

Some More Measures to Gain Data to Explore the Neo4j Performance:
  • Enable the slow query log and filter by server time
  • Install monitoring like Datadog at the application level:
