-
Notifications
You must be signed in to change notification settings - Fork 12
How to use the parameter tableNameAttribute in streamsx.hbase toolkit
Ahmad Nouri edited this page May 2, 2019
·
3 revisions
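This SPL sample application demonstrates how to use the parameter tableNameAttribute in the HBASEDelete, HBASEPut, and HBASEScan operators of the streamsx.hbase toolkit.
It shows seven ways to scan data from an HBase table: four scans driven by input tuples and three scans configured only through operator parameters.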
Requirements:
- IBM Streams 4.3.0.0 or higher
- streamsx.hbase toolkit version 3.6.0 or higher
  https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.6.0
- Apache HBase version 1.2 or higher
/* Copyright (C) 2019, International Business Machines Corporation */
/* All Rights Reserved */
namespace application ;
use com.ibm.streamsx.hbase::HBASEDelete ;
use com.ibm.streamsx.hbase::HBASEPut ;
use com.ibm.streamsx.hbase::HBASEScan ;
/**
 * This SPL application demonstrates:
 * * How to use the parameter **tableNameAttribute** in all HBASE operators.
 * * How to delete rows from an HBASE table.
 * * How to put data into an HBASE table.
 * * How to scan rows from an HBASE table.
 *   We show seven ways to scan data from an HBASE table: four scans driven by
 *   input tuples that carry the table name (and optionally a start and/or end row),
 *   and three scans configured only through operator parameters.
 *
 *
 * In HBASE, a column is divided into two parts, the columnFamily
 * and the columnQualifier. All columns in a columnFamily are grouped
 * together on disk, which affects the efficiency of access.
 * The table has a fixed set of column families, and you may not add
 * data to any other family. ColumnQualifiers, on the other hand, may be added
 * at runtime.
*
*
 * To run this example, create a table named 'hbase-test-table' with the column families 'appearance' and 'location'.
 * In the hbase shell, run:
 * `create 'hbase-test-table','appearance','location'`
 *
 * After running the application, scan the table with
 * `scan 'hbase-test-table'`
 * The output looks like this:
 *
 *     ROW COLUMN+CELL
 *     row0 column=location:location0, timestamp=1555512212660, value=value-0
 *     row1 column=location:location1, timestamp=1555512213338, value=value-1
 *     row10 column=location:location10, timestamp=1555512217965, value=value-10
 *     row11 column=location:location11, timestamp=1555512218481, value=value-11
 *     row12 column=location:location12, timestamp=1555512218986, value=value-12
 *     row13 column=location:location13, timestamp=1555512219493, value=value-13
 *     row14 column=location:location14, timestamp=1555512219997, value=value-14
 *     row15 column=location:location15, timestamp=1555512220506, value=value-15
 *     row16 column=location:location16, timestamp=1555512221036, value=value-16
 *     row17 column=location:location17, timestamp=1555512221575, value=value-17
 *     row18 column=location:location18, timestamp=1555512222593, value=value-18
 *     row19 column=location:location19, timestamp=1555512223097, value=value-19
 *     row2 column=location:location2, timestamp=1555512213842, value=value-2
 *     row3 column=location:location3, timestamp=1555512214348, value=value-3
 *     row4 column=location:location4, timestamp=1555512214853, value=value-4
 *     row5 column=location:location5, timestamp=1555512215357, value=value-5
 *     row6 column=location:location6, timestamp=1555512215859, value=value-6
 *     row7 column=location:location7, timestamp=1555512216364, value=value-7
 *     row8 column=location:location8, timestamp=1555512216885, value=value-8
 *     row9 column=location:location9, timestamp=1555512217416, value=value-9
 *     20 row(s) in 1.5760 seconds
 *
 * Copy the HBASE configuration file (hbase-site.xml) from your HBASE server into the etc directory of your SPL project,
 * and set the hbaseSite parameter of every HBASE operator to "etc/hbase-site.xml".
 * If you use Kerberos authentication, add the authKeytab and authPrincipal parameters to all HBASE operators.
 */
composite HbaseScan {
    // Data flow. Each step below is triggered by the final punctuation of the previous stream:
    //   genDeleteData -> deleteRows -> genPutData -> putData/errorPutData
    //   -> passTableName -> inputFullScan -> tableNameStartRow -> inputStartRowScan
    //   -> tableNameEndRow -> inputEndRowScan -> tableNameStartEndRow -> inputStartEndRowScan
    // The input-less scans fullScan, startRowScan and endRowScan are not part of that chain.
    // They start after a fixed initDelay (40s, 50s and 60s respectively).
    param
        // Kerberos credentials. They are declared but not passed to any operator here.
        // To use Kerberos, add authKeytab/authPrincipal to the HBASE operators (see HBASEPut below).
        expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hbase.service.keytab");
        expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "hbase/hdp21.fyre.ibm.com@HDP2.COM");
        // Path of the HBase client configuration file used by every HBASE operator.
        expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml");
        // Table that every operator in this graph works on.
        expression<rstring> $tableName : "hbase-test-table" ;
    type
        // Delete/put request: target table, row key, column family, column qualifier, cell value.
        rowSchema = rstring tbName, rstring row, rstring colF, rstring colQ, rstring value ;
        // Output schema shared by all HBASEScan invocations.
        // numResults is the attribute named by outputCountAttr.
        resultSchema = rstring row, rstring columnFamily, rstring columnQualifier, int32 numResults, list<tuple<rstring value, int64 timeStamp>> value ;
    graph
        // Generates 20 delete requests (iterations 0..19).
        // Each request targets the cell row<i> / location:location<i>.
        stream<rowSchema> genDeleteData = Beacon() {
            param
                initDelay : 1.0 ;
                iterations : 20u ;
            output
                genDeleteData :
                    tbName = $tableName,
                    row = "row" + (rstring)IterationCount(),
                    colF = "location",
                    colQ = "location" + (rstring)IterationCount(),
                    value = "value-" + (rstring)IterationCount() ;
        }

        // Deletes all versions of each requested cell.
        // The table name is taken from the tbName attribute of every input tuple.
        stream<boolean success> deleteRows = HBASEDelete(genDeleteData) {
            param
                tableNameAttribute : tbName ;
                rowAttrName : "row" ;
                columnFamilyAttrName : "colF" ;
                columnQualifierAttrName : "colQ" ;
                deleteAllVersions : true ;
                successAttr : "success" ;
                hbaseSite : $hbaseSite ;
        }

        // Once all deletes are done (final marker on deleteRows), emits the same 20 cells to be put.
        stream<rowSchema> genPutData = Custom(deleteRows) {
            logic
                onPunct deleteRows : {
                    if (currentPunct() == Sys.FinalMarker) {
                        for (int32 i in range(20)) {
                            submit({ tbName = $tableName, row = "row" + (rstring)i, colF = "location",
                                     colQ = "location" + (rstring)i, value = "value-" + (rstring)i }, genPutData) ;
                        }
                    }
                }
        }

        // Puts the generated cells. Each input tuple carries the table name, row key, column and value.
        // The second output port carries an error text together with the input tuple.
        (stream<boolean success> putData ; stream<rstring errorText, tuple<rowSchema> inTuple> errorPutData) = HBASEPut(genPutData) {
        // Alternative error-port schema, receiving the input tuple as a single rstring:
        // (stream<boolean success> putData ; stream<rstring errorText, rstring inTuple> errorPutData)= HBASEPut(genPutData){
            param
                tableNameAttribute : tbName ;
                // Alternatively, use a fixed table name instead of tableNameAttribute:
                //tableName : $tableName ;
                rowAttrName : "row" ;
                columnFamilyAttrName : "colF" ;
                columnQualifierAttrName : "colQ" ;
                valueAttrName : "value" ;
                // Uncomment for Kerberos authentication:
                // authPrincipal : $authPrincipal;
                // authKeytab : $authKeytab;
                hbaseSite : $hbaseSite ;
                successAttr : "success" ;
        }

        // After all puts are done, sends a tuple containing only the table name.
        stream<rstring tbName> passTableName = Custom(putData) {
            logic
                onPunct putData : {
                    if (currentPunct() == Sys.FinalMarker) {
                        submit({ tbName = $tableName }, passTableName) ;
                    }
                }
        }

        // Scans the whole table named by the input tuple.
        // The input tuple includes only the table name.
        stream<resultSchema> inputFullScan = HBASEScan(passTableName) {
            param
                tableNameAttribute : tbName ;
                outputCountAttr : "numResults" ;
                hbaseSite : $hbaseSite ;
        }

        // After the full scan, requests a scan that starts at "row2".
        stream<rstring tbName, rstring startRow> tableNameStartRow = Custom(inputFullScan) {
            logic
                onPunct inputFullScan : {
                    if (currentPunct() == Sys.FinalMarker) {
                        submit({ tbName = $tableName, startRow = "row2" }, tableNameStartRow) ;
                    }
                }
        }

        // Scans from startRow to the end of the table.
        // The input tuple includes the table name and the start row.
        stream<resultSchema> inputStartRowScan = HBASEScan(tableNameStartRow) {
            param
                tableNameAttribute : tbName ;
                outputCountAttr : "numResults" ;
                hbaseSite : $hbaseSite ;
        }

        // After the start-row scan, requests a scan that ends at "row7".
        stream<rstring tbName, rstring endRow> tableNameEndRow = Custom(inputStartRowScan) {
            logic
                onPunct inputStartRowScan : {
                    if (currentPunct() == Sys.FinalMarker) {
                        submit({ tbName = $tableName, endRow = "row7" }, tableNameEndRow) ;
                    }
                }
        }

        // Scans from the beginning of the table until endRow.
        // The input tuple includes the table name and the end row.
        stream<resultSchema> inputEndRowScan = HBASEScan(tableNameEndRow) {
            param
                tableNameAttribute : tbName ;
                outputCountAttr : "numResults" ;
                hbaseSite : $hbaseSite ;
        }

        // After the end-row scan, requests a scan between "row1" and "row13".
        stream<rstring tbName, rstring startRow, rstring endRow> tableNameStartEndRow = Custom(inputEndRowScan) {
            logic
                onPunct inputEndRowScan : {
                    if (currentPunct() == Sys.FinalMarker) {
                        submit({ tbName = $tableName, startRow = "row1", endRow = "row13" }, tableNameStartEndRow) ;
                    }
                }
        }

        // Scans all rows between startRow and endRow.
        // The input tuple includes the table name, the start row and the end row.
        stream<resultSchema> inputStartEndRowScan = HBASEScan(tableNameStartEndRow) {
            param
                tableNameAttribute : tbName ;
                outputCountAttr : "numResults" ;
                hbaseSite : $hbaseSite ;
        }

        // Full scan without any input tuple. It is configured only by parameters and starts after 40s.
        stream<resultSchema> fullScan = HBASEScan() {
            param
                initDelay : 40.0 ;
                tableName : $tableName ;
                outputCountAttr : "numResults" ;
                hbaseSite : $hbaseSite ;
        }

        // Scans from "row2" to the end of the table without any input tuple. It starts after 50s.
        stream<resultSchema> startRowScan = HBASEScan() {
            param
                initDelay : 50.0 ;
                tableName : $tableName ;
                outputCountAttr : "numResults" ;
                startRow : "row2" ;
                hbaseSite : $hbaseSite ;
        }

        // Scans from the beginning of the table until "row9" without any input tuple. It starts after 60s.
        stream<resultSchema> endRowScan = HBASEScan() {
            param
                initDelay : 60.0 ;
                tableName : $tableName ;
                outputCountAttr : "numResults" ;
                endRow : "row9" ;
                hbaseSite : $hbaseSite ;
        }

        // Converts every tuple of the result streams to a string.
        // Each string is tagged with the name of the stream it came from.
        stream<rstring streamsName, rstring result> returnTuplesAsString = Custom(putData ; errorPutData ; deleteRows ; inputFullScan ;
                inputStartRowScan ; inputEndRowScan ; inputStartEndRowScan ; fullScan ; startRowScan ; endRowScan) {
            logic
                onTuple deleteRows : submit({ streamsName = "deleteRows", result = (rstring)deleteRows }, returnTuplesAsString) ;
                onTuple errorPutData : submit({ streamsName = "errorPutData", result = (rstring)errorPutData }, returnTuplesAsString) ;
                onTuple putData : submit({ streamsName = "putData", result = (rstring)putData }, returnTuplesAsString) ;
                onTuple inputFullScan : submit({ streamsName = "inputFullScan", result = (rstring)inputFullScan }, returnTuplesAsString) ;
                onTuple inputStartRowScan : submit({ streamsName = "inputStartRowScan", result = (rstring)inputStartRowScan }, returnTuplesAsString) ;
                onTuple inputEndRowScan : submit({ streamsName = "inputEndRowScan", result = (rstring)inputEndRowScan }, returnTuplesAsString) ;
                onTuple inputStartEndRowScan : submit({ streamsName = "inputStartEndRowScan", result = (rstring)inputStartEndRowScan }, returnTuplesAsString) ;
                onTuple fullScan : submit({ streamsName = "fullScan", result = (rstring)fullScan }, returnTuplesAsString) ;
                onTuple startRowScan : submit({ streamsName = "startRowScan", result = (rstring)startRowScan }, returnTuplesAsString) ;
                onTuple endRowScan : submit({ streamsName = "endRowScan", result = (rstring)endRowScan }, returnTuplesAsString) ;
        }

        // Prints results on the console.
        // When the stream name differs from the previously printed tuple's, it prints a blank line
        // and a header, then restarts the running counter at 1.
        () as printResult = Custom(returnTuplesAsString) {
            logic
                state : {
                    mutable int32 tupleCnt = 0 ;
                    // Stream name of the most recently printed tuple ("" before the first tuple).
                    mutable rstring lastStreamsName = "" ;
                }
                onTuple returnTuplesAsString : {
                    if (streamsName != lastStreamsName) {
                        println("") ;
                        println("************************ " + streamsName + " **************************") ;
                        lastStreamsName = streamsName ;
                        tupleCnt = 1 ;
                    }
                    println("Result of " + streamsName + " : " + (rstring)tupleCnt + " " + result) ;
                    tupleCnt++ ;
                }
        }
}