全部產品
Search
文件中心

MaxCompute:原生SDK概述

更新時間:Feb 28, 2024

本文為您介紹較為常用的MapReduce核心介面。

如果您使用Maven,可以從Maven庫中搜尋odps-sdk-mapred擷取不同版本的Java SDK,相關配置資訊如下。
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.40.10-public</version>
</dependency>

資料類型

MapReduce支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute資料類型與Java資料類型的對應關係如下。
MaxCompute SQL TypeJava Type
BIGINTLONG
STRINGSTRING
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATETIMEDATE
DECIMALBIGDECIMAL

MapReduce主要介面

主要介面描述
MapperBase使用者自訂的Map函數需要繼承自此類。處理輸入表的記錄對象,加工處理成索引值對集合輸出到Reduce階段,或者不經過Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而直接輸出計算結果的作業,也可稱之為MapOnly作業。
ReducerBase使用者自訂的Reduce函數需要繼承自此類。對與一個鍵(Key)關聯的一組數值集(Values)進行歸約計算。
TaskContext是MapperBase及ReducerBase多個成員函數的輸入參數之一,含有任務啟動並執行上下文資訊。
JobClient用於提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞(非同步) 方式。
RunningJob作業運行時對象,用於跟蹤運行中的MapReduce工作執行個體。
JobConf描述一個MapReduce任務的配置,通常在主程式(main函數)中定義JobConf對象,然後通過JobClient提交作業給MaxCompute服務。

MapperBase

主要函數介面。
主要介面描述
void cleanup(TaskContext context)在Map階段結束時,map方法之後調用。
void map(long key, Record record, TaskContext context)map方法,處理輸入表的記錄。
void setup(TaskContext context)在Map階段開始時,map方法之前調用。

ReducerBase

主要函數介面。
主要介面描述
void cleanup( TaskContext context)在Reduce階段結束時,reduce方法之後調用。
void reduce(Record key, Iterator<Record > values, TaskContext context)reduce方法,處理輸入表的記錄。
void setup( TaskContext context)在Reduce階段開始時,reduce方法之前調用。

TaskContext

主要函數介面。
主要介面描述
TableInfo[] getOutputTableInfo()擷取輸出的表資訊。
Record createOutputRecord()建立預設輸出表的記錄對象。
Record createOutputRecord(String label)建立給定label輸出表的記錄對象。
Record createMapOutputKeyRecord()建立Map輸出Key的記錄對象。
Record createMapOutputValueRecord()建立Map輸出Value的記錄對象。
void write(Record record)寫記錄到預設輸出,用於Reduce端寫出資料,可以在Reduce端多次調用。
void write(Record record, String label)寫記錄到給定label輸出,用於Reduce端寫出資料。可以在 Reduce端多次調用。
void write(Record key, Record value)Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。
BufferedInputStream readResourceFileAsStream(String resourceName)讀取檔案類型資源。
Iterator<Record > readResourceTable(String resourceName)讀取表類型資源。
Counter getCounter(Enum<? > name)擷取給定名稱的Counter對象。
Counter getCounter(String group, String name)擷取給定組名和名稱的Counter對象。
void progress()向MapReduce架構報告心跳資訊。 如果使用者方法處理時間很長,且中間沒有調用架構,可以調用這個方法避免task逾時,架構預設600秒逾時。
  • MaxCompute的TaskContext介面中提供了progress功能,但此功能是為防止Worker長時間運行未結束,被架構誤認為逾時而被殺的情況出現。此介面更類似於向架構發送心跳資訊,並不是用來彙報Worker進度。
  • MaxCompute MapReduce預設Worker逾時時間為10分鐘(系統預設配置,不受使用者控制),如果超過10分鐘,Worker仍然沒有向架構發送心跳(調用progress介面),架構會強制停止該Worker,MapReduce任務失敗退出。因此,建議您在Mapper/Reducer函數中,定期調用progress介面,防止架構認為Worker逾時,誤殺任務。

JobConf

