Tuesday, January 15, 2013

HBase 0.95 Source Code Analysis


As the HBase homepage describes it: Apache HBase is an open-source, distributed, versioned, column-oriented store modeled after Google's Bigtable ("Bigtable: A Distributed Storage System for Structured Data" by Chang et al.). Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

HBase 0.95 is the most recent version. In this post, we will walk through the source code from several angles: 1) how clients connect to HBase; 2) how the servers inside HBase connect to each other; 3) how data is written to and read from HBase; 4) finally, the important APIs and their implementations. To keep this article useful for readers who are not currently reading the source, I will try not to list too much code; instead, I will describe how things work and why. I hope this post is helpful for anyone interested in HBase.


SECTION 1. How Clients Connect to HBase

When programmers use the Java client, connecting to HBase is simple. Below is an example:

 HTable htable = ...   // instantiate HTable
 Get get = new Get(Bytes.toBytes("row1"));
 Result r = htable.get(get);
 Put put = new Put(Bytes.toBytes(row));
 put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes(data));
 htable.put(put);
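The first line above is elided; for completeness, a minimal way to obtain the HTable looks like this (the table name "mytable" is a placeholder for this sketch, not something from the example above):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;

 // HBaseConfiguration.create() loads hbase-site.xml from the classpath.
 Configuration conf = HBaseConfiguration.create();
 HTable htable = new HTable(conf, "mytable");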
In the HBase source, the first step is instantiating an HTable instance. To do so you must provide at least the table name and a Configuration instance, which leads to this constructor:
 public HTable(Configuration conf, final byte [] tableName)
One of its most important tasks is to get an "HConnection" instance and to create a ThreadPoolExecutor. All "HConnection"s are managed by HConnectionManager. An inner class of HConnectionManager named HConnectionKey captures the connection-relevant parameters from the configuration (together with the current user), and according to this HConnectionKey, HConnectionManager chooses which HConnectionImplementation to hand back. This strategy means HBase can reuse a connection when the HTables come from the same user with the same configuration. So the real connection from a user's client to the HBase servers is implemented by HConnectionManager.HConnectionImplementation. Its constructor sets the private fields such as conf, managed, pause, numRetries, maxRPCAttempts, rpcTimeout, and clusterId. Besides these settings, two class objects are stored for future use, adminClass and clientClass, created through Java reflection like this:
 String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
     DEFAULT_ADMIN_PROTOCOL_CLASS);
 this.adminClass =
     (Class<? extends AdminProtocol>) Class.forName(adminClassName);
 String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
     DEFAULT_CLIENT_PROTOCOL_CLASS);
 this.clientClass =
     (Class<? extends ClientProtocol>) Class.forName(clientClassName);
When an application actually uses an HConnection, HConnectionImplementation is in charge of returning an RPC protocol instance that matches the request (admin or client); this is why HConnectionImplementation has a "getProtocol" method. The RPC implementation is the core concept for understanding how clients connect to HBase, so the next section describes it in detail.
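To make the reuse strategy concrete, here is a minimal sketch of the idea. ConnectionCacheSketch and its key computation are my own illustration, not HBase's actual code; the real HConnectionKey also hashes the current user and a longer list of configuration properties:

 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;

 public class ConnectionCacheSketch {
   // One shared cache, as in HConnectionManager; the String key stands in
   // for HConnectionKey.
   private static final Map<String, Object> CACHE = new HashMap<String, Object>();

   public static synchronized Object getConnection(Configuration conf) {
     // Key on the connection-relevant properties, e.g. the ZooKeeper quorum:
     // two "equal" configurations map to the same key and share a connection.
     String key = conf.get("hbase.zookeeper.quorum") + ":"
         + conf.get("hbase.zookeeper.property.clientPort");
     Object conn = CACHE.get(key);
     if (conn == null) {
       conn = new Object(); // real code: new HConnectionImplementation(conf, ...)
       CACHE.put(key, conn);
     }
     return conn;
   }
 }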

1.1 RPC and Protocol in HBase

RPC in HBase works like RPC in Hadoop: neither uses Java's built-in RPC mechanism (RMI); instead, both implement their own RPC machinery in a similar way. The difference is that HBase also plugs in Protobuf as the serialization component.

All the RPC-related source files are located in the ./org/apache/hadoop/hbase/ipc folder, and all the protocol-related source files, including the .proto files and the generated protobuf classes, are located in the hbase-protocol module.

Before discussing how HBase implements RPC and combines it with Protobuf, we first introduce Java's built-in dynamic proxy mechanism. It works like the classic Proxy design pattern, except that the Proxy and InvocationHandler classes make the pattern automatic in Java.
Fig.1 Java Dynamic Proxy

As Fig.1 shows, the proxy object is generated by calling the Proxy.newProxyInstance method instead of being written by hand, and every method call is automatically forwarded to the real object through the InvocationHandler's invoke method.
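A minimal self-contained example of the mechanism (Greeter and ProxyDemo are made-up names for illustration, not HBase classes):

 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;

 public class ProxyDemo {
   public interface Greeter { String greet(String name); }

   public static void main(String[] args) {
     final Greeter real = new Greeter() {
       public String greet(String name) { return "Hello, " + name; }
     };
     // The proxy class is generated at runtime; we never write it by hand.
     Greeter proxy = (Greeter) Proxy.newProxyInstance(
         Greeter.class.getClassLoader(),
         new Class<?>[] { Greeter.class },
         new InvocationHandler() {
           public Object invoke(Object p, Method m, Object[] a) throws Throwable {
             // An RPC engine would serialize m and a here and send them over
             // the wire instead of calling the real object directly.
             System.out.println("proxying " + m.getName());
             return m.invoke(real, a);
           }
         });
     System.out.println(proxy.greet("HBase")); // prints "Hello, HBase"
   }
 }

HBase's client-side RPC does exactly this, except its InvocationHandler serializes the call with Protobuf and ships it to the server, as we will see next.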

In HBase, the RPC-related classes map onto this pattern as Fig.2 shows:
Fig.2 HBase RPC protocol dynamic proxy mechanism

HBase uses many concurrency techniques to speed up client reads and writes. To connect to a RegionServer, the client needs a "ClientProtocol" instance, which is produced by calling HConnectionImplementation's getClient method. Starting from here, a chain of calls ties the RPC machinery and Protobuf together:

HConnectionImplementation.getClient -> getProtocol -> HBaseClientRPC.waitForProxy -> HBaseClientRPC.getProxy -> RpcClientEngine.getProxy -> ProtobufRpcClientEngine.getProxy

Here, the ProtobufRpcClientEngine.getProxy() method generates a proxy around the core ClientProtocol instance returned by HConnectionImplementation.getClient(). The "invoke" method in ProtobufRpcClientEngine's static "Invoker" class does the client-side RPC work:
 public Object invoke(Object proxy, Method method, Object[] args)
     throws ServiceException {
   long startTime = 0;
   if (LOG.isDebugEnabled()) {
     startTime = System.currentTimeMillis();
   }
   // Serialize the method name and its protobuf argument into an RpcRequestBody.
   RpcRequestBody rpcRequest = constructRpcRequest(method, args);
   Message val = null;
   try {
     // Ship the request to the server and block for the response Message.
     val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
     if (LOG.isDebugEnabled()) {
       long callTime = System.currentTimeMillis() - startTime;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Call: " + method.getName() + " " + callTime);
       }
     }
     return val;
   } catch (Throwable e) {
     // Unwrap server-side exceptions so callers see the original cause.
     if (e instanceof RemoteException) {
       Throwable cause = ((RemoteException) e).unwrapRemoteException();
       throw new ServiceException(cause);
     }
     throw new ServiceException(e);
   }
 }
It constructs the RPC request and calls client.call() to do the real communication. Here, HBase uses Google's protobuf to construct the request, an RpcRequestBody; the "HBaseClient" class then handles all the RPC requests and returns a Message object as the result.

At this point all the client-side code has run: the method call on the ClientProtocol is collected, serialized by Protobuf, and sent to the RegionServer. SECTION 1 finished.



SECTION 2. How Servers inside HBase Connect to Each other

There are two types of server roles in HBase: HMaster and HRegionServer. The HMaster is elected and runs from then on; an HRegionServer runs on each server. The entry point of HMaster is in HMaster.java, which contains the static main method. All the main functions in HBase delegate to an abstract ServerCommandLine class and its doMain method. Inside doMain, HBase calls ToolRunner.run(conf, this, args) to do the real JVM startup work. The ToolRunner class is defined in Hadoop Core, so we won't describe it here; the only thing we need to know is that once ToolRunner.run begins, it calls "this.run". In our HBase case that is HMasterCommandLine.run, which finally calls the "startMaster" method.

After parsing the arguments, control returns to HBase: 1) call HMaster.constructMaster() to build an HMaster instance; 2) call this instance's start method; 3) call its join method; 4) check for errors. Sketched below.
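A paraphrase of the cluster-mode path, with exception handling trimmed; the method names match what I see in the source but should be treated as approximate (isAborted comes from the Abortable interface):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.HMaster;

 public class StartMasterSketch {
   static void startMaster(Class<? extends HMaster> masterClass, Configuration conf)
       throws InterruptedException {
     HMaster master = HMaster.constructMaster(masterClass, conf); // 1) reflection
     master.start();                                              // 2) run the master thread
     master.join();                                               // 3) block until it exits
     if (master.isAborted())                                      // 4) surface errors
       throw new RuntimeException("HMaster Aborted");
   }
 }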

In constructMaster(), HBase uses reflection to build the HMaster instance. In fact, I am still curious why a new instance is constructed this way: is it faster, or more flexible?
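My guess is flexibility rather than speed: constructMaster receives a Class<? extends HMaster>, so the same code path can instantiate any subclass (the command line passes a different one in for local mode). Paraphrased, it boils down to:

 import java.lang.reflect.Constructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.HMaster;

 public class ConstructMasterSketch {
   static HMaster constructMaster(Class<? extends HMaster> masterClass, Configuration conf) {
     try {
       // Look up the (Configuration) constructor of whichever subclass we
       // were handed and invoke it reflectively.
       Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
       return c.newInstance(conf);
     } catch (Exception e) {
       throw new RuntimeException("Failed construction of Master: " + masterClass, e);
     }
   }
 }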

Since HMaster is a subclass of the abstract class HasThread, it is automatically Runnable: the start method means the HMaster thread begins to work, and join means the JVM will not exit until HMaster does. The way HMaster starts is interesting: all the servers start an HMaster instance when the user boots up a cluster, and these instances race with each other to write something on ZooKeeper. As ZooKeeper guarantees, there can be only one successful HMaster. All the others block and wait until the winner fails, at which point they race again; the winner initializes itself and enters an infinite loop to provide service.
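The race itself is easy to picture with the plain ZooKeeper API. The sketch below is my own illustration of the idea, not HBase's actual ActiveMasterManager code, and the znode path "/hbase/master" is an assumption based on the default znode layout:

 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;

 public class MasterElectionSketch {
   /** One round of the race; returns true if this server became master. */
   static boolean tryBecomeMaster(ZooKeeper zk, byte[] myServerName)
       throws KeeperException, InterruptedException {
     try {
       // EPHEMERAL: the znode disappears if the winner dies, which wakes
       // the blocked losers so they can race again.
       zk.create("/hbase/master", myServerName,
           ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
       return true;  // we won: initialize and serve
     } catch (KeeperException.NodeExistsException e) {
       return false; // we lost: watch the znode and wait
     }
   }
 }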

From then on, HMaster is up, and all the RegionServers connect to it.

As described before, every server also runs an HRegionServer (HRS for short). HRS starts in exactly the same way as HMaster: its main function calls HRegionServerCommandLine.doMain(). In the run method of HRS, we first try to register with the HMaster, telling it we are here. After registering, HRS enters an infinite loop in which it takes care of region-close work and reports to the HMaster every msgInterval seconds; this heartbeat is implemented by the function tryRegionServerReport(lastMsg, now). At this point the HRS has started, and we can communicate inside HBase.
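The loop has roughly this shape. This is a paraphrase rather than the literal 0.95 source, and 3000 ms is, to my knowledge, the usual default for hbase.regionserver.msginterval:

 public class HeartbeatLoopSketch {
   private volatile boolean stopped = false;
   private final int msgInterval = 3000; // "hbase.regionserver.msginterval"

   void runLoop() throws InterruptedException {
     long lastMsg = 0;
     while (!stopped) {
       long now = System.currentTimeMillis();
       if (now - lastMsg >= msgInterval) {
         tryRegionServerReport(lastMsg, now); // the heartbeat RPC to HMaster
         lastMsg = System.currentTimeMillis();
       }
       Thread.sleep(100); // the real loop uses a Sleeper utility
     }
   }

   void stop() { stopped = true; }

   void tryRegionServerReport(long lastMsg, long now) {
     // In HBase this builds a protobuf report and calls the master's
     // RegionServerStatusProtocol; stubbed out in this sketch.
   }
 }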



2.1 RPC Server Start in HBase

We know there are several different RPC protocols in HBase, but only two kinds of servers, HMaster and HRegionServer, run in an HBase instance. So each server acts as the RPC server for a different set of protocols:

HMaster implements: MasterMonitorProtocol, MasterAdminProtocol, RegionServerStatusProtocol
HRS implements: ClientProtocol, AdminProtocol

Though the protocols they implement are quite different, the RPC server startup procedures are quite similar. For example, in HRS the RPC server is initialized in the constructor as follows:
 this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,  
     new Class<?>[]{ClientProtocol.class,  
       AdminProtocol.class, HBaseRPCErrorHandler.class,  
       OnlineRegions.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     conf.getInt("hbase.regionserver.handler.count", 10),  
     conf.getInt("hbase.regionserver.metahandler.count", 10),  
     conf.getBoolean("hbase.rpc.verbose", false),  
     conf, HConstants.QOS_THRESHOLD);  
In HMaster, the RPC server is initialized in exactly the same place, with similar code:
 this.rpcServer = HBaseServerRPC.getServer(MasterMonitorProtocol.class, this,  
     new Class<?>[]{MasterMonitorProtocol.class,  
       MasterAdminProtocol.class, RegionServerStatusProtocol.class},  
     initialIsa.getHostName(), // BindAddress is IP we got for this server.  
     initialIsa.getPort(),  
     numHandlers,  
     0, // we dont use high priority handlers in master  
     conf.getBoolean("hbase.rpc.verbose", false), conf,  
     0); // this is a DNC w/o high priority handlers  
So the startup of all these RPC servers is handled by the HBaseServerRPC class.

How does HBaseServerRPC start the RPC server? The procedure mirrors how HBaseClientRPC starts the client side, as the following figure shows.
Figure 3. How RPC Server starts in HBase

After these servers start, the communication inside the HBase cluster becomes clear:
Figure 4. How Servers communication inside HBase



... to be continued.




