Nov 18, 2018

10 Things You Should Know about MongoDB Sharding

MongoDB Sharding is a great way to scale out your writes without changes to application code. However, it requires careful design to enjoy all the scale and performance benefits:
  1. You cannot rename a sharded collection. Think twice before any action!
  2. MongoDB collection cannot be resharded: To reshard a collection, you will have to copy the collection into a new sharded collection (or do it twice if you cannot rename the collection name in the application level). You will need a script like this:
    use app;
  3. When you run this script, make sure you do it on the cluster (or on a server with minimal latency to the cluster) to accelerate copy.
  4. If collection is already large, you will need to split the script and run multiple processes to fasten the copy.
  5. Avoid creating a unique index on the sharded collection. Sharded collection cannot utilize unique index..
  6. Be careful to avoid selecting a field that does not exist in all the documents. Otherwise, you will find tons of "missing documents".
  7. Follow the previous rule when selecting range based sharding.
  8. Choose the sharding key wise. If you do not sure 100% your keys will be distributed evenly, use a hashed key to ensure it
    db.c_to_shard.ensureIndex({user_id: "hashed"})
    sh.shardCollection("app.c_to_shard", {"user_id": "hashed"})
  9. Verify your shards and chunks are evenly distributed using getShardDistribution(). Verify it once sharded is done, and as data starts to get into the collections. If something is wrong, the smaller the collection, the easier the fix.
    db.c_to_shard.getShardDistribution() or do the same with the following script that will provide you a detailed information on each shard and chunk.
  10. If all chunks reside on a single shard, verify the balancer status, and start if needed:
Bottom Line
MongoDB sharding is a great way to scale out your data store write thoughput. However, you should select your sharding key wise, otherwise, it might require a significant effort to fix stuff.

Keep Performing,

Jun 10, 2018

Getting Enterprise Features to your MongoDB Community Edition

Many of us need MongoDB Enteprise Edition, but might be short of resources, or would like to compare the value.

I have summarized several key features of MongoDB Enteprise Edition and their alternatives

Monitoring Options:
  • MongoDB Cloud Manager: Performance monitoring ($500/yr/machine) => $1000-1500
  • Datadog/NewRelic => $120-$180/yr per machine, Datadog is better for this case
  • DYI using tools such mongotop, mongostat, mtools and integrate w/ grapha and other

Replication is super recommended and is part of the community edition:
Replica set => min 3 nodes, at least 2 data nodes in 3 data centers (2 major DC and one small).
Backup and Restore:
There are 3 major options (that can be combined of course):
  • fsync to mongodb and physical backup:
    • fast backup/restore
    • Might be inconsistent/unreliable
  • Logical backup: based on mongodump
    • Can be done w/ $2.5/GB using the cloud manager w/ Point in time recovery
    • Can be done w/ Percona hot backup
    • Incremental is supported 
  • Have a delayed node
The first two may be done using a 3rd data node in hidden for backup (high frequency backup) that enable

Encryption Alternatives:
  • Disk based encryption => data at rest (can be done in AWS and several storage providers)
  • eCryptFS => Percona => data at Rest
  • Encryption based application by the programmers in the class level before saving to disk.

Use Percona edition is a good alternative that may close many of your enterprise needs

BI :
Well supported with MongoDB BI Connector in the enterprise edition, but can be done also w/
  • Some BI tool supports MongoDB natively
  • 3rd party provider for JDBC connector: such as simba and
Bottom Line
Getting your MongoDB Community Edition to meet Enterprise Requirements is not simple but with the right effort it can be done.

Keep Performing,
Moshe Kaplan

Mar 5, 2018

Some Lessons of Spark and Memory Issues on EMR

In the last few days we went through several perfomrance issues with spark as data grow dramaticaly. The easiest way to go around might be increasing the instance sizes. However, as scaling up is not a scalable strategy, we were looking for alternate ways to back to track, as one of our Spark/Scala based pipelines strarted to crash.

Some Details About Our Process
We run a Scala (2.1) based job on a Spark 2.2.0/EMR 5.9.0 cluster w/ 64 r3.xlarge nodes.
The job analyzes several data sources each of few houndred GB (and growing) using the dataframe API and output data to S3 using ORC format. 

How Did We Recover?
Analyzing the logs of the crashed cluster resulted w/ the following error:

WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Setting the spark.yarn.executor.memoryOverhead to 2500 (the maximum on the instance type we used r3.xlarge) did not make a major change.

spark-submit --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=2500 ...

We raised the bar by disabling the virtual and physical memory checks and increasing the virtual to physical memory ratio to 4 (This is done step 1: Software and Steps of EMR creation by setting the following value of Edit software settings)
{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},{"classification":"yarn-site","properties":{" yarn.nodemanager.vmem-pmem-ratio":"4","yarn.nodemanager.pmem-check-enabled":"false","yarn.nodemanager.vmem-check-enabled":"false"}}

