Batched Puts into HBase

The following program is a table load tool, a utility for batching puts into an HBase/M7 table. By default it creates a simple HBase table with a single column in a single column family and inserts 100,000 rows of 100-byte values, using a put batch size of 500. These defaults, along with the thread count and other options, can be overridden on the command line.
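
Before the full listing, here is a minimal sketch of the core pattern the tool relies on: buffer a list of Put objects and submit them with a single put(List) call. It assumes the same HBase 0.94 client API and imports as the program below; the table, family, column, and value names are placeholders.

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");    // placeholder table name
table.setAutoFlush(false);                       // buffer writes on the client
List<Put> puts = new ArrayList<Put>();
for (int i = 0; i < 500; i++) {
	Put p = new Put(Bytes.toBytes("row-" + i));
	p.add(Bytes.toBytes("f0"), Bytes.toBytes("c0"), Bytes.toBytes("value-" + i));
	puts.add(p);
}
table.put(puts);        // submit the whole batch in one call
table.flushCommits();   // push anything still sitting in the client-side buffer
table.close();

The full program follows.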

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class LoadTableMTBatch {

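	// Run-wide defaults; several of these are overridden by the command-line flags parsed in main().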
	static long uniqueSeed = System.currentTimeMillis();
	static long[] count;
	static long[] latency;
	static int[] keySizes;
	public static long printPerNum = 10000;
	public static boolean noCRC = false;
	public static long keySize = 8;
	public static long startRow = 0;
	public static int batchSize = 500;
	public static int preSplit = 1;
	public static boolean flush = false;
	public static boolean autoFlush = false;
	public static final long startKey = 0L;
	public static final long endKey = 999999999999999L;
	public static final String HBASE_RESOURCE_NAME = "/opt/mapr/hbase/hbase-0.94.5/conf/hbase-site.xml";
	public static String ZOOKEEPER_NODES = "localhost";
	public static final Pair<String, String> ZOOKEEPER_SETTINGS = new Pair<String, String>(
			"hbase.zookeeper.quorum", ZOOKEEPER_NODES);

	public static void usage(String arg) {
		System.err.println("bad token: " + arg);
		System.err
				.println("loadMT -rows <100000> -valuesize <100 bytes> -debug -path <table path> -threads <10> -batchSize <500> -numCF <1> -numC <1> -preSplit <1> -zookeeperNodes <hosts> -hbase <hbase-site.xml> -p <10000> -AutoFlush -flush");
		System.exit(1);
	}

	public static void main(String[] args) throws java.io.IOException {
		Configuration conf = HBaseConfiguration.create();
		String tableName = null;
		long numRows = 100000;
		long numCF = 1;
		long numC = 1;
		long valueSize = 100;
		int numThreads = 10;
		boolean augment = false;

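		// Parse command-line flags; an unrecognized token prints the usage message and exits.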
		for (int i = 0; i < args.length; ++i) {
			if (args[i].equals("-rows")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				numRows = Long.parseLong(args[i]);
			} else if (args[i].equals("-path")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				tableName = args[i];
			} else if (args[i].equals("-debug")) {
				conf.set("fs.mapr.trace", "debug");
			} else if (args[i].equals("-valuesize")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				valueSize = Long.parseLong(args[i]);
			} else if (args[i].equals("-threads")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				numThreads = Integer.parseInt(args[i]);
			} else if (args[i].equals("-p")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				printPerNum = Long.parseLong(args[i]);
			} else if (args[i].equals("-hbase")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				conf.addResource(new Path(args[i]));
			} else if (args[i].equals("-numCF")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				numCF = Integer.parseInt(args[i]);
			} else if (args[i].equals("-numC")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				numC = Integer.parseInt(args[i]);
			} else if (args[i].equals("-batchSize")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				batchSize = Integer.parseInt(args[i]);
			} else if (args[i].equals("-preSplit")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				preSplit = Integer.parseInt(args[i]);
			} else if (args[i].equals("-zookeeperNodes")) {
				i++;
				if (i >= args.length)
					usage(args[i - 1]);
				ZOOKEEPER_NODES = args[i];
			} else if (args[i].equals("-AutoFlush")) {
				autoFlush = true;
			} else if (args[i].equals("-flush")) {
				flush = true;
			} else {
				usage(args[i]);
			}
		}
		if (tableName == null) {
			System.out.println("Must specify path");
			usage("path");
		}
		LoadTableMTBatch lt = new LoadTableMTBatch();
		try {
			LoadTableMTBatch.init(conf, tableName, numRows, numCF, numC,
					valueSize, augment);
			lt.loadTable(conf, tableName, numRows, numCF, numC, valueSize,
					numThreads);
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(-1);
		}
	}

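	// Pre-compute ten random key sizes, ranging from a few bytes up to 32 KB; used only when keySize < 1.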
	public void generateKeySizes() {
		Random rand = new Random(uniqueSeed);
		keySizes = new int[10];
		keySizes[0] = rand.nextInt(5) + 5;
		keySizes[1] = rand.nextInt(40) + 10;
		keySizes[2] = rand.nextInt(50) + 50;
		keySizes[3] = rand.nextInt(400) + 100;
		keySizes[4] = rand.nextInt(500) + 500;
		keySizes[5] = rand.nextInt(4000) + 1000;
		keySizes[6] = rand.nextInt(5000) + 5000;
		keySizes[7] = rand.nextInt(10000) + 10000;
		keySizes[8] = rand.nextInt(12000) + 20000;
		keySizes[9] = rand.nextInt(32 * 1024 - 1) + 1;
	}

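	// Start numThreads loader threads, then report once per second: total inserts, current and
	// overall insert rates, and min/max/average per-batch latency, until every thread has finished.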
	public void loadTable(Configuration conf, String tableName, long numRows,
			long numCF, long numC, long valueSize, int numThreads)
			throws Exception {
		Thread[] loadThreads = new Thread[numThreads];
		count = new long[numThreads];
		latency = new long[numThreads];

		if (keySize < 1) {
			generateKeySizes();
		}

		long offset = (endKey - startKey) / numThreads;
		for (int i = 0; i < loadThreads.length; i++) {
			latency[i] = 0;
			if (preSplit <= 1) {
				loadThreads[i] = new Thread(new LoadTableRunnable(conf,
						tableName, numRows, numCF, numC, valueSize, i,
						numThreads, batchSize));
			} else {
				loadThreads[i] = new Thread(new LoadTableRunnable(conf,
						tableName, numRows, numCF, numC, valueSize, i,
						numThreads, batchSize, startKey + i * offset, startKey
								+ ((i + 1) * offset) - 1));
			}
		}
		for (int i = 0; i < loadThreads.length; i++) {
			loadThreads[i].start();
		}
		long inserts = 0, insertsOld = 0, rate = 0, overallRate = 0, tA = 0, tB = 0, t0 = 0, elapsedTime = 0;
		long averageLatency = 0;
		long minLatency = 0;
		long maxLatency = 0;
		boolean alive = true;
		t0 = System.currentTimeMillis() - 1;
		tA = t0;
		tB = t0;
		while (true) {
			insertsOld = inserts;
			inserts = 0;
			tB = tA;
			tA = System.currentTimeMillis();
			alive = false;
			for (int i = 0; i < loadThreads.length; i++) {
				inserts += count[i];
				if (loadThreads[i].isAlive())
					alive = true;
			}
			rate = (inserts - insertsOld) * 1000 / (tA - tB);
			elapsedTime = (tA - t0);
			overallRate = inserts * 1000 / elapsedTime;
			// Min/Max/Average latency (sort a copy so the per-thread slots are not shuffled)
			synchronized (latency) {
				long[] latencySnapshot = latency.clone();
				Arrays.sort(latencySnapshot);
				minLatency = latencySnapshot[0];
				maxLatency = latencySnapshot[numThreads - 1];
				averageLatency = getSum(latencySnapshot) / latencySnapshot.length;
			}
			System.out.println("Elapsed time: " + elapsedTime / 1000
					+ "; Inserts: " + inserts + "; current rate " + rate
					+ "  inserts/sec; overall rate " + overallRate
					+ " inserts/sec; BatchSize " + batchSize + "; Min Latency "
					+ minLatency / 1000000L + "ms;" + " Max Latency "
					+ maxLatency / 1000000L + "ms;" + " Average Latency "
					+ averageLatency / 1000000L + "ms");
			if (!alive)
				break;
			// Print out interval
			Thread.sleep(1000);
		}
		for (int i = 0; i < loadThreads.length; i++) {
			loadThreads[i].join();
		}
	}

	public static long getSum(long[] array) {
		long sum = 0;
		for (long l : array) {
			sum += l;
		}
		return sum;
	}

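	// Create the table with numCF column families (f0, f1, ...); if preSplit > 1, pre-split it
	// into preSplit regions across the [startKey, endKey] range.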
	public static void createTable(Configuration conf, String tableName,
			long numCF) throws Exception {
		HBaseAdmin admin = new HBaseAdmin(conf);
		System.out.println("created admin object");
		HTableDescriptor des = new HTableDescriptor(tableName.getBytes());
		for (int i = 0; i < numCF; i++) {
			des.addFamily(new HColumnDescriptor("f" + i));
		}
		try {
			if (preSplit <= 1)
				admin.createTable(des);
			else {
				byte[] startKeyByte = Bytes.toBytes(startKey);
				byte[] endKeyByte = Bytes.toBytes(endKey);
				admin.createTable(des, startKeyByte, endKeyByte, preSplit);
			}
		} catch (TableExistsException te) {
			te.printStackTrace();
		} catch (IOException ie) {
			ie.printStackTrace();
		}
	}

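	// First-run setup: create the table and record the run parameters in a metadata row ("homeRow").
	// In augment mode, read those parameters back and advance startRow past the rows already loaded.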
	public static void init(Configuration conf, String tableName, long numRows,
			long numCF, long numC, long valueSize, boolean augment)
			throws Exception {
		if (augment) {
			HTable inTable = new HTable(conf, tableName);
			Result infoRes = inTable.get(new Get("homeRow".getBytes()));
			startRow = inTable.incrementColumnValue("homeRow".getBytes(),
					"f0".getBytes(), "c0".getBytes(), numRows)
					- numRows;
			numCF = Bytes.toLong(infoRes.getValue("f0".getBytes(),
					"c1".getBytes()));
			numC = Bytes.toLong(infoRes.getValue("f0".getBytes(),
					"c2".getBytes()));
			uniqueSeed = Bytes.toLong(infoRes.getValue("f0".getBytes(),
					"c3".getBytes()));
			keySize = Bytes.toLong(infoRes.getValue("f0".getBytes(),
					"c4".getBytes()));
		} else {
			createTable(conf, tableName, numCF);
			HTable inTable = new HTable(conf, tableName);
			Put info = new Put("homeRow".getBytes());
			info.add("f0".getBytes(), "c0".getBytes(), Bytes.toBytes(numRows));
			info.add("f0".getBytes(), "c1".getBytes(), Bytes.toBytes(numCF));
			info.add("f0".getBytes(), "c2".getBytes(), Bytes.toBytes(numC));
			info.add("f0".getBytes(), "c3".getBytes(),
					Bytes.toBytes(uniqueSeed));
			info.add("f0".getBytes(), "c4".getBytes(), Bytes.toBytes(keySize));
			inTable.put(info);
			inTable.flushCommits();
			inTable.close();
		}
	}

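	// Per-thread loader: builds batches of batchSize Puts (one column in one family per row),
	// writes each batch with a single table.put(List) call, and records per-batch latency and a
	// running insert count for the reporting loop in loadTable().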
	public static void load(Configuration conf, String tableName, long numRows,
			long numCF, long numC, long valueSize, int threadNum,
			int numThreads, int batchSize, long startKey, long endKey)
			throws IOException {

		if (preSplit <= 1)
			System.out.println("Starting load thread " + threadNum);
		else
			System.out.println("Starting load thread " + threadNum
					+ " start key : " + startKey + "; end key :" + endKey);
		String family;
		String column;
		Put p = null;
		long counter = 0;
		HTable table = null;
		Random rand = new Random(uniqueSeed);
		incrementRandom(rand, (int) startRow);
		incrementRandom(rand, threadNum);
		long endRow = startRow + numRows;

		try {
			table = new HTable(createHBaseConfiguration(), tableName.getBytes());
			table.setAutoFlush(autoFlush);
			for (int i = threadNum + (int) startRow; i < endRow; i += numThreads) {
				byte[][] rowKeys = new byte[batchSize][];
				byte[][] families = new byte[batchSize][];
				byte[][] columns = new byte[batchSize][];
				byte[][] values = new byte[batchSize][];

				for (int batch = 0; batch < batchSize; batch++) {
					// Key: use a random size from keySizes when keySize < 1, otherwise a fixed-size buffer
					byte[] rowKey;
					if (keySize < 1) {
						int randSize = keySizes[rand.nextInt(Integer.MAX_VALUE) % 10];
						incrementRandom(rand, numThreads - 1);
						rowKey = new byte[randSize + 1];
					} else {
						rowKey = new byte[(int) keySize];
					}
					if (preSplit <= 1) {
						StringBuilder keyBuilder = new StringBuilder();
						keyBuilder.append(i);
						keyBuilder.append(batch);
						createKey(rowKey, Long.valueOf(keyBuilder.toString())
								^ uniqueSeed);
						rowKeys[batch] = rowKey;
					} else {
						// Generate random long key
						rowKey = createKeyForRegion(rowKey, startKey, endKey);
						rowKeys[batch] = rowKey;
					}
					// Value
					byte[] value = new byte[(int) valueSize];
					fillBuffer(valueSize, value, batch);
					values[batch] = value;
					// CF + C
					family = "f" + (numCF - 1);
					families[batch] = family.getBytes();
					column = "c" + (numC - 1);
					columns[batch] = column.getBytes();
				}

				List<Put> puts = new ArrayList<Put>();
				long startTime = System.nanoTime();
				for (int batch = 0; batch < batchSize; batch++) {
					p = new Put(rowKeys[batch]);
					p.add(families[batch], columns[batch], values[batch]);
					puts.add(p);
				}
				try {
					table.put(puts);
					if (flush) {
						table.flushCommits();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
				long endTime = System.nanoTime();
				latency[threadNum] = (endTime - startTime);
				counter += batchSize;
				count[threadNum] = counter;
			}
		} finally {
			if (table != null)
				table.close();
		}
	}

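	// Advance the random sequence by num draws so each thread starts at a different position.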
	public static void incrementRandom(Random rand, int num) {
		for (int i = 0; i < num; i++) {
			rand.nextInt();
		}
	}

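	// Fill the key buffer with pseudo-random bytes derived from the seed (the CRC32 is computed
	// but its value is not used).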
	public static void createKey(byte[] buffer, long seed) {
		Random rand = new Random(seed);
		CRC32 chksum = new CRC32();
		rand.nextBytes(buffer);
		chksum.update(buffer);
		return;
	}

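	// Generate a random key within [startKey, endKey) so the row falls in this thread's pre-split region.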
	public static byte[] createKeyForRegion(byte[] buffer, long startKey,
			long endKey) {

		long key = new LongRandom().nextLong(endKey - startKey);
		buffer = Bytes.toBytes(startKey + key);
		return buffer;
	}

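	// Fill the value buffer with pseudo-random bytes without checksumming (not called in this listing).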
	public static void fillBufferNoCRC(long valueSize, byte[] buffer, int seed) {
		long newSeed = seed + System.currentTimeMillis();
		Random rand = new Random(newSeed);
		rand.nextBytes(buffer);
		return;
	}

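	// Fill the value buffer with pseudo-random bytes and return a CRC32 of the contents.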
	public static long fillBuffer(long valueSize, byte[] buffer, int seed) {
		long newSeed = seed + System.currentTimeMillis();
		Random rand = new Random(newSeed);
		CRC32 chksum = new CRC32();
		rand.nextBytes(buffer);
		chksum.update(buffer);
		return chksum.getValue();
	}

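	// Build a client Configuration from the hard-coded hbase-site.xml path and the ZooKeeper quorum.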
	public static Configuration createHBaseConfiguration() {
		Configuration conf = HBaseConfiguration.create();
		conf.addResource(new Path(HBASE_RESOURCE_NAME));
		// Read ZOOKEEPER_NODES directly so a -zookeeperNodes override takes effect
		conf.set(ZOOKEEPER_SETTINGS.getFirst(), ZOOKEEPER_NODES);
		return conf;
	}

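	// Thread body: delegates to load(), passing a per-thread key range when the table is pre-split.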
	public class LoadTableRunnable implements Runnable {
		private Configuration conf;
		private String tableName;
		private long numRows, numCF, numC, valueSize;
		private int numThreads, threadNum;
		private int batchSize;
		private long startKey, endKey = -1;

		LoadTableRunnable(Configuration conf, String tableName, long numRows,
				long numCF, long numC, long valueSize, int threadNum,
				int numThreads, int batchSize) {
			this.conf = conf;
			this.tableName = tableName;
			this.numRows = numRows;
			this.numCF = numCF;
			this.numC = numC;
			this.valueSize = valueSize;
			this.threadNum = threadNum;
			this.numThreads = numThreads;
			this.batchSize = batchSize;
		}

		LoadTableRunnable(Configuration conf, String tableName, long numRows,
				long numCF, long numC, long valueSize, int threadNum,
				int numThreads, int batchSize, long startKey, long endKey) {
			this.conf = conf;
			this.tableName = tableName;
			this.numRows = numRows;
			this.numCF = numCF;
			this.numC = numC;
			this.valueSize = valueSize;
			this.threadNum = threadNum;
			this.numThreads = numThreads;
			this.batchSize = batchSize;
			this.startKey = startKey;
			this.endKey = endKey;
		}

		public void run() {
			try {
				if (endKey == -1) {
					LoadTableMTBatch.load(conf, tableName, numRows, numCF,
							numC, valueSize, threadNum, numThreads, batchSize,
							0, 0);
				} else {
					LoadTableMTBatch.load(conf, tableName, numRows, numCF,
							numC, valueSize, threadNum, numThreads, batchSize,
							startKey, endKey);
				}
			} catch (IOException e) {
				e.printStackTrace();
				System.exit(-1);
			}
		}

	}

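	// Random subclass that can draw a bounded long, used to spread keys across pre-split regions.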
	static class LongRandom extends Random {

		private static final long serialVersionUID = 1L;

		/**
		 * Generates a pseudo-random long in the range 0 <= value < n.
		 *
		 * @param n the exclusive upper bound (must be positive)
		 * @return a random long in [0, n)
		 */
		public long nextLong(long n) {
			if (n <= 0L)
				throw new IllegalArgumentException();

			// for small n use nextInt and cast
			if (n <= Integer.MAX_VALUE) {
				return (long) nextInt((int) n);
			}

			// for large n use nextInt for both high and low ints
			int highLimit = (int) (n >> 32);
			long high = (long) nextInt(highLimit) << 32;
			long low = ((long) nextInt()) & 0xffffffffL;
			return (high | low);
		}
	}

}
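
To try the tool, compile it against the HBase client libraries on the cluster and run it with the table path plus any options you want to override. The commands below are only an example: they assume the hbase launcher script is on the PATH (so that "hbase classpath" prints the client classpath) and use a hypothetical MapR table path.

javac -cp `hbase classpath` LoadTableMTBatch.java
java -cp `hbase classpath`:. LoadTableMTBatch -path /user/mapr/loadtest -rows 100000 -valuesize 100 -threads 10 -batchSize 500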