How to use the parameter tableNameAttribute in streamsx.hbase toolkit

Ahmad Nouri edited this page May 2, 2019 · 3 revisions

This SPL sample application demonstrates:

How to use the parameter tableNameAttribute in all operators.

How to delete rows from an HBASE table.

How to put data into an HBASE table.

How to scan rows from an HBASE table.

The sample shows seven ways to scan data from an HBASE table: four that are driven by an input tuple and three that run without an input port.

Prerequisite software for this sample:

IBM Streams 4.3.0.0 or higher

https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.3.0/com.ibm.streams.welcome.doc/doc/kc-homepage.html

streamsx.hbase toolkit Version 3.6.0 or higher

https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.6.0

Apache HBase Version 1.2 or higher

https://hbase.apache.org/

SPL sample

/* Copyright (C) 2019, International Business Machines Corporation */
/* All Rights Reserved	*/
namespace application ;

use com.ibm.streamsx.hbase::HBASEDelete ;
use com.ibm.streamsx.hbase::HBASEPut ;
use com.ibm.streamsx.hbase::HBASEScan ;

/**  
 * This SPL application demonstrates:
 * How to use the parameter **tableNameAttribute** in all operators.
 * How to delete rows from an HBASE table.
 * How to put data into an HBASE table.
 * How to scan rows from an HBASE table.
 * It shows seven ways to scan data from an HBASE table.
 *
 *
 * In HBASE, a column name is divided into two parts, the columnFamily
 * and the columnQualifier. All columns in a columnFamily are stored
 * together on disk, which affects the efficiency of access.
 * The table has a fixed set of column families, and you cannot write
 * data to any other family. ColumnQualifiers, on the other hand, may be added
 * at runtime.
 * 
 * 
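 * To run this example, create a table named 'hbase-test-table' with the column families 'appearance' and 'location'.
 * In the hbase shell, the command is
 * `create 'hbase-test-table','appearance','location'`
 * 
 * After the application has run, scanning the table in the hbase shell with
 * `scan 'hbase-test-table'`
 * returns output like the following. Rows are listed in lexicographic key order, so row10 comes before row2.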
 ROW                                      COLUMN+CELL                                                                                                        
 row0   column=location:location0, timestamp=1555512212660, value=value-0                                                  
 row1   column=location:location1, timestamp=1555512213338, value=value-1                                                  
 row10  column=location:location10, timestamp=1555512217965, value=value-10                                                
 row11  column=location:location11, timestamp=1555512218481, value=value-11                                                
 row12  column=location:location12, timestamp=1555512218986, value=value-12                                                
 row13  column=location:location13, timestamp=1555512219493, value=value-13                                                
 row14  column=location:location14, timestamp=1555512219997, value=value-14                                                
 row15  column=location:location15, timestamp=1555512220506, value=value-15                                                
 row16  column=location:location16, timestamp=1555512221036, value=value-16                                                
 row17  column=location:location17, timestamp=1555512221575, value=value-17                                                
 row18  column=location:location18, timestamp=1555512222593, value=value-18                                                
 row19  column=location:location19, timestamp=1555512223097, value=value-19                                                
 row2   column=location:location2, timestamp=1555512213842, value=value-2                                                  
 row3   column=location:location3, timestamp=1555512214348, value=value-3                                                  
 row4   column=location:location4, timestamp=1555512214853, value=value-4                                                  
 row5   column=location:location5, timestamp=1555512215357, value=value-5                                                  
 row6   column=location:location6, timestamp=1555512215859, value=value-6                                                  
 row7   column=location:location7, timestamp=1555512216364, value=value-7                                                  
 row8   column=location:location8, timestamp=1555512216885, value=value-8                                                  
 row9   column=location:location9, timestamp=1555512217416, value=value-9                                                  
 20 row(s) in 1.5760 seconds
 
 * 
 * Copy the HBASE configuration file (hbase-site.xml) from your HBASE server into the etc directory of your SPL project,
 * and set the hbaseSite parameter of every HBASE operator to "etc/hbase-site.xml".
 * If you use Kerberos authentication, also add the authKeytab and authPrincipal parameters to every HBASE operator.
 */
 composite HbaseScan {
	param
		expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.service.keytab");
		expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "hbase/hdp21.fyre.ibm.com@HDP2.COM");
		expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
		expression<rstring> $tableName : "hbase-test-table" ;
	type
		rowSchema = rstring tbName, rstring row, rstring colF, rstring colQ, rstring value ;
		resultSchema = rstring row, rstring columnFamily, rstring columnQualifier, int32 numResults, list<tuple<rstring value, int64 timeStamp>> value ;
	graph
		
		// generates data to delete rows from a HBASE table
		stream<rowSchema> genDeleteData = Beacon(){
			param
				initDelay : 1.0 ;
				iterations : 20u ;
			output
				genDeleteData : tbName = $tableName, row = "row" +(rstring)IterationCount(), colF = "location", 
				    colQ = "location" +(rstring)IterationCount(), value = "value-" + (rstring)IterationCount();
		}

		// Delete all versions explicitly
		stream<boolean success> deleteRows = HBASEDelete(genDeleteData){
			param
				tableNameAttribute : tbName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				deleteAllVersions : true ;
				successAttr : "success" ;
				hbaseSite : $hbaseSite;
		}
 
		// generates rows to put into a HBASE table.
		stream<rowSchema> genPutData = Custom(deleteRows){
			logic
				state : {
					mutable int32 i2 = 0 ;
				}

				onPunct deleteRows : {
					if(currentPunct()== Sys.FinalMarker){
						for(int32 i in range(20)){
							submit({ tbName = $tableName, row = "row" +(rstring)i, colF = "location", 
							         colQ = "location" +(rstring)i, value = "value-" +(rstring)i }, genPutData);
						}

					}

				}

		}
 
		// put rows into a HBASE table.
		// The input tuple contains the table name, row, column family, column qualifier and value
		(stream<boolean success> putData ; stream<rstring errorText, tuple<rowSchema> inTuple> errorPutData)= HBASEPut(genPutData){
        	// (stream<boolean success> putData ; stream<rstring errorText, rstring inTuple> errorPutData)= HBASEPut(genPutData){
			param
				tableNameAttribute : tbName ;
				//tableName : $tableName ;
				rowAttrName : "row" ;
				columnFamilyAttrName : "colF" ;
				columnQualifierAttrName : "colQ" ;
				valueAttrName : "value" ;
				// authPrincipal : $authPrincipal; 
				// authKeytab : $authKeytab;
				hbaseSite : $hbaseSite;
				successAttr : "success" ;
		}

		// pass the table name to the next stream
		stream<rstring tbName> passTableName = Custom(putData){
			logic
				onPunct putData : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName }, passTableName);
					}

				}

		}

		// scans an HBASE table and returns all rows from the table
		// The input tuple includes only the table name.
		stream<resultSchema> inputFullScan = HBASEScan(passTableName){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring startRow> tableNameStartRow = Custom(inputFullScan){
			logic
				onPunct inputFullScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, startRow = "row2" }, tableNameStartRow);
					}

				}

		}

		// scans an HBASE table and returns all rows from startRow to the end of the table
		// The input tuple includes the table name and the start row.
		stream<resultSchema> inputStartRowScan = HBASEScan(tableNameStartRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring endRow> tableNameEndRow = Custom(inputStartRowScan){
			logic
				onPunct inputStartRowScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, endRow = "row7" }, tableNameEndRow);
					}

				}

		}

		// scans an HBASE table and returns all rows from the beginning of the table up to endRow
		// The input tuple includes the table name and the end row.
		stream<resultSchema> inputEndRowScan = HBASEScan(tableNameEndRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		stream<rstring tbName, rstring startRow, rstring endRow> tableNameStartEndRow = Custom(inputEndRowScan){
			logic
				onPunct inputEndRowScan : {
					if(currentPunct()== Sys.FinalMarker){
						submit({ tbName = $tableName, startRow = "row1", endRow = "row13" }, tableNameStartEndRow);
					}

				}

		}

		// scans an HBASE table and returns all rows between startRow and endRow
		// The input tuple includes the table name, the start row and the end row.
		stream<resultSchema> inputStartEndRowScan = HBASEScan(tableNameStartEndRow){
			param
				tableNameAttribute : tbName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		// full scan without any input tuple
		stream<resultSchema> fullScan = HBASEScan(){
			param
				initDelay : 40.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				hbaseSite : $hbaseSite;
		}

		// scans from start row till end of table without any input tuple
		stream<resultSchema> startRowScan = HBASEScan(){
			param
				initDelay : 50.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				startRow : "row2" ;
				hbaseSite : $hbaseSite;
		}

		// scans from the beginning of the table up to the end row without any input tuple
		stream<resultSchema> endRowScan = HBASEScan(){
			param
				initDelay : 60.0 ;
				tableName : $tableName ;
				outputCountAttr : "numResults" ;
				endRow : "row9" ;
				hbaseSite : $hbaseSite;
				
		}

		// returns tuples as string and forward them to the print streams
		stream<rstring streamsName, rstring result> returnTuplesAsString = Custom(putData; errorPutData; deleteRows ; inputFullScan ; 
			inputStartRowScan ; inputEndRowScan ; inputStartEndRowScan ; fullScan ; startRowScan ; endRowScan){
			logic
				onTuple deleteRows : submit({ streamsName = "deleteRows", result =(rstring)deleteRows }, returnTuplesAsString);
				onTuple errorPutData : submit({ streamsName = "errorPutData", result =(rstring)errorPutData }, returnTuplesAsString);
				onTuple putData : submit({ streamsName = "putData", result =(rstring)putData }, returnTuplesAsString);
				onTuple inputFullScan : submit({ streamsName = "inputFullScan", result =(rstring)inputFullScan }, returnTuplesAsString);
				onTuple inputStartRowScan : submit({ streamsName = "inputStartRowScan", result =(rstring)inputStartRowScan }, returnTuplesAsString);
				onTuple inputEndRowScan : submit({ streamsName = "inputEndRowScan", result =(rstring)inputEndRowScan }, returnTuplesAsString);
				onTuple inputStartEndRowScan : submit({ streamsName = "inputStartEndRowScan", result =(rstring)inputStartEndRowScan }, returnTuplesAsString);
				onTuple fullScan : submit({ streamsName = "fullScan", result =(rstring)fullScan }, returnTuplesAsString);
				onTuple startRowScan : submit({ streamsName = "startRowScan", result =(rstring)startRowScan }, returnTuplesAsString);
				onTuple endRowScan : submit({ streamsName = "endRowScan", result =(rstring)endRowScan }, returnTuplesAsString);
		}

		// print results on console
		()as printResult = Custom(returnTuplesAsString){
			logic
				state : {
					mutable int32 tupleCnt = 0 ;
					mutable rstring streamsName1 = "" ;
					mutable rstring streamsName2 = "" ;
				}

				onTuple returnTuplesAsString : {
					streamsName1 = streamsName ;
					if(streamsName1 != streamsName2){
						println("");
						println("************************   " + streamsName + "  **************************");
						streamsName2 = streamsName1 ;
						tupleCnt = 1 ;
					}

					println("Result of " + streamsName + " : " +(rstring)tupleCnt + "  " + result);
					tupleCnt ++ ;
				}

		}

}
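
The scan output in the header comment of the sample lists row10 through row19 before row2, because HBase orders row keys as byte strings rather than as numbers. The sketch below is not part of the sample and does not connect to HBase. It is a small Python simulation (the helper name `simulate_scan` is hypothetical) that assumes the startRow and endRow values behave like a standard HBase scan range, with the start row inclusive and the end row exclusive. Under that assumption it shows which of the 20 rows each range used in the sample would cover.

```python
# Simulation only: no HBase connection is made.
# Assumption: start row is inclusive, end row is exclusive,
# and row keys are compared as raw bytes (lexicographic order).


def simulate_scan(row_keys, start_row=None, end_row=None):
    """Return the keys a range scan would cover, in byte-lexicographic order."""
    selected = []
    for key in sorted(row_keys):
        if start_row is not None and key < start_row:
            continue  # key sorts before the start of the range
        if end_row is not None and key >= end_row:
            continue  # key sorts at or after the exclusive end of the range
        selected.append(key)
    return selected


# The 20 row keys written by the sample: row0 .. row19.
ROW_KEYS = [("row%d" % i).encode("ascii") for i in range(20)]

# Scan ranges taken from the sample, as (stream name, startRow, endRow).
SCANS = [
    ("inputFullScan", None, None),
    ("inputStartRowScan", b"row2", None),
    ("inputEndRowScan", None, b"row7"),
    ("inputStartEndRowScan", b"row1", b"row13"),
    ("endRowScan", None, b"row9"),
]


def main():
    for name, start, end in SCANS:
        rows = simulate_scan(ROW_KEYS, start, end)
        print(name, len(rows), [r.decode("ascii") for r in rows])


if __name__ == "__main__":
    main()
```

In this simulation the range from "row1" to "row13" covers only row1, row10, row11 and row12, which may be fewer rows than a numeric reading of the keys suggests. If rows should come back in numeric order, a common approach is to zero-pad the numeric part of the key, for example row02 instead of row2.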