However, this made the magic till hitting the next limit (probably spark tasks were killed when they trying to abuse the physical memory) with the following error:

ExecutorLostFailure (executor  exited caused by one of the running tasks) Reason: Container marked as failed: container_ on host:. Exit status: -100. Diagnostics: Container released on a *lost* node 

This one was solved  by increasing the number of dataframe partitions (in this case from 1024 to 2048), that reduced the needed memory per partition.

Note: if you want to change dataframe default partitions number (200) use the following:
setConf("spark.sql.shuffle.partitions", partitions.toString)
setConf("spark.default.parallelism", partitions.toString)

If you want to take another look on the default partitioning and how to automate the numbers, take a look at Romi Kuntsman's lecture.

Right now, we run in full power ahead. yet when we may hit the next limit, it may worth an update.

Bottom Line
As Spark heavily utilizes cluster RAM as an effective way to maximize speed, it is highly important to monitor it and verify your cluster settings and partitioning strategy meet your growing data needs.

Keep Performing,
Moshe Kaplan

Nov 19, 2017

Encapsultaing Processes using Docker

One of the problems with monoliths that when one part of the system goes mad, you are going to work very hard to get out of the mess.

This time we'll discuss how Docker can save your day, by encapsulating a running process while minimizing affects on current system architecture.

An example for this kind of process is FFMPEG: a complete, cross-platform solution to record, convert and stream audio and video. This process is CPU bound, and may leave your other processes w/ no option to serve due to lack of CPU resources.

Encapulate the Process as a Docker Container
We'll encapulate FFMPEG in a Docker container, in a way that we could use the same interfaces we used to call FFMPEG before:
1. The FFMPEG input folder will be mapped to a host folder
2. The FFMPEG outut folder will be mapped to a host folder
3. The container can be run from command line resulting in FFMPEG process execution

The bonus for this design, as docker resources can be easily manages, we will be able to limit the amount of memory and CPU used by the FFMPEG process

Creating the Docker Container
As the Docker community is highly active, we can easily find a good baked Docker image for this task. This specific one is using the Alpine Linux flavour that ensures us a minimum overhead

Running the Docker Container from Command Line
In the following lines we'll:

  1. Pull the image 
  2. Run it, while mapping the internal /tmp folder to the host /tmp folder
  3. Limit the number of CPU cores to 4 (in this case)
  4. Add to the run any relevant parameter

docker pull alfg/ffmpeg
docker run -it --cpus="4" -v /tmp:/tmp --rm alfg/ffmpeg ffmpeg -buildconf [EXTRA_PARAMS]

Bottom Line
Docker is a great solution for microservices based architecture, but it can also provide a real value for monolith low hanging fruites cases

Keep Performing,
Moshe Kaplan

Sep 14, 2017

11 MongoDB Assesment Questions You Should Ask Before Contacting an Expert

If you are facing issues w/ your MongoDB setup, these are the questions to ask, to analyze your gaps:
  1. What is the issue you are facing?
    1. Performance?
    2. Sizing?
    3. Cost?
    4. Data Model?
    5. Backup/Monitoring/System?
    6. Security?
  2. What is the current sizing in GB of the database?
  3. What MongoDB version do you use?
  4. What storage engine?
  5. Do you use Replica Set?
  6. Do you use Sharding?
  7. How do you backup your database?
  8. Where do you host your servers?
  9. What is the sizing of your machines (CPU, Disk size and type and RAM)?
  10. Do you have any monitoring traces of your machines (CPU, disk usage and RAM)?
  11. Did you implemented indexes?
Keep Performing

Jul 21, 2017

Cassandra Design Best Practices

Cassandra is a great NoSQL product, it provides near real time performance for designed queries and enables high availablity w/ linear scale growth as it uses the eventually consistent paradigm.
 In this post we will focus on some best practices for this great product: 

 How many nodes do you need? 
  1. Number of nodes should be odd in order to support votes during downtime/network cut.
  2. Minimial number should 5, as lower number (3) will result in high stress on the machines during node failure (replicaiton factor is 2 in this case, and each node will have to read 50% of the data and write 50% of data. When you select replication factor 3, each node will need to read 15% of data and write 15% of the data. Therefore, recovery will be much faster, and higher chances performance and availablility will not be affected.
C* like any other data store loves fast disks (SSD) although its SSTables and INSERT only architecture and as much memory as your data.
In particular your nodes should be 32GB to 512GB RAM each (and not less than 8GB in produciton and 4GB in development). This is a common issue since C* was coded in Java. For small machines you should avoid G1 and keep w/ CMS.

JVM heap size should be max of 8GB too avoid too long "stop the world" during GC.
If you feel the default Heap size (max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB)) does not fit your needs, try to set it between 1/4 to 1/2 of your RAM, but not more than 8GB. 

