This topic describes how to use Java APIs to manage Kudu tables.

Prerequisites

An E-MapReduce (EMR) cluster is created, and Kudu is selected from the optional services when you create the cluster. For more information, see Create a cluster.

Background information

You can perform the following operations on Kudu tables:

Create a table

Use the createTable method to create a table. In the code, you must configure the schema and partition information of the table.

public static void createTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();

    List<ColumnSchema> columnSchemas = new ArrayList<>();
    columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("ID", Type.INT32).key(true).build());
    columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("score", Type.INT32).nullable(false).build());
    Schema tableSchema = new Schema(columnSchemas);

    CreateTableOptions options = new CreateTableOptions();
    List<String> partitionKey = new ArrayList<>();
    partitionKey.add("ID");
    options.addHashPartitions(partitionKey, 16);
    options.setNumReplicas(1);
    KuduTable personTable = client.createTable(tableName, tableSchema, options);
    System.out.println("Table " + personTable.getName());
}
Note In the sample code, a table that contains the ID and score columns is created. ID is the primary key column of the INT32 type. score is a non-primary key column of the INT32 type and cannot be null, because it is declared with nullable(false). The table is hash partitioned into 16 tablets based on the ID column, and each tablet has one replica.
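
To illustrate what hash partitioning on the ID column means, the following minimal sketch assigns each ID to one of 16 buckets. It uses only the Java standard library and does not call Kudu. The class name HashBucketSketch and both methods are hypothetical, and Integer.hashCode is only a stand-in: Kudu applies its own internal hash function, so the actual bucket of a given ID will differ.

```java
final class HashBucketSketch {
    // Hypothetical stand-in for Kudu's internal hash function (assumption:
    // Integer.hashCode is used here only to keep the sketch self-contained).
    static int bucketFor(int id, int numBuckets) {
        return Math.floorMod(Integer.hashCode(id), numBuckets);
    }

    // Counts how many IDs in the inclusive range [firstId, lastId] land in each bucket.
    static int[] countPerBucket(int firstId, int lastId, int numBuckets) {
        int[] counts = new int[numBuckets];
        for (int id = firstId; id <= lastId; id++) {
            counts[bucketFor(id, numBuckets)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 16 buckets, matching addHashPartitions(partitionKey, 16) in the sample code.
        int[] counts = countPerBucket(1, 160, 16);
        for (int bucket = 0; bucket < counts.length; bucket++) {
            System.out.println("bucket " + bucket + ": " + counts[bucket] + " rows");
        }
    }
}
```

Each row belongs to exactly one bucket, so writes with different ID values are spread across the tablets instead of being appended to a single tablet.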

Modify a table

  • Use the alterTable method to add a column named newCol to a Kudu table.
    public static void alterTable(String masterAddress, String tableName) throws KuduException {
        System.out.println("Connecting to " + masterAddress);
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
        String newColumnName = "newCol";
    
        AlterTableOptions addColumnOptions = new AlterTableOptions();
        addColumnOptions.addColumn(newColumnName, Type.STRING, "");
    
        AlterTableResponse response = client.alterTable(tableName, addColumnOptions);
        System.out.println("Add column " + response.getElapsedMillis());
    
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i=0; i<columnSchemas.size(); ++i) {
            ColumnSchema columnSchema = columnSchemas.get(i);
            System.out.println("Column " + i + ":" + columnSchema.getName());
        }
    }
                        
  • Use the alterTable method to delete a column named newCol from a Kudu table.
    public static void alterTable(String masterAddress, String tableName) throws KuduException {
        System.out.println("Connecting to " + masterAddress);
        KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
        String newColumnName = "newCol";
    
        AlterTableOptions deleteColumnOption = new AlterTableOptions();
        deleteColumnOption.dropColumn(newColumnName);
        AlterTableResponse response = client.alterTable(tableName, deleteColumnOption);
        System.out.println("Delete column " + response.getElapsedMillis());
    
        KuduTable table = client.openTable(tableName);
        List<ColumnSchema> columnSchemas = table.getSchema().getColumns();
        for (int i=0; i<columnSchemas.size(); ++i) {
            ColumnSchema columnSchema = columnSchemas.get(i);
            System.out.println("Column " + i + ":" + columnSchema.getName());
        }
    }
                        

Write data to a table

Use the newSession method to create a session, and use the session to upsert multiple records into an existing Kudu table. Then, check whether any errors are returned.

public static void insertRows(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
    KuduTable table = client.openTable(tableName);
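    KuduSession session = client.newSession();
    // Buffer the writes and flush them in background threads. The default
    // mode is AUTO_FLUSH_SYNC. The pending-error check after the loop applies
    // to AUTO_FLUSH_BACKGROUND mode.
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);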
    for (int i=0; i<numRows; ++i) {
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        row.addInt("ID", i+1);
        row.addInt("score", i+10);

        session.apply(upsert);
    }

    // Call session.close() to end the session and ensure the rows are
    // flushed and errors are returned.
    // You can also call session.flush() to do the same without ending the session.
    // When flushing in AUTO_FLUSH_BACKGROUND mode (the mode recommended
    // for most workloads), you must check the pending errors as shown below, since
    // write operations are flushed to Kudu in background threads.
    session.close();
    if (session.countPendingErrors() != 0) {
        System.out.println("errors inserting rows");
        org.apache.kudu.client.RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
        org.apache.kudu.client.RowError[] errs = roStatus.getRowErrors();
        int numErrs = Math.min(errs.length, 5);
        System.out.println("there were errors inserting rows to Kudu");
        System.out.println("the first few errors follow:");
        for (int i = 0; i < numErrs; i++) {
            System.out.println(errs[i]);
        }
        if (roStatus.isOverflowed()) {
            System.out.println("error buffer overflowed: some errors were discarded");
        }
        throw new RuntimeException("error inserting rows to Kudu");
    }
    System.out.println("Inserted " + numRows + " rows");
}
Note In the sample code, numRows specifies the number of records that are inserted into the table.

Read data from a table

Use the newScannerBuilder method to build a scanner that reads the records whose ID values fall within a specified range from a Kudu table.

public static void scanTable(String masterAddress, String tableName, int numRows) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
    KuduTable table = client.openTable(tableName);
    Schema schema = table.getSchema();

    // Scan with predicates on the ID column, returning the ID and score columns.
    List<String> projectColumns = new ArrayList<>(2);
    projectColumns.add("ID");
    projectColumns.add("score");
    int lowerBound = 0;
    KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.GREATER_EQUAL,
            lowerBound);
    int upperBound = numRows / 2;
    KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
            schema.getColumn("ID"),
            KuduPredicate.ComparisonOp.LESS,
            upperBound);
    KuduScanner scanner = client.newScannerBuilder(table)
            .setProjectedColumnNames(projectColumns)
            .addPredicate(lowerPred)
            .addPredicate(upperPred)
            .build();

    // Count the returned rows and check that the count matches the predicates.
    // Note: scanning a hash-partitioned table will not return results in primary key order.
    int resultCount = 0;
    while (scanner.hasMoreRows()) {
        RowResultIterator results = scanner.nextRows();
        while (results.hasNext()) {
            RowResult result = results.next();
            resultCount++;
        }
    }
    // insertRows writes the IDs 1 to numRows, so the range [lowerBound, upperBound)
    // matches upperBound - lowerBound - 1 rows. Math.max covers the case in which
    // numRows is less than 2 and no rows match.
    int expectedResultCount = Math.max(0, upperBound - lowerBound - 1);
    if (resultCount != expectedResultCount) {
        throw new RuntimeException("scan error: expected " + expectedResultCount +
                " results but got " + resultCount + " results");
    }

    System.out.println("Scanned some rows and checked the results");
}
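
The expected row count in the scan sample follows from simple range arithmetic. The following minimal sketch works through it without calling Kudu. The class name ScanCountSketch and the method expectedCount are hypothetical, and the sketch assumes that the table holds exactly the IDs 1 to numRows, as written by insertRows.

```java
final class ScanCountSketch {
    // Number of IDs in the inclusive range [firstId, lastId] that satisfy
    // lowerBound <= ID < upperBound, which mirrors the GREATER_EQUAL and LESS predicates.
    static int expectedCount(int firstId, int lastId, int lowerBound, int upperBound) {
        int from = Math.max(firstId, lowerBound);   // smallest matching ID, inclusive
        int to = Math.min(lastId, upperBound - 1);  // largest matching ID, inclusive
        return Math.max(0, to - from + 1);          // 0 if the ranges do not overlap
    }

    public static void main(String[] args) {
        int numRows = 10;
        int lowerBound = 0;
        int upperBound = numRows / 2;
        // IDs 1, 2, 3, and 4 match, so this prints 4.
        System.out.println(expectedCount(1, numRows, lowerBound, upperBound));
    }
}
```

With lowerBound set to 0 and the IDs starting at 1, the result equals upperBound - lowerBound - 1 whenever numRows is 2 or greater.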

Delete a table

Use the deleteTable method to delete a Kudu table.

public static void deleteTable(String masterAddress, String tableName) throws KuduException {
    System.out.println("Connecting to " + masterAddress);
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();

    DeleteTableResponse response = client.deleteTable(tableName);
    System.out.println("Table delete " + response.getElapsedMillis());
}