This topic describes how to use Java APIs to manage Kudu tables.
Prerequisites
An E-MapReduce (EMR) cluster is created, and Kudu is selected from the optional services when you create the cluster. For more information, see Create a cluster.
Background information
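You can perform the following operations on Kudu tables: create a table, modify a table, write data to a table, read data from a table, and delete a table. Each sample method connects to the Kudu master at masterAddress by using a KuduClient.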
Create a table
Use the createTable method to create a table. You must specify the schema and the partitioning of the table.
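import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;

public static void createTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    // try-with-resources closes the client and its connections when the method returns.
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        // Schema: primary key column ID first, then the non-nullable score column.
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("ID", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("score", Type.INT32).nullable(false).build());
        Schema tableSchema = new Schema(columnSchemas);
        // Partitioning: hash the ID column into 16 buckets and keep one replica per tablet.
        CreateTableOptions options = new CreateTableOptions();
        List<String> partitionKey = new ArrayList<>();
        partitionKey.add("ID");
        options.addHashPartitions(partitionKey, 16);
        options.setNumReplicas(1);
        KuduTable personTable = client.createTable(tableName, tableSchema, options);
        System.out.println("Created table " + personTable.getName());
    }
}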
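Note In the sample code, a table that contains the ID and score columns is created. ID is the primary key column of the INT32 type. score is a non-primary-key column of the INT32 type and cannot be null, because it is built with nullable(false). The table is hash partitioned on the ID column into 16 buckets, so it has 16 tablets. setNumReplicas(1) stores a single replica of each tablet, which provides no fault tolerance; the Kudu default replication factor is 3.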
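The tablet count in the note follows from the partitioning options. The following sketch makes no Kudu client calls; it only spells out that arithmetic under two assumptions: the number of tablets is the product of the bucket counts of every hash-partitioning level multiplied by the number of range partitions (1 here, because the sample code defines no range partitions), and each tablet is stored numReplicas times. The class name TabletMath is hypothetical.

```java
import java.util.List;

// Illustration only (no Kudu client calls).
// Assumption: tablets = product of the bucket counts of all hash levels
// multiplied by the number of range partitions; each tablet has numReplicas copies.
final class TabletMath {

    static int tabletCount(List<Integer> hashBucketsPerLevel, int rangePartitions) {
        int tablets = rangePartitions;
        for (int buckets : hashBucketsPerLevel) {
            tablets *= buckets;
        }
        return tablets;
    }

    static int tabletReplicaCount(List<Integer> hashBucketsPerLevel, int rangePartitions, int numReplicas) {
        return tabletCount(hashBucketsPerLevel, rangePartitions) * numReplicas;
    }

    public static void main(String[] args) {
        // The createTable sample: one hash level with 16 buckets, one range partition, 1 replica.
        System.out.println(tabletCount(List.of(16), 1));            // 16
        System.out.println(tabletReplicaCount(List.of(16), 1, 1));  // 16
        // The same table with 3 replicas would store 48 tablet replicas.
        System.out.println(tabletReplicaCount(List.of(16), 1, 3));  // 48
    }
}
```

Raising the bucket count spreads writes across more tablets, but every extra tablet is multiplied by the replica count in the cluster.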
Modify a table
- Use the alterTable method to add a column named newCol to a Kudu table.
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

public static void addColumn(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        String newColumnName = "newCol";
        // Add a non-nullable STRING column; existing rows are read with the default value "".
        AlterTableOptions addColumnOptions = new AlterTableOptions();
        addColumnOptions.addColumn(newColumnName, Type.STRING, "");
        AlterTableResponse response = client.alterTable(tableName, addColumnOptions);
        System.out.println("Added column in " + response.getElapsedMillis() + " ms");
        // Reopen the table to read the updated schema and list its columns.
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i = 0; i < columnSchemas.size(); ++i) {
            System.out.println("Column " + i + ": " + columnSchemas.get(i).getName());
        }
    }
}
- Use the alterTable method to delete a column named newCol from a Kudu table.
import java.util.List;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.*;

public static void dropColumn(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        String columnName = "newCol";
        // Drop the column from the table schema.
        AlterTableOptions deleteColumnOptions = new AlterTableOptions();
        deleteColumnOptions.dropColumn(columnName);
        AlterTableResponse response = client.alterTable(tableName, deleteColumnOptions);
        System.out.println("Dropped column in " + response.getElapsedMillis() + " ms");
        // Reopen the table to read the updated schema and list the remaining columns.
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i = 0; i < columnSchemas.size(); ++i) {
            System.out.println("Column " + i + ": " + columnSchemas.get(i).getName());
        }
    }
}
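To illustrate what readers observe after the two alterTable calls above, the following toy model keeps rows in memory instead of calling Kudu. It assumes two behaviors: a row written before newCol was added is read back with the default value "", and a dropped column can no longer be read. The class AlterTableModel and its methods are hypothetical and only mimic these semantics; they are not part of the Kudu API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model, NOT Kudu code: it only mimics what a reader observes
// after AlterTableOptions.addColumn(name, type, default) and dropColumn(name).
final class AlterTableModel {
    // Column name -> default value returned for rows that never stored that column.
    private final Map<String, Object> columnDefaults = new LinkedHashMap<>();
    // Primary key (ID) -> cells stored for that row.
    private final Map<Integer, Map<String, Object>> rows = new HashMap<>();

    AlterTableModel() {
        columnDefaults.put("score", null); // score has no default value in this model
    }

    void addColumn(String name, Object defaultValue) {
        columnDefaults.put(name, defaultValue);
    }

    void dropColumn(String name) {
        columnDefaults.remove(name);
        rows.values().forEach(cells -> cells.remove(name));
    }

    void upsert(int id, Map<String, Object> cells) {
        rows.computeIfAbsent(id, k -> new HashMap<>()).putAll(cells);
    }

    Object read(int id, String column) {
        if (!columnDefaults.containsKey(column)) {
            throw new IllegalArgumentException("unknown column: " + column);
        }
        Map<String, Object> cells = rows.get(id);
        if (cells == null) {
            throw new IllegalArgumentException("no row with ID " + id);
        }
        // A cell that was never written falls back to the column's default value.
        return cells.getOrDefault(column, columnDefaults.get(column));
    }

    public static void main(String[] args) {
        AlterTableModel table = new AlterTableModel();
        table.upsert(1, Map.of("score", 10));   // row written before newCol exists
        table.addColumn("newCol", "");          // like addColumn("newCol", Type.STRING, "")
        System.out.println("[" + table.read(1, "newCol") + "]");  // []
        table.dropColumn("newCol");             // like dropColumn("newCol")
        try {
            table.read(1, "newCol");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // unknown column: newCol
        }
    }
}
```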
Write data to a table
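Use the newSession method to open a session, apply one Upsert operation per record to an existing Kudu table, and then check for write errors. An upsert inserts a row, or overwrites the existing row if a row with the same primary key already exists.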
import org.apache.kudu.client.*;

public static void insertRows(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        KuduSession session = client.newSession();
        // The default flush mode is AUTO_FLUSH_SYNC. AUTO_FLUSH_BACKGROUND buffers writes and
        // flushes them in background threads; it is the mode recommended for most workloads.
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        for (int i = 0; i < numRows; ++i) {
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addInt("ID", i + 1);
            row.addInt("score", i + 10);
            session.apply(upsert);
        }
        // Call session.close() to end the session, flush the buffered rows, and collect errors.
        // You can also call session.flush() to do the same without ending the session.
        // In AUTO_FLUSH_BACKGROUND mode, row errors are not returned by apply(); they are
        // collected as pending errors, so you must check them as shown below.
        session.close();
        if (session.countPendingErrors() != 0) {
            RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
            RowError[] errs = roStatus.getRowErrors();
            int numErrs = Math.min(errs.length, 5);
            System.out.println("There were errors inserting rows to Kudu. The first few errors follow:");
            for (int i = 0; i < numErrs; i++) {
                System.out.println(errs[i]);
            }
            if (roStatus.isOverflowed()) {
                System.out.println("Error buffer overflowed: some errors were discarded");
            }
            throw new RuntimeException("error inserting rows to Kudu");
        }
        System.out.println("Inserted " + numRows + " rows");
    }
}
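Note In the sample code, numRows specifies the number of records that are written to the table. The rows have ID values 1 through numRows, and each score value equals ID + 9. Because the code uses upserts, running it again overwrites the existing rows instead of failing on duplicate primary keys.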
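The sample code writes rows with Upsert operations rather than Insert operations. The following in-memory model, which is not Kudu code, sketches the difference under one assumption: an insert whose primary key already exists is rejected as a row error (in AUTO_FLUSH_BACKGROUND mode, Kudu surfaces such rows as pending errors), while an upsert overwrites the existing row. The class UpsertModel is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model, NOT Kudu code: contrasts INSERT and UPSERT semantics on a primary key.
final class UpsertModel {
    private final Map<Integer, Integer> scoreById = new TreeMap<>();

    // Like a failed Insert: returns false and writes nothing when the key already exists.
    boolean insert(int id, int score) {
        return scoreById.putIfAbsent(id, score) == null;
    }

    // Like an Upsert: inserts a new row or replaces the existing one.
    void upsert(int id, int score) {
        scoreById.put(id, score);
    }

    Integer score(int id) { return scoreById.get(id); }

    int size() { return scoreById.size(); }

    public static void main(String[] args) {
        UpsertModel t = new UpsertModel();
        // Same values that insertRows writes for numRows = 3: ID = i + 1, score = i + 10.
        for (int i = 0; i < 3; ++i) t.upsert(i + 1, i + 10);
        // Running insertRows again does not fail: rows are overwritten, not duplicated.
        for (int i = 0; i < 3; ++i) t.upsert(i + 1, i + 10);
        System.out.println(t.size());         // 3
        System.out.println(t.insert(1, 99));  // false: key 1 already present
        System.out.println(t.score(1));       // 10
    }
}
```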
Read data from a table
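Use the newScannerBuilder method to read records from a Kudu table. The sample code returns only the ID and score columns, uses two comparison predicates to read only the rows whose ID is greater than or equal to 0 and less than numRows / 2, and then checks the number of rows returned.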
import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.Schema;
import org.apache.kudu.client.*;

public static void scanTable(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        KuduTable table = client.openTable(tableName);
        Schema schema = table.getSchema();
        // Return only the ID and score columns.
        List<String> projectColumns = new ArrayList<>(2);
        projectColumns.add("ID");
        projectColumns.add("score");
        // Keep only the rows whose ID is in [lowerBound, upperBound).
        int lowerBound = 0;
        KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.GREATER_EQUAL,
            lowerBound);
        int upperBound = numRows / 2;
        KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.LESS,
            upperBound);
        KuduScanner scanner = client.newScannerBuilder(table)
            .setProjectedColumnNames(projectColumns)
            .addPredicate(lowerPred)
            .addPredicate(upperPred)
            .build();
        // Count the returned rows. Note: scanning a hash-partitioned table
        // does not return results in primary key order.
        int resultCount = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator results = scanner.nextRows();
            while (results.hasNext()) {
                RowResult result = results.next();
                // result.getInt("ID") and result.getInt("score") return the column values.
                resultCount++;
            }
        }
        scanner.close();
        // insertRows writes IDs 1..numRows, so ID 0 does not exist and the matching IDs are
        // 1..upperBound - 1. This check assumes the table holds only those rows.
        int expectedResultCount = Math.max(0, upperBound - lowerBound - 1);
        if (resultCount != expectedResultCount) {
            throw new RuntimeException("scan error: expected " + expectedResultCount +
                " results but got " + resultCount + " results");
        }
        System.out.println("Scanned " + resultCount + " rows and checked the results");
    }
}
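The expected count in scanTable depends on how insertRows assigns IDs. The following sketch makes no Kudu calls. It checks that arithmetic under the assumption that the table contains exactly the rows written by insertRows (IDs 1 through numRows): it applies the ID >= lowerBound and ID < upperBound predicates to those IDs by brute force and compares the result with a closed-form count. The class ScanCountCheck is hypothetical.

```java
import java.util.stream.IntStream;

// Sketch, NOT Kudu code: the arithmetic behind expectedResultCount in scanTable.
// Assumption: the table holds exactly the rows written by insertRows (IDs 1..numRows).
final class ScanCountCheck {

    // Brute force: apply ID >= lowerBound AND ID < upperBound to IDs 1..numRows.
    static long countByFiltering(int numRows, int lowerBound, int upperBound) {
        return IntStream.rangeClosed(1, numRows)
                .filter(id -> id >= lowerBound && id < upperBound)
                .count();
    }

    // Closed form: size of the intersection of [1, numRows] and [lowerBound, upperBound - 1].
    static int countClosedForm(int numRows, int lowerBound, int upperBound) {
        int first = Math.max(lowerBound, 1);
        int last = Math.min(upperBound - 1, numRows);
        return Math.max(0, last - first + 1);
    }

    public static void main(String[] args) {
        int numRows = 10;
        int lowerBound = 0;
        int upperBound = numRows / 2; // 5, as in scanTable
        // IDs 1, 2, 3, 4 match; ID 0 does not exist, hence upperBound - lowerBound - 1.
        System.out.println(countByFiltering(numRows, lowerBound, upperBound)); // 4
        System.out.println(countClosedForm(numRows, lowerBound, upperBound));  // 4
        System.out.println(upperBound - lowerBound - 1);                       // 4
    }
}
```

With numRows = 10, both methods return 4 (IDs 1 to 4), which equals upperBound - lowerBound - 1. For numRows = 1, upperBound is 0 and no rows match, which is why the count should be clamped at 0.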
Delete a table
Use the deleteTable method to delete a Kudu table.
import org.apache.kudu.client.*;

public static void deleteTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build()) {
        DeleteTableResponse response = client.deleteTable(tableName);
        System.out.println("Deleted table in " + response.getElapsedMillis() + " ms");
    }
}