How to Minimize the Performance Impact of Re-Replication

For various reasons, data may need to be re-replicated between nodes in a MapR cluster. For example, if a disk goes bad, the content it stored will need to be re-replicated to ensure that the data is fully protected. Decommissioning a node is another instance where re-replication is needed. When a node is moved into a topology which does not have any volumes associated with it, it forces re-replication of the data that it stores onto nodes that are in other topologies.
 
When re-replication is in progress, data is read back from disks on one node and sent across the network to another node, where it is written to disk. Because nodes have finite hardware resources, the load from re-replication competes with loads generated by user applications such as map/reduce jobs or HBase operations. This contention for network or disk bandwidth can degrade latency- or throughput-sensitive user applications.
 
When re-replication uses a significant amount of network bandwidth, it can lengthen response times for HBase operations that traverse the same network paths. Depending upon your use case, this may have a significant impact.
 
In MapR v3.0.1 and later, re-replication traffic can be throttled back in order to minimize the impact on user applications. In previous software versions, MFS re-replication traffic would aggressively use hardware resources. However, with MapR v3.0.1 and later, administrators can instruct the software to give higher priority to user-generated load.
 
There are two configuration variables used to control the throttling, both of which are implemented in the MFS service. The parameters are set in /opt/mapr/conf/mfs.conf on each node and are named "mfs.disk.resynciothrottle.factor" and "mfs.network.resynciothrottle.factor."
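As an illustration, the relevant entries in mfs.conf might look like the following. The values shown here are examples only, not documented defaults; note that because the factor appears as a divisor in the sleep calculation described below, a larger factor means less sleep and therefore more aggressive resync.

```
# Example values only -- consult the MapR documentation for defaults
# and tuning guidance for your release.
mfs.disk.resynciothrottle.factor=1
mfs.network.resynciothrottle.factor=1
```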
 
The node acting as the source of a container resync uses the throttle to add sleep time between individual resync operations.
 
Without the throttle, an MFS node would read content back from disk as fast as possible and quickly send it out onto the network. For example:
 
Thread 1 to read from local disk into MFS memory:

    while (moreDataToReadFromDisk) {
        copyIntoMemoryBuffer( buffer, readFromLocalDisk() );
    }

Thread 2 to send from MFS memory to a remote MFS node:

    while ( bufferIsNotEmpty(buffer) ) {
        sendContentFromBuffer( buffer, remoteNode );
    }
 
Both of these threads would be running as fast as possible; a node would read data back from the local disks as fast as possible into MFS memory, and then send the content from the MFS memory to the remote node receiving the resync.
 
With the throttle, an MFS node sleeps after each local disk read and after each network send of replication data. The length of the sleep is determined by the number of concurrent resync operations being performed, the time it took to execute the read/send, and the throttle factor from the mfs.conf.
The threads will look something like this:
Thread 1 to read from local disk into MFS memory:

    while (moreDataToReadFromDisk) {
        startTime = now();
        copyIntoMemoryBuffer( buffer, readFromLocalDisk() );
        timeDelta = now() - startTime;
        // 10 is a constant in the implementation
        sleepTime = 10 * timeDelta * numberOfResyncThreads / diskThrottleFactor;
        sleep(sleepTime);
    }

Thread 2 to send from MFS memory to a remote MFS node:

    while ( bufferIsNotEmpty(buffer) ) {
        startTime = now();
        sendContentFromBuffer( buffer, remoteNode );
        timeDelta = now() - startTime;
        sleepTime = 10 * timeDelta * numberOfResyncThreads / networkThrottleFactor;
        sleep(sleepTime);
    }
 
Essentially, every time we do a disk read as part of re-replication, we note how long it took and how many other threads are doing disk reads for re-replication, and multiply these numbers together. For example, if one disk read took 10ms and there are 10 threads doing these reads, the result is 100ms. We then multiply that number by 10 and divide the result by the throttle factor set in mfs.conf. So if you set the throttle factor to 1, the resync thread would sleep 1000ms in this scenario before doing its next disk read. This keeps the disks available to provide quick responses to user requests.
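The arithmetic above can be sketched in Python. This is a model of the formula as described, not the actual MFS implementation; the function name and parameter names are assumptions introduced for illustration.

```python
# Model of the resync throttle delay described above: the constant 10,
# the per-operation time, the concurrent-thread count, and the throttle
# factor from mfs.conf are the inputs named in the article's formula.
def resync_sleep_ms(op_time_ms, num_resync_threads, throttle_factor):
    """Sleep inserted after each resync disk read (or network send)."""
    return 10 * op_time_ms * num_resync_threads / throttle_factor

# Worked example from the text: a 10ms disk read with 10 concurrent
# resync threads and a throttle factor of 1 yields a 1000ms sleep.
print(resync_sleep_ms(10, 10, 1))   # -> 1000.0

# Raising the factor shortens the sleep, making resync more aggressive.
print(resync_sleep_ms(10, 10, 10))  # -> 100.0
```

The same calculation applies to the network side, with the send/acknowledge round-trip time and mfs.network.resynciothrottle.factor substituted in.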
 
Similarly, we do the same thing for network IO. We look at how long it takes to transmit a unit of data to a remote MFS and get a response back. We then look at how many concurrent threads are sending data to remote MFS nodes. We multiply those numbers, multiply again by 10, and then divide by the resync throttle factor.
 
This is not an exact description of the throttling implementation, but you now have an idea of the behavioral differences that can be observed between MapR software versions.
 
