Hive Transaction Feature in Hive 1.0

The New Hive Transaction Feature Adds ACID Semantics at the Row-Level

This article describes the new Hive transaction feature introduced in Hive 1.0. This new feature adds initial support of the 4 traits of database transactions – atomicity, consistency, isolation and durability at the row level. With this new feature, you can add new rows in Hive while another application reads rows from the same partition without interference.

Please note that this feature is not intended to cover all the transactional capabilities that are typically found in a relational database management system (RDBMS). For instance, it does not include BEGIN, COMMIT, and ROLLBACK statements which are typically required in RDBMS environments. That said, the primary use cases for this feature include:

Streaming ingest of data. Without transactional support, you can potentially read an inconsistent view of your Hive tables if updates are being made concurrently with ingest tasks. This new feature ensures the ingest stream is isolated from your reads.

It’s worth noting that in MapR, an easier way to handle this use case is to take a snapshot and run your read queries on that snapshot. Since MapR snapshots are guaranteed to be consistent, your read queries on a snapshot will see a completely static view of your Hive tables.

Slow changing dimensions. When your Hive tables need the occasional insert or update of records, such as in a dimension table, this new features lets you make those incremental changes without having to rewrite the entire partition.

Data restatement. When data in your Hive tables need to be changed, whether due to a correction of data, a restatement of existing transaction data, or a deletion due to the end of the data lifecycle, this new feature lets you make the updates without an entire partition rewrite.

How to Enable Hive transactions

The Hive metastore service is required to use this feature, because it needs to spawn a set of compaction related threads. The minimum required configurations are:

For Client or HiveServer2:

- If Hive CLI is used, set the below configuration in /opt/mapr/hive/hive-1.0/hive-site.xml on the client nodes.
- If HiveServer2 is used to accept connections, set the below configuration in /opt/mapr/hive/hive-1.0/hive-site.xml before starting HiveServer2.
- If you do not want this feature to be enabled for all queries, set the below properties at the session level.

<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>

<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>

<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>

<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>

For Hive Metastore:

<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>

<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>

After updating the configuration in the corresponding hive-site.xml, restart the services – HiveServer2 and Hive Metastore.

More transaction related configurations are here.

File Format Support

In this initial release, only ORC bucketed non-sorted tables are supported. You can run, for example, the following queries successfully:

create table h1_test2(id int, id2 string) 
clustered by (id) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true'); 
insert into table h1_test2 values(1,'abc');
delete from h1_test2 where id=1;

Attempts to use this feature with other file formats will throw errors as follows:

Non-ORC table will fail with below error:

create table h1_test_normal(id int, id2 string) 
TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test_normal values(1,'abc');
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table h1_test_normal that does not use an AcidOutputFormat or is not bucketed

ORC non-bucketed table will fail with below error:

create table h1_test(id int, id2 string) 
stored as orc TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test values(1,'abc');
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table h1_test that does not use an AcidOutputFormat or is not bucketed

ORC bucketed sorted table will fail with below error:

create table h1_test4(id int, id2 string) 
clustered by (id) sorted by (id2) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test4 values(1,'abc');
FAILED: SemanticException [Error 10298]: ACID insert, update, delete not supported on tables that are sorted, table h1_test4

Minor and Major Compactions

To support transactions, each transaction of Insert/Update/Delete(aka DML) is stored in delta files. Occasionally these changes need to be merged into the base files. Both base and delta directory names contain the transaction IDs, just like SCN in Oracle.

drwxr--r--  2 mapr mapr  8 May 30 01:31 base_0000028
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000029_0000029
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000030_0000030
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000031_0000031
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000032_0000032
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000033_0000033
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000034_0000034
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000035_0000035
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000036_0000036

The compactions are done by a set of threads of Hive Metastore hive.compactor.worker.threads(default is 0 but value required for transactions: > 0 on at least one instance of the Thrift metastore service) controls the number of compactor worker threads that will be spawned by the Hive Metastore service. Besides worker threads, there is one initiator thread and one cleaner thread.
This can be seen by "jstack <pid of metastore>", here hive.compactor.worker.threads is set to 2.

$ jstack 18220  |grep -i compact
at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.run(Cleaner.java:141)
at org.apache.hadoop.hive.ql.txn.compactor.Worker.run(Worker.java:83)
at org.apache.hadoop.hive.ql.txn.compactor.Worker.run(Worker.java:83)
at org.apache.hadoop.hive.ql.txn.compactor.Initiator.run(Initiator.java:137) 

If compactions are in progress, we can use the "show compactions" command to check. For example, below is one major compaction:

hive> show compactions;
OK
Database Table Partition Type State Worker Start Time
default h1_test2 NULL MAJOR working h3.poc.com-25 1432949449000
And also in hive.log of metastore node, below lines show up:
2015-05-30 01:20:49,158 INFO  [Thread-8]: compactor.Initiator
(Initiator.java:requestCompaction(282)) - Requesting MAJOR compaction for default.h1_test2
2015-05-30 01:20:49,512 INFO  [h3.poc.com-29]: compactor.Worker (Worker.java:run(139)) - Starting MAJOR compaction for default.h1_test2

Minor compactions take a set of existing delta files and rewrites them to a single delta file per bucket
By default, if the number of delta directories in a table or partition is larger than 10 (hive.compactor.delta.num.threshold), minor compaction will be triggered. So when the 11th delta directory is created:

drwxr--r--   - 8 2015-06-01 21:08 /user/hive/warehouse/h1_test2/base_0000059
drwxr--r--   - 8 2015-06-01 21:16 /user/hive/warehouse/h1_test2/delta_0000060_0000070
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000071_0000071
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000072_0000072
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000073_0000073
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000074_0000074
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000075_0000075
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000076_0000076
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000077_0000077
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000078_0000078
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000079_0000079
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000080_0000080

After minor compaction:

drwxr--r--   - 8 2015-06-01 21:08 /user/hive/warehouse/h1_test2/base_0000059
drwxr--r--   - 8 2015-06-01 21:21 /user/hive/warehouse/h1_test2/delta_0000060_0000080

So the base directory is still at transaction ID 59, but the aggregated delta directory contains data from transaction ID 60 to 80.

Major compaction takes one or more delta files and the base file for the bucket and rewrites them into a new base file per bucket
By default, if total size of delta data reaches 10%(hive.compactor.delta.pct.threshold) of base data, major compaction will be triggered. Before major compaction:

drwxr--r--   - 3 2015-06-02 00:26 /user/hive/warehouse/h1_test2/base_0000082
drwxr-xr-x   - 1 2015-06-02 01:43 /user/hive/warehouse/h1_test2/delta_0000083_0000083
drwxr-xr-x   - 1 2015-06-02 01:43 /user/hive/warehouse/h1_test2/delta_0000084_0000084
After major compaction:
drwxr--r--   - 2 2015-06-02 01:45 /user/hive/warehouse/h1_test2/base_0000084

So the base directory name will contain the latest transaction ID -- 84.

Compactions can be initiated manually

ALTER TABLE table_name [PARTITION (partition_key = 'partition_value' [, ...])]  COMPACT 'compaction_type';

If you do not want the automatic compaction to affect the system performance in peak time, here is an option to set "NO_AUTO_COMPACTION" table property, eg:

hive> create table test_nocompact(id int) TBLPROPERTIES("NO_AUTO_COMPACTION"="true");
hive> alter table test_nocompact set TBLPROPERTIES("NO_AUTO_COMPACTION"="true");

With automatic compactions disabled compactions can be triggered manually in quiet time window. Note, this command just triggers the compaction, and it does not wait for its completion. Sometimes the compaction is enqueued, and you can always use "show compactions" to check the status.
If auto compactions are not happening or not enabled, the tables could have too many small files so that the performance will be hugely impacted. It will be important to regularly run manual compaction if auto compaction is disabled.

After a compaction the system waits until all readers of the old files have finished and then removes the old files
Take above 3.2 minor compaction for example. It firstly generates "delta_0000060_0000080", and waits for all old queries to finish, then it removes old delta directories. If cleaning starts, the below log message shows up in the Hive metastore log (Default location is /opt/mapr/hive/hive-1.0/logs/mapr/hive.log):

[Thread-12]: compactor.Cleaner (Cleaner.java:clean(178)) - Starting cleaning for default.h1_test2
Assume before compaction starts, there is a huge query running for days, then the old delta directories can not be cleaned until the query finishes or fails.

Cleaner thread is a separate thread to clean directories after compaction
By default, every 5 seconds(hive.compactor.cleaner.run.interval) the cleaner thread will check if any directories need to be cleaned. The logic of cleaner thread is well documented in

org.apache.hadoop.hive.ql.txn.compactor.Cleaner.java:
// First look for all the compactions that are waiting to be cleaned.  If we have not
// seen an entry before, look for all the locks held on that table or partition and
// record them.  We will then only clean the partition once all of those locks have been
// released.  This way we avoid removing the files while they are in use,
// while at the same time avoiding starving the cleaner as new readers come along.
// This works because we know that any reader who comes along after the worker thread has
// done the compaction will read the more up to date version of the data (either in a
// newer delta or in a newer base).

If you find the cleaner thread is not cleaning some really old delta directories, please run "show locks" to see if any query is holding the lock on that table for a long time.

References:
Hive Transactions
LanguageManual DML

no

Streaming Data Architecture:

New Designs Using Apache Kafka and MapR Streams

 

 

 

Download for free