主要函數介面。
主要介面描述
void setResources(String resourceNames)聲明本作業使用的資源。只有聲明的資源才能在運行Mapper/Reducer時通過TaskContext對象讀取。
void setMapOutputKeySchema(Column[] schema)設定Mapper輸出到Reducer的Key屬性。
void setMapOutputValueSchema(Column[] schema)設定Mapper輸出到Reducer的Value屬性。
void setOutputKeySortColumns(String[] cols)設定Mapper輸出到Reducer的Key排序列。
void setOutputGroupingColumns(String[] cols)設定Key分組列。
void setMapperClass(Class<? extends Mapper > theClass)設定作業的Mapper函數。
void setPartitionColumns(String[] cols)設定作業指定的分區列。預設是Mapper輸出Key的所有列。
void setReducerClass(Class<? extends Reducer > theClass)設定作業的Reducer。
void setCombinerClass(Class<? extends Reducer > theClass)設定作業的combiner。在Map端運行,作用類似於單個Map對本地的相同Key值做Reduce。
void setSplitSize(long size)設定分區大小,單位MB,預設值256。
void setNumReduceTasks(int n)設定Reducer任務數,預設為Mapper任務數的1/4。
void setMemoryForMapTask(int mem)設定Mapper任務中單個Worker的記憶體大小,單位MB, 預設值2048。
void setMemoryForReduceTask(int mem)設定Reducer任務中單個Worker的記憶體大小,單位MB, 預設值 2048。
  • 通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
  • 在Map端,Mapper輸出的Record會根據設定的PartitionColumns計算雜湊值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。
  • 在Reduce端,輸入Records,再按照KeySortColumns排序後,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍曆輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。

JobClient

主要函數介面。
主要介面描述
static RunningJob runJob(JobConf job)阻塞(同步)方式提交MapReduce作業後立即返回。
static RunningJob submitJob(JobConf job)非阻塞(非同步)方式提交MapReduce作業後立即返回。

RunningJob

主要函數介面。
主要介面描述
String getInstanceID()擷取作業運行執行個體ID,用於查看作業記錄和作業管理。
boolean isComplete()查詢作業是否結束。
boolean isSuccessful()查詢工作執行個體是否運行成功。
void waitForCompletion()等待直至工作執行個體結束。一般用於非同步方式提交的作業。
JobStatus getJobStatus()查詢工作執行個體運行狀態。
void killJob()結束此作業。
Counters getCounters()擷取Conter資訊。

InputUtils

主要函數介面。
主要介面描述
static void addTable(TableInfo table, JobConf conf)添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。
static void setTables(TableInfo [] tables, JobConf conf)添加多張表到任務輸入中。

OutputUtils

主要函數介面。
主要介面描述
static void addTable(TableInfo table, JobConf conf)添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊列中。
static void setTables(TableInfo[] tables, JobConf conf)添加多張表到任務輸出中。

Pipeline

Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要介面如下。
    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder setOutputKeySchema(Column[] keySchema)
    public Builder setOutputValueSchema(Column[] valueSchema)
    public Builder setOutputKeySortColumns(String[] sortCols)
    public Builder setOutputKeySortOrder(SortOrder[] order)
    public Builder setPartitionColumns(String[] partCols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    public Builder setOutputGroupingColumns(String[] cols)
樣本如下。
    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
     .addMapper(TokenizerMapper.class)
     .setOutputKeySchema(
         new Column[] { new Column("word", OdpsType.STRING) })
     .setOutputValueSchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .addReducer(SumReducer.class)
     .setOutputKeySchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .setOutputValueSchema(
         new Column[] { new Column("word", OdpsType.STRING),
         new Column("count", OdpsType.BIGINT) })
     .addReducer(IdentityReducer.class).createPipeline();
    job.setPipeline(pipeline);  
    job.addInput(...)
    job.addOutput(...)
    job.submit();
如上所示,您可以在main函數中構建一個Map之後,連續接兩個Reduce的MapReduce任務。如果您比較熟悉MapReduce的基礎功能,即可便於使用MR2。
說明
  • 建議您在使用MR2功能前,先瞭解MapReduce的基礎用法。
  • JobConf僅能夠配置Map後接單Reduce的MapReduce任務。