HBase migration across major versions of HBase (0.2x -> 0.9x)

September 30, 2011


I inherited a legacy version of the hadoop stack (0.20.2) running hbase v. 0.20.3 on top.
As this is quite an old and unsupported version, we obviously wanted to upgrade.
The main problem with this is that this system is already in use in production, thus increasing the upgrade risks. In addition, the instance installed was compiled and patched manually, so an in-place upgrade was out of the question.

For the new cluster, installed side by side the old one, I chose the “Cloudera Distribution including Apache Hadoop (CDH3)” which is very easy to install and set up via aptitude, and prepares all the standardized stuff like config file locations and init scripts.
This should also hopefully enable relatively painless updates in the future.

Sematext already has a nice blog post on the subject of various HBase backup options, so I won’t go over them in too much detail.

All I will say is that the built-in CopyTable MR job is out of the question since it only works with two clusters of the same major version of HBase. Otherwise life would be much easier…

What I’ve done is emulate the functionality of CopyTable without actually having to backport the job to fit the old codebase (seems others have done this, but the source was nowhere to be found, what more, the whole idea is ‘iffy’).

The script works like so:

  • Export from HBase to HDFS with MR job
  • Pull with distcp MR job via the HFTP protocol to new HDFS cluster (Note: see below)
  • Run import MR job on new cluster

There are three use cases, which of course correlate with the original CopyTable functionality:

  1. Full table copy
  2. Partial table from specified time stamp forward
  3. Partial table between two specified time stamps

The idea here is that due to the time it takes to export the table, new data might be written (the whole process can take a while, especially with large tables, and as we mentioned the original cluster is still in production use, so a minimal data discrepancy is preferred till cut off time).

Thus we can either transfer the table in chunks, taking advantage of the built in time stamp of every row in HBase, and eventually transfer a smaller and smaller selection of the table till the switch is made.

The script itself still lacks some robustness, and includes some admittedly ugly parts (wrapping shell commands with os.popen) but it does the job and of course feedback and improvements are welcome.

Note regarding the use of the HFTP protocol when using the distcp MR job – as per the official guide for the job, the appendix states the following under the title “Copying between versions of HDFS“:

For copying between two different versions of Hadoop, one will usually use HftpFileSystem. This is a read-only FileSystem, so DistCp must be run on the destination cluster (more specifically, on TaskTrackers that can write to the destination cluster). Each source is specified as hftp:/// (the default dfs.http.address is :50070).

Theoretically, it could also be modified to be used as a part of a backup mechanism, but that’s up to you.

I will be uploading it to github as well for the sake of mankind.

Have fun! Your input is valued as always!

Code (of course you’ll have to change the config settings to suit your environment):

#!/usr/bin/env python

# This script copies tables (either whole or partially) across HBase clusters
# It is to be run from the destination cluster
# Requires thrift servers to be running on both clusters
# Requires full network access from destination to origin (not only thrift port)
# This is for running the MR jobs remotely

# config
source_hbase_server = 'source_thrift_server'
source_thrift_port = 9090
source_hftp_server = 'source_hdfs_datanode'
source_hftp_port = 50070
source_hadoop_bin = '/usr/local/hadoop/bin/hadoop'
source_hbase_jar = '/usr/local/hbase/hbase-0.20.3.jar'
source_export_path = '/export' # path in hdfs
source_ssh_port = 22
source_ssh_user = 'hadoop'
dest_hbase_server = ''
dest_thrift_port = 9090
dest_hadoop_bin = '/usr/bin/hadoop'
dest_hbase_jar = '/usr/lib/hbase/hbase-0.90.3-cdh3u1.jar'
dest_import_path = '/import' # path in hdfs
no_of_versions = 3

logdir = '/var/log/hbase_migrate'
logfile = 'hbase_migrate.log'
loglevel = 1

import sys,os
from itertools import imap
# hbase and thrift are available from easy_install
from hbase import Hbase
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase.ttypes import *
from optparse import OptionParser

# define logger
import logging
# get file location from config
logfilename = "%s/%s" % (logdir, logfile)
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
logger = logging.getLogger("hbase_migrate")
# check config file for reporting level
if loglevel == 1:

# connect to servers
source_transport = TSocket.TSocket(source_hbase_server, source_thrift_port)
source_transport = TTransport.TBufferedTransport(source_transport)
source_protocol = TBinaryProtocol.TBinaryProtocol(source_transport)
source_client = Hbase.Client(source_protocol)

dest_transport = TSocket.TSocket(dest_hbase_server, dest_thrift_port)
dest_transport = TTransport.TBufferedTransport(dest_transport)
dest_protocol = TBinaryProtocol.TBinaryProtocol(dest_transport)
dest_client = Hbase.Client(dest_protocol)

