In an E-MapReduce (EMR) Data Science cluster, you can use the Spark compute engine to run inference on a large number of images. Spark can quickly process terabytes or even petabytes of image data by using the vCPUs or vGPUs of the worker nodes in the Data Science cluster.

Prerequisites

  • Development tools
    • Java Development Kit (JDK) 8 is installed on your on-premises machine.
    • Maven 3.x is installed on your on-premises machine.
    • An integrated development environment (IDE) for Java or Scala is installed on your on-premises machine. We recommend that you use IntelliJ IDEA with the JDK and Maven configured in the IDE.
  • A Data Science cluster and a Hadoop cluster are created. For more information, see Create a cluster.
  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • The dsdemo code is downloaded. If you have created a Data Science cluster, you can join the DingTalk group numbered 32497587 to obtain the dsdemo code.

Background information

This topic describes the distributed inference solution in the following scenarios:
Important In a Data Science cluster, you can run a computing job that uses vCPU resources. If you want to run a computing job that uses vGPU resources, contact O&M engineers to update CUDA to 10.1.

Store images in HDFS

In this scenario, you use the HDFS service in the Data Science cluster and do not need to purchase additional storage services. However, HDFS provides low I/O throughput when it stores a large number of small files, such as images of only a few kilobytes each, because each small file adds metadata overhead on the NameNode and requires a separate read operation.

The following sample code shows the content of the DistributedPredictHDFS.java file in the IDE:
public class DistributedPredictHDFS {
    public static void main(String[] args) throws Exception{
        System.out.println("Start DistributedPredictHDFS Job.");

        String imagesPath = args[0];
        String modelPath  = args[1];
        String resultPath = args[2];

        SparkConf conf = new SparkConf().setAppName("DistributedPredictHDFSOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, PortableDataStream> imageStream = sc.binaryFiles(imagesPath, 128);
        System.out.println("Partitions: "+ imageStream.getNumPartitions());

        JavaRDD<String> result = imageStream.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, PortableDataStream>>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(Iterator<Tuple2<String, PortableDataStream>> iterator) throws Exception{

                ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                                .addTransform(new Resize(224, 224))
                                .addTransform(new ToTensor())
                                .build();

                Criteria<Image, Classifications> criteria =
                            Criteria.builder()
                        .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                        .setTypes(Image.class, Classifications.class) // defines input and output data type
                        .optTranslator(translator)
                        .optModelUrls(modelPath)
                        .build();

                System.out.println("Enginename: "+Engine.getInstance().getEngineName());
                System.out.println("ModelName: " + criteria.getModelName());
                System.out.println("criteriainfo: " + criteria.toString());
                List<String> list = new ArrayList<>();
                ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                Predictor<Image, Classifications> predictor = model.newPredictor();
                int idx = 0;
                List<Image> imagelist = new ArrayList<>();
                while(iterator.hasNext()){
                    Tuple2<String, PortableDataStream> item = iterator.next();

                    String name = item._1();
                    PortableDataStream content = item._2();

                    Image img = ImageFactory.getInstance().fromInputStream(content.open());
                    imagelist.add(img);
                    idx++;

                    if(imagelist.size()%32 == 0) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        System.out.println("index: " + idx + "" + name + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                        System.out.println("index: " + idx);
                        list.add(results.toString());
                        imagelist.clear();
                    }
                }
                /*
                process_insufficient_images().
                */
                return list.iterator();
            }
        });
        result.saveAsTextFile(resultPath);

    }
}
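
The samples in this topic omit package and import statements for brevity. The following list is a hedged sketch of the imports that the DistributedPredictHDFS sample would need, assuming the Deep Java Library (DJL) and Spark Java APIs that the code calls; the other samples in this topic use a similar set:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.input.PortableDataStream;

import scala.Tuple2;

import ai.djl.Application;
import ai.djl.engine.Engine;
import ai.djl.inference.Predictor;
import ai.djl.modality.Classifications;
import ai.djl.modality.cv.Image;
import ai.djl.modality.cv.ImageFactory;
import ai.djl.modality.cv.transform.Resize;
import ai.djl.modality.cv.transform.ToTensor;
import ai.djl.modality.cv.translator.ImageClassificationTranslator;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ModelZoo;
import ai.djl.repository.zoo.ZooModel;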
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following code:
    #!/bin/sh
    hadoop fs -put -f images hdfs://emr-header-1:9000/
    hadoop fs -put tensorflow_MobileNet.zip hdfs://emr-header-1:9000/
    hadoop fs -rm -r hdfs://emr-header-1:9000/predict_result
    spark-submit --master yarn-cluster \
    --conf spark.driver.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --conf spark.executor.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --num-executors 6 --executor-cores 18 --executor-memory 16G \
    --class com.alibaba.datascience.DistributedPredictHDFS distributedinference-0.1-SNAPSHOT.jar \
    hdfs://emr-header-1:9000/images/ \
    hdfs://emr-header-1:9000/tensorflow_MobileNet.zip \
    hdfs://emr-header-1:9000/predict_result
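
The samples in this topic call batchPredict every 32 (or 64) images, so the images that do not fill a complete batch are left to the process_insufficient_images() placeholder. A minimal, hedged sketch of that leftover handling for the DistributedPredictHDFS sample, reusing its predictor, imagelist, and list variables, might look as follows; the other samples can handle their remaining images in the same way:
// After the while loop: predict the images that did not fill a complete batch.
if (!imagelist.isEmpty()) {
    List<Classifications> results = predictor.batchPredict(imagelist);
    list.add(results.toString());
    imagelist.clear();
}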

Store images in HBase

Important You must use HBase 2.0 or later, which corresponds to EMR V4.x.
In this scenario, HBase provides high I/O throughput even for small files. You can use the HBase service of your EMR Hadoop cluster or activate ApsaraDB for HBase. For more information about ApsaraDB for HBase, see ApsaraDB for HBase.
The following sample code shows the content of the DistributedPredictHBase.java file in the IDE:
public class DistributedPredictHBase {
    public static void main(String[] args) throws Exception{
        System.out.println("Start DistributedPredictHBase Job.");

        String hbasePath  = args[0];
        String modelPath  = args[1];
        String resultPath = args[2];

        SparkConf conf = new SparkConf().setAppName("DistributedPredictHBaseOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);


        Scan scan = new Scan();
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());

        /* 0 */
        String tablename0 = "image0";
        Configuration hbconf0 = HBaseConfiguration.create();
        hbconf0.set(TableInputFormat.INPUT_TABLE, tablename0);
        hbconf0.set(TableInputFormat.SCAN_BATCHSIZE, "256");
        hbconf0.set(TableInputFormat.SCAN, scanToString);
        hbconf0.set("hbase.zookeeper.quorum", hbasePath);
        hbconf0.set("hbase.zookeeper.property.clientPort", "2181");
        JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd0 = sc.newAPIHadoopRDD(hbconf0, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        for(int i=1;i<8;i++) {
            /* 1 */
            String tablename = "image" + i;
            Configuration hbconf = HBaseConfiguration.create();
            hbconf.set(TableInputFormat.INPUT_TABLE, tablename);
            hbconf.set(TableInputFormat.SCAN_BATCHSIZE, "256");
            hbconf.set(TableInputFormat.SCAN, scanToString);
            hbconf.set("hbase.zookeeper.quorum", hbasePath);
            hbconf.set("hbase.zookeeper.property.clientPort", "2181");
            JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd = sc.newAPIHadoopRDD(hbconf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
            HBaseRdd0 = HBaseRdd0.union(HBaseRdd);
        }

        System.out.println("Partitions: "+ HBaseRdd0.getNumPartitions());

        JavaRDD<String> resultx = HBaseRdd0.mapPartitions(new FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception {
                ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                                .addTransform(new Resize(224, 224))
                                .addTransform(new ToTensor())
                                .build();

                Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                                .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                                .setTypes(Image.class, Classifications.class) // defines input and output data type
                                .optTranslator(translator)
                                .optModelUrls(modelPath)
                                .build();

                System.out.println("Enginename: "+Engine.getInstance().getEngineName());
                System.out.println("ModelName: " + criteria.getModelName());
                System.out.println("criteriainfo: " + criteria.toString());
                List<String> list = new ArrayList<>();
                ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                Predictor<Image, Classifications> predictor = model.newPredictor();
                int idx = 0;
                List<Image> imagelist = new ArrayList<>();
                List<String> rows = new ArrayList<String>();
                while (iterator.hasNext()) {

                    Result result = iterator.next()._2();
                    String rowKey = Bytes.toString(result.getRow());
                    byte[] body = result.getValue("f".getBytes(), "body".getBytes());
                    InputStream input = new ByteArrayInputStream(body);
                    Image img = ImageFactory.getInstance().fromInputStream(input);
                    imagelist.add(img);
                    idx++;

                    if(imagelist.size()%64 == 0) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        //System.out.println("index: " + idx + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                        System.out.println("index: " + idx + " " + rowKey);
                        results.clear();
                        imagelist.clear();
                    }
                    rows.add(rowKey);
                }
                /*
                process_insufficient_images().
                */
                return rows.iterator();
           }
        });
        resultx.saveAsTextFile(resultPath);


    }
}
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following code:
    #!/bin/sh
    hadoop fs -rm -r hdfs://emr-header-1:9000/predict_result
    hadoop fs -put tensorflow_MobileNet.zip hdfs://emr-header-1:9000/
    spark-submit --master yarn-cluster \
    --conf spark.driver.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --conf spark.executor.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --num-executors 16 --executor-cores 4 --executor-memory 24G \
    --class com.alibaba.datascience.DistributedPredictHBase distributedinference-0.1-SNAPSHOT.jar \
    192.168.0.* \
    hdfs://emr-header-1:9000/tensorflow_MobileNet.zip \
    hdfs://emr-header-1:9000/predict_result
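    • 192.168.0.*: indicates the IP address of the master node of the Hadoop cluster that hosts HBase and ZooKeeper. The job passes this value to the hbase.zookeeper.quorum setting.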

Store images in OSS

The following sample code shows the content of the DistributedPredictOSS.java file in the IDE:
public class DistributedPredictOSS {
    public static void main(String[] args) throws Exception{
        System.out.println("Start DistributedPredictOSS Job.");

        String ossPath  = args[0];
        String modelPath  = args[1];
        String resultPath = args[2];

        SparkConf conf = new SparkConf().setAppName("DistributedPredictOSSOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, PortableDataStream> imageStream = sc.binaryFiles(ossPath, 128);
        System.out.println("Partitions: "+ imageStream.getNumPartitions());

        JavaRDD<String> result = imageStream.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, PortableDataStream>>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(Iterator<Tuple2<String, PortableDataStream>> iterator) throws Exception{

                ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                                .addTransform(new Resize(224, 224))
                                .addTransform(new ToTensor())
                                .build();

                Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                                .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                                .setTypes(Image.class, Classifications.class) // defines input and output data type
                                .optTranslator(translator)
                                .optModelUrls(modelPath)
                                .build();

                System.out.println("Enginename: "+Engine.getInstance().getEngineName());
                System.out.println("ModelName: " + criteria.getModelName());
                System.out.println("criteriainfo: " + criteria.toString());
                List<String> list = new ArrayList<>();
                ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                Predictor<Image, Classifications> predictor = model.newPredictor();
                int idx = 0;
                List<Image> imagelist = new ArrayList<>();
                while(iterator.hasNext()){
                    Tuple2<String, PortableDataStream> item = iterator.next();

                    String name = item._1();
                    PortableDataStream content = item._2();

                    Image img = ImageFactory.getInstance().fromInputStream(content.open());
                    imagelist.add(img);
                    idx++;

                    if(imagelist.size()%32 == 0) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        //System.out.println("index: " + idx + "" + name + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                        System.out.println("index: " + idx);
                        list.add(results.toString());
                        imagelist.clear();
                    }
                }
                /*
                process_insufficient_images().
                */
                return list.iterator();
            }
        });
        result.saveAsTextFile(resultPath);

    }
}
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following code:
    #!/bin/sh
    hadoop fs -rm -r hdfs://emr-header-1:9000/predict_result
    hadoop fs -put tensorflow_MobileNet.zip hdfs://emr-header-1:9000/
    ossutil -i <yourAccessKeyId> -k <yourAccessKeySecret> -e oss-cn-huhehaote.aliyuncs.com cp -r images oss://bucket/images
    spark-submit --master yarn-cluster \
    --conf spark.driver.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --conf spark.executor.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --num-executors 16 --executor-cores 4 --executor-memory 16G \
    --class com.alibaba.datascience.DistributedPredictOSS distributedinference-0.1-SNAPSHOT.jar \
    oss://bucket/images/ \
    hdfs://emr-header-1:9000/tensorflow_MobileNet.zip \
    hdfs://emr-header-1:9000/predict_result
    • yourAccessKeyId: indicates the AccessKey ID of your Alibaba Cloud account.
    • yourAccessKeySecret: indicates the AccessKey secret of your Alibaba Cloud account.

Merge multiple images into a large file and store the large file in HDFS

In this scenario, the I/O throughput is high. However, you must first convert each image into a Base64-encoded string and then merge the encoded images into a single large file, with one image per line. For more information, see the Convert images into Base64-encoded images section in this topic and the merge sketch that follows it.

The following sample code shows the content of the DistributedPredictHDFSBigFile.java file in the IDE:
public class DistributedPredictHDFSBigFile {
    public static void main(String[] args) throws Exception{
        System.out.println("Start DistributedPredictHDFSBigFile Job.");

        String imagesPath = args[0];
        String modelPath  = args[1];
        String resultPath = args[2];

        SparkConf conf = new SparkConf().setAppName("DistributedPredictHDFSBigFileOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> imageStream_base64 = sc.textFile(imagesPath);
        System.out.println("Partitions: "+ imageStream_base64.getNumPartitions());

        JavaRDD<byte[]> imageStream_bytes =  imageStream_base64.map(new Function<String, byte[]>() {
            @Override
            public byte[] call(String in) throws Exception {
                byte[] out = Base64.getDecoder().decode(in);
                return out;
            }
        });
        System.out.println("Partitions: "+ imageStream_bytes.getNumPartitions());

        JavaRDD<String> result = imageStream_bytes.mapPartitions(new FlatMapFunction<Iterator<byte[]>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(Iterator<byte[]> iterator) throws Exception{
                ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                                .addTransform(new Resize(224, 224))
                                .addTransform(new ToTensor())
                                .build();

                Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                                .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                                .setTypes(Image.class, Classifications.class) // defines input and output data type
                                .optTranslator(translator)
                                .optModelUrls(modelPath)
                                .build();

                System.out.println("Enginename: "+Engine.getInstance().getEngineName());
                System.out.println("ModelName: " + criteria.getModelName());
                System.out.println("criteriainfo: " + criteria.toString());
                List<String> list = new ArrayList<>();
                ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                Predictor<Image, Classifications> predictor = model.newPredictor();
                int idx = 0;
                List<Image> imagelist = new ArrayList<>();
                while(iterator.hasNext()){
                    byte[] body = iterator.next();

                    InputStream input = new ByteArrayInputStream(body);
                    Image img = ImageFactory.getInstance().fromInputStream(input);
                    imagelist.add(img);
                    idx++;

                    if(imagelist.size()%32 == 0) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        System.out.println("index: " + idx + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                        System.out.println("index: " + idx);
                        list.add(results.toString());
                        imagelist.clear();
                    }
                }
                /*
                process_insufficient_images().
                */
                return list.iterator();
            }

        });
        result.saveAsTextFile(resultPath);

    }
}
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following code:
    #!/bin/sh
    hadoop fs -rm -r hdfs://emr-header-1:9000/predict_result
    hadoop fs -put tensorflow_MobileNet.zip hdfs://emr-header-1:9000/
    hadoop fs -put -f images.base64 hdfs://emr-header-1:9000/
    spark-submit --master yarn-cluster \
    --conf spark.driver.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --conf spark.executor.extraClassPath=/usr/local/dstools/jars/protobuf-java-3.15.3.jar \
    --num-executors 16 --executor-cores 4 --executor-memory 24G \
    --class com.alibaba.datascience.DistributedPredictHDFSBigFile distributedinference-0.1-SNAPSHOT.jar \
    hdfs://emr-header-1:9000/images.base64 \
    hdfs://emr-header-1:9000/tensorflow_MobileNet.zip \
    hdfs://emr-header-1:9000/predict_result

Convert images into Base64-encoded images

The following sample code shows the content of the ConvertImageToBase64.java file in the IDE:
public class ConvertImageToBase64 {
    public static void main(String[] args) throws Exception {
        String filename = "car.jpg";

        // Read the image file into a byte array.
        File file = new File(filename);
        byte[] fileBytes = new byte[(int) file.length()];
        try (FileInputStream fis = new FileInputStream(file)) {
            fis.read(fileBytes);
        }
        String encoded = Base64.getEncoder().encodeToString(fileBytes);

        /* decode (shown for verification only) */
        byte[] decoded = Base64.getDecoder().decode(encoded);

        // Append a newline so that each image occupies one line, and close the stream to flush the buffer.
        encoded += '\n';
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(filename + ".base64", false))) {
            out.write(encoded.getBytes());
        }

    }
}
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following code:
    #!/bin/sh
    java -cp distributedinference-0.1-SNAPSHOT.jar com.alibaba.datascience.ConvertImageToBase64
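
The ConvertImageToBase64 sample encodes a single image into a single .base64 file. To build the images.base64 file that the DistributedPredictHDFSBigFile job reads, every image must be encoded and appended as one line of the same file. The following hedged sketch shows one way to do this; the ConvertImagesToBase64File class name and the local images directory are assumptions, not part of the dsdemo code:
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Base64;

public class ConvertImagesToBase64File {
    public static void main(String[] args) throws Exception {
        File imageDir = new File("images");      // local directory that contains the source images
        File output = new File("images.base64"); // one Base64-encoded image per line

        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(output, false))) {
            for (File image : imageDir.listFiles()) {
                if (!image.isFile()) {
                    continue;
                }
                byte[] fileBytes = Files.readAllBytes(image.toPath());
                String encoded = Base64.getEncoder().encodeToString(fileBytes) + "\n";
                out.write(encoded.getBytes());
            }
        }
        System.out.println("images.base64 generated.");
    }
}
You can then upload the generated file to HDFS by running hadoop fs -put -f images.base64 hdfs://emr-header-1:9000/, as shown in the previous section.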

Import images to HBase

HBase is a high-performance key-value store that can hold a large number of images and delivers significantly higher I/O throughput than HDFS for large numbers of small files.

The following sample code shows the content of the ImportImageToHBase.java file in the IDE:
public class ImportImageToHBase {
    public static void main(String[] args) throws Exception {
        String h_table = args[0];
        String zkQuorum = args[1]; // IP address of the HBase ZooKeeper quorum, for example 192.168.0.*
        String filename = "car.jpg";
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zkQuorum + ":2181");
        Connection connection = ConnectionFactory.createConnection(configuration);

        Table table = connection.getTable(TableName.valueOf(h_table));

        /* put image into hbase*/
        File file = new File(filename);
        FileInputStream fis = new FileInputStream(file);
        byte[] fileBytes = new byte[(int) file.length()];
        fis.read(fileBytes);
        int i;
        for(i=0;i<100000;i++) {
            String key = String.valueOf(i) + ".jpg";
            Put put = new Put(key.getBytes());
            put.addColumn("f".getBytes(), "body".getBytes(), fileBytes);
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        fis.close();
        System.out.println("put 100000 images done!");

        /* get an image back from hbase; the row keys written above are "<i>.jpg" */
        Get get = new Get(Bytes.toBytes("0.jpg"));
        Result result = table.get(get);
        byte[] body = result.getValue("f".getBytes(), "body".getBytes());
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(filename + ".bk", false))) {
            out.write(body);
        }
        table.close();
        connection.close();
    }
}
  1. Log on to your Data Science cluster in SSH mode. For more information, see Log on to a cluster.
  2. Create multiple tables in HBase so that Spark can read more partitions in parallel. TableInputFormat generates one Spark partition per HBase region, and the DistributedPredictHBase sample reads the tables image0 through image7.
    $ hbase shell
    create 'image0','f'
    create 'image1','f'
    create 'image2','f'
  3. Run the following code:
    #!/bin/sh
    java -cp distributedinference-0.1-SNAPSHOT.jar com.alibaba.datascience.ImportImageToHBase image0 192.168.0.*
    Important 192.168.0.* in the command is the public IP address of the master node of the Hadoop cluster.
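
The ImportImageToHBase sample writes 100,000 copies of a single image into one table. To feed the DistributedPredictHBase job with real data, the images must be written into all of the tables image0 through image7. The following hedged sketch shows one possible way to do that; the ImportImageDirToHBase class name and its arguments are assumptions, not part of the dsdemo code:
import java.io.File;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class ImportImageDirToHBase {
    public static void main(String[] args) throws Exception {
        String zkQuorum = args[0];          // for example, 192.168.0.*
        File imageDir = new File(args[1]);  // local directory that contains the images

        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", zkQuorum + ":2181");

        try (Connection connection = ConnectionFactory.createConnection(configuration)) {
            File[] files = imageDir.listFiles();
            for (int i = 0; i < files.length; i++) {
                // Distribute the images across the tables image0 to image7 in a round-robin manner.
                String tableName = "image" + (i % 8);
                try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                    Put put = new Put(files[i].getName().getBytes());
                    put.addColumn("f".getBytes(), "body".getBytes(), Files.readAllBytes(files[i].toPath()));
                    table.put(put);
                }
            }
            System.out.println("imported " + files.length + " images");
        }
    }
}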