C* is also CPU intensive and 16 cores are recommended (and not less than 2 cores for development).

Repair and Repair Strategy
nodetool repair is probably one of the most common tasks on C* cluster. 
  1. You can run it on a single node or on a whole cluster.
  2. Repair should run before reaching the gc_grace_seconds (default 10 days) that will remove thombstones
  3. You should run it durring off-peak hours (probably during weekend) if you keep w/ the gc_grace_seconds.
  4. You can take this numbers down, but it will affect your backup and recovery strategy (see details about recovery from failure using hints).
You can optimize the repair process by using the following flags:
  1. -seq: repair token after token: slower and safer
  2. -local: run only on the local data center to avoid downtime of both in any case
  3. -parallel: fastest mode: run on all datacenters in parallel
  4. -j: the number parallel jobs on a node (1-4), using more threads will stress the nodes, but will help end the task faster.
We recommend to select your strategy based on height of your peaks and the sensitivity of your data. If your system has the same level of traffic 24/7, consider doing things slow and sequencial.
The higher your peaks, the more stress your should do on your system during off peak hours.

There are several backup strategies you can have:
  1. Utilize your storage/cloud storage snapshot capabilities. 
  2. Use C*  nodetool  snapshot command. This one is very similar to your storage capabilities but enables backup only the data and not the whole machine.
  3. Use C* incremental backup that will enable point in time recovery. This process is not a daily process, but requires copying and managing small files all the time
  4. Mix C* snapshots and incremental backups to minimize the time of recovery while keeping the point of time recovery option.
  5. Snapshots and commit log: complex process to recover that supports point in time recovery, as you need to reply the commit log.
We recommend to use the daily snapshot if your data is not critical and you want to minimize your Ops costs, or the mix C* snapshots and incremental backup when you must have a point in time recovery.

There are several approaches to go:
  1. Commerical Software:
    1. DataStax OpsCenter solution: as almost every other OSS, DataStax that provides the commerical version of C*, provides a pay for managment and moniotring solution
  2. Commercial Service including
    1. NewRelic: provides a C* plugin as part of its platform
    2. DataDog: with a nice hint on what should be monitored.
  3. Use Open Source with common integration:
    1. Graphite, Grafana:or Prometheus: 3 tools that can work together or apart and integrated w/ time series and relevant metrics.
    2. Old style Nagios and Zabbix that provides communitry plugins
If you choose a  DIY solution there some hints that you can find in the commercial products and services and also in the folowing resources:
  1. Basic monitoring thresholds
  2. Nagios out of the box plugins that thresholds can be extracted from
For example:
  1. Heap usage: 85% (warning), 95% (error)
  2. GC ConcurrentMarkSweep: 9 (warning), 15 Error
Our recommendation is starting when possible with an existing service/product, get experience w/ the metrics that are relevant for your environment, and if needed, implement based on them your own setup.

Lightweight transactions are meant to enable case studies that requires sequence (or some type of transactions) in an eventually consistent environment.
Yet notice, that it's a minimal solution that is aimed to serialize tasks in a single table.

We believe that this is a good solution, but the if your data requires consistent soluton, you should avoid eventually consitent solution and look for SQL solution (with native transactions) or NoSQL solution like MongoDB.

C* Internals
What to know more? use the following videos or get the O'REILY book 

Bottom Line
C* is indeed a great product. However, it definittly not an entry level solution for data storage, and managing it requires skills and expertise

Keep Performing,
Moshe Kaplan

Jun 21, 2017

Installing Elastic Stack in a Single Click using Docker Compose

The Elastic team made a great work w/ packaging the various elastic stack compnents into Docker images.
Yet, this manual will help you boot the entire stack including Logstash and Kibana in a single click using Docker Compose.

1. Run the following command in your host machine 
sudo sysctl -w vm.max_map_count=262144

2. Install Docker Compose
In ubuntu for example use:
sudo apt-get -y install docker-compose

3. Create a docker-compose.yml according to the follow link:
Copy the docker-compose.yml to your machine

4. Start the cluster
sudo docker-compose up

5. Verify the cluster using the password changeme
curl -u elastic
Enter host password for user 'elastic':
1498046576 12:02:56 docker-cluster green 2 2 12 6 0 0 0 0 - 100.0%

6. And Kibana using 

Bottom Line
Docker changes the DevOps world as we know it, and complex tasks that took hours, can be done in few clicks

Keep Performing,


Intense Debate Comments

Ratings and Recommendations