def parse_args():
    usage = "%prog table_name [ begin_timestamp [[ end_timestamp ]] ]\n\
              If 'all' is specified as <table_name>, all tables will be handled. \n\
              If only one timestamp is provided, it will be used as the start point \n\
              and all records from this point in time and forward will be copied."

    parser = OptionParser(usage=usage)

    options, args = parser.parse_args()
    logger.debug('arguments passed: %s' % args)

    if len(args) == 1:
        # only the table name was passed
        logger.debug('table: %s passed' % args[0])
            type(args[0]) == str
            logger.error('bad argument passed')
            parser.error('timestamp must be an integer')
    elif len(args) == 2:
        # this means only the begin timestamp was passed
        # so we process all the records from then to infinity
        # first though check if the timestamp is really something semblant of a timestamp
            type(args[0]) == str
            logger.error('bad argument passed')
            parser.error('timestamp must be an integer')
    elif len(args) == 3:
        # both timestamps were passed, we should handle all the records in between
            type(args[0]) == str
            logger.error('bad argument passed')
            parser.error('timestamps must be integers')
        parser.error("Must supply table name, and optional timestamps. Use --help for usage instructions.")

    return options, args

def check_table_exists(table_name):
    """Verify existence of table and return error if doesn't exist."""
        table_list = source_client.getTableNames()
        logger.error('failed to get list of tables')
    if table_name in table_list:
        logger.debug('%s found in table_list' % table_name)
        return True
        logger.error('%s not found in table_list' % table_name)
        return False

def get_tables():
    """'all' was passed, get the list of tables from the origin server."""
        table_list = source_client.getTableNames()
        return table_list
        logger.error('failed to get list of tables')

def copy_table(table_name, begin_timestamp="", end_timestamp=""):
    """export the table to hdfs, use hftp to hdfs copy to destination server
       and then import to hbase"""

    logger.debug('was passed table: %s will try to copy to %s' % (table_name, dest_hbase_server))

    # export to hfds on source
        # clean export dir before actually exporting
        logger.debug('deleting hdfs dir on source: %s/%s' % (source_export_path, table_name))
        delete_exportdir_command = 'ssh -p %s %s@%s "%s dfs -rmr %s/%s"' % (
        source_ssh_port, source_ssh_user, source_hbase_server, source_hadoop_bin, source_export_path, table_name)
        logger.debug('problem deleting %s/%s on source hdfs server' % (source_export_path, table_name))
        # we'll pass empty strings instead of timestamps if not specified because they are optional
        export_command = 'ssh -p %s %s@%s "%s jar %s export %s %s/%s %s %s %s"' % (
        source_ssh_port, source_ssh_user, source_hbase_server, source_hadoop_bin,
        source_hbase_jar, table_name, source_export_path, table_name, no_of_versions, begin_timestamp, end_timestamp)
        logger.debug('attempting to run: %s' % export_command)
        logger.error('problem exporting table: "%s" on source server to hdfs - aborting' % table_name)
    # perform distcp hftp to hdfs pull
        # please remember this script should be run on the destination server
        distcp_command = 'su - hdfs -c \'%s distcp -overwrite -p "hftp://%s:%s%s/%s" "hdfs://localhost%s/%s"\'' % (
        dest_hadoop_bin, source_hftp_server, source_hftp_port, source_export_path, table_name,
        dest_import_path, table_name)
        logger.debug('attempting to run: %s' % distcp_command)
        logger.error('problem performing distcp for table: "%s" - aborting' % table_name)

        # get column families and definitions and create on dest server
        # must be created before attempting to import
        desc = source_client.getColumnDescriptors(table_name)
        column_families = []
        for name,cf in desc.items():
            # parse origin schema and make nice for create
            cf.name = '%s:' % name
            # little patch due to inconsistencies in API
            if cf.bloomFilterType == 'false':
                cf.bloomFilterType = 'NONE'
        dest_client.createTable(table_name, column_families)
        logger.debug('created table %s' % table_name)

    except AlreadyExists, tx:
        logger.debug ('table %s exists already... %s' % (table_name, tx.message))
        # run the local import
        import_command = 'su - hbase  -c "%s jar %s import %s %s/%s"' % (
        dest_hadoop_bin, dest_hbase_jar, table_name, dest_import_path, table_name)
        logger.debug('attempting to run: %s' % import_command)
        logger.error('problem importing table: "%s" - aborting' % table_name)

def main():

        print("connection to %s with port %s failed" % (source_hbase_server,source_thrift_port))

        print("connection to %s with port %s failed" % (dest_hbase_server,dest_thrift_port))
    options, args = parse_args()
    table_name = args[0]
    if table_name == 'all':
        # handle all the tables
        table_list = get_tables()
        # here we differentiate between the cases (if timestamps were provied or not at runtime)
        if len(args) == 3:
            begin_timestamp = args[1]
            end_timestamp = args[2]
            for table in table_list:
                copy_table(table, begin_timestamp, end_timestamp)
        elif len(args) == 2:
            begin_timestamp = args[1]
            for table in table_list:
                copy_table(table, begin_timestamp)
            for table in table_list:

    elif check_table_exists(table_name):
        # handle a single table copy
        if len(args) == 3:
            begin_timestamp = args[1]
            end_timestamp = args[2]
            copy_table(table_name, begin_timestamp, end_timestamp)
        elif len(args) == 2:
            begin_timestamp = args[1]
            copy_table(table_name, begin_timestamp)
        # should probably not reach this point...
        logger.error('something went wrong with main operation')

if __name__ == "__main__":

