In an E-MapReduce (EMR) Data Science cluster, you can use the Spark compute engine to run inference on a large number of images. Spark can quickly process terabytes or even petabytes of image data by using the vCPUs or vGPUs of the worker nodes in the Data Science cluster.
Prerequisites
- Development tools
- Java Development Kit (JDK) 8 is installed on your on-premises machine.
- Maven 3.x is installed on your on-premises machine.
- An integrated development environment (IDE) for Java or Scala is installed on your on-premises machine. We recommend that you use IntelliJ IDEA with the JDK and Maven configured.
- A Data Science cluster and a Hadoop cluster are created. For more information, see Create a cluster.
- Object Storage Service (OSS) is activated. For more information, see Activate OSS.
- The dsdemo code is downloaded. If you have created a Data Science cluster, you can join the DingTalk group numbered 32497587 to obtain the dsdemo code.
Background information
This topic describes how to run distributed inference in the following scenarios:
- Store images in HDFS
- Store images in HBase
- Store images in OSS
- Merge multiple images into a large file and store the large file in HDFS
- Convert images into Base64-encoded images
- Import images to HBase
Store images in HDFS
In this scenario, you can use the HDFS service in the Data Science cluster and do not need to purchase other storage services. However, HDFS delivers low I/O throughput when it stores a large number of small files, such as images of only a few kilobytes each.
import ai.djl.Application;
import ai.djl.engine.Engine;
import ai.djl.inference.Predictor;
import ai.djl.modality.Classifications;
import ai.djl.modality.cv.Image;
import ai.djl.modality.cv.ImageFactory;
import ai.djl.modality.cv.transform.Resize;
import ai.djl.modality.cv.transform.ToTensor;
import ai.djl.modality.cv.translator.ImageClassificationTranslator;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ModelZoo;
import ai.djl.repository.zoo.ZooModel;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.input.PortableDataStream;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DistributedPredictHDFS {
    public static void main(String[] args) throws Exception {
        System.out.println("Start DistributedPredictHDFS Job.");
        String imagesPath = args[0];
        String modelPath = args[1];
        String resultPath = args[2];
        SparkConf conf = new SparkConf().setAppName("DistributedPredictHDFSOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the images as binary files and split them into 128 partitions.
        JavaPairRDD<String, PortableDataStream> imageStream = sc.binaryFiles(imagesPath, 128);
        System.out.println("Partitions: " + imageStream.getNumPartitions());
        JavaRDD<String> result = imageStream.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<String, PortableDataStream>>, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Iterator<String> call(Iterator<Tuple2<String, PortableDataStream>> iterator) throws Exception {
                    ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                            .addTransform(new Resize(224, 224))
                            .addTransform(new ToTensor())
                            .build();
                    Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                            .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                            .setTypes(Image.class, Classifications.class) // Defines the input and output data types.
                            .optTranslator(translator)
                            .optModelUrls(modelPath)
                            .build();
                    System.out.println("EngineName: " + Engine.getInstance().getEngineName());
                    System.out.println("ModelName: " + criteria.getModelName());
                    System.out.println("CriteriaInfo: " + criteria.toString());
                    List<String> list = new ArrayList<>();
                    // Load the model once per partition and reuse the predictor for all images in the partition.
                    ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                    Predictor<Image, Classifications> predictor = model.newPredictor();
                    int idx = 0;
                    List<Image> imagelist = new ArrayList<>();
                    while (iterator.hasNext()) {
                        Tuple2<String, PortableDataStream> item = iterator.next();
                        String name = item._1();
                        PortableDataStream content = item._2();
                        Image img = ImageFactory.getInstance().fromInputStream(content.open());
                        imagelist.add(img);
                        idx++;
                        // Run inference in batches of 32 images.
                        if (imagelist.size() == 32) {
                            List<Classifications> results = predictor.batchPredict(imagelist);
                            System.out.println("index: " + idx + " " + name + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                            list.add(results.toString());
                            imagelist.clear();
                        }
                    }
                    // Process the remaining images that do not fill a complete batch.
                    if (!imagelist.isEmpty()) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        list.add(results.toString());
                        imagelist.clear();
                    }
                    return list.iterator();
                }
            });
        result.saveAsTextFile(resultPath);
    }
}
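Two details of this code matter for performance: mapPartitions is used instead of map so that the model is loaded once per partition rather than once per image, and images are passed to the predictor in batches of 32 through batchPredict. The batch size of 32 is only an example; a suitable value depends on the model and on the memory available on the worker nodes.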
Store images in HBase
In this scenario, the images are stored in HBase tables (image0 to image7 in this example). Spark scans each table, unions the scans into one RDD, and runs inference on the rows.
// Same ai.djl.* and org.apache.spark.* imports as DistributedPredictHDFS, plus the following:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class DistributedPredictHBase {
    public static void main(String[] args) throws Exception {
        System.out.println("Start DistributedPredictHBase Job.");
        String hbasePath = args[0];
        String modelPath = args[1];
        String resultPath = args[2];
        SparkConf conf = new SparkConf().setAppName("DistributedPredictHBaseOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Serialize a full-table scan so that it can be passed to TableInputFormat.
        Scan scan = new Scan();
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        // Build an RDD for the first table, image0.
        String tablename0 = "image0";
        Configuration hbconf0 = HBaseConfiguration.create();
        hbconf0.set(TableInputFormat.INPUT_TABLE, tablename0);
        hbconf0.set(TableInputFormat.SCAN_BATCHSIZE, "256");
        hbconf0.set(TableInputFormat.SCAN, scanToString);
        hbconf0.set("hbase.zookeeper.quorum", hbasePath);
        hbconf0.set("hbase.zookeeper.property.clientPort", "2181");
        JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd0 = sc.newAPIHadoopRDD(hbconf0, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        // Build RDDs for tables image1 to image7 and union them with the first RDD.
        for (int i = 1; i < 8; i++) {
            String tablename = "image" + i;
            Configuration hbconf = HBaseConfiguration.create();
            hbconf.set(TableInputFormat.INPUT_TABLE, tablename);
            hbconf.set(TableInputFormat.SCAN_BATCHSIZE, "256");
            hbconf.set(TableInputFormat.SCAN, scanToString);
            hbconf.set("hbase.zookeeper.quorum", hbasePath);
            hbconf.set("hbase.zookeeper.property.clientPort", "2181");
            JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd = sc.newAPIHadoopRDD(hbconf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
            HBaseRdd0 = HBaseRdd0.union(HBaseRdd);
        }
        System.out.println("Partitions: " + HBaseRdd0.getNumPartitions());
        JavaRDD<String> resultx = HBaseRdd0.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Iterator<String> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception {
                    ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                            .addTransform(new Resize(224, 224))
                            .addTransform(new ToTensor())
                            .build();
                    Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                            .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                            .setTypes(Image.class, Classifications.class) // Defines the input and output data types.
                            .optTranslator(translator)
                            .optModelUrls(modelPath)
                            .build();
                    System.out.println("EngineName: " + Engine.getInstance().getEngineName());
                    System.out.println("ModelName: " + criteria.getModelName());
                    System.out.println("CriteriaInfo: " + criteria.toString());
                    // Load the model once per partition and reuse the predictor.
                    ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                    Predictor<Image, Classifications> predictor = model.newPredictor();
                    int idx = 0;
                    List<Image> imagelist = new ArrayList<>();
                    List<String> rows = new ArrayList<String>();
                    while (iterator.hasNext()) {
                        Result result = iterator.next()._2();
                        String rowKey = Bytes.toString(result.getRow());
                        // Read the image bytes from column f:body.
                        byte[] body = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("body"));
                        InputStream input = new ByteArrayInputStream(body);
                        Image img = ImageFactory.getInstance().fromInputStream(input);
                        imagelist.add(img);
                        idx++;
                        // Run inference in batches of 64 images. Only the row keys are collected as output here.
                        if (imagelist.size() == 64) {
                            List<Classifications> results = predictor.batchPredict(imagelist);
                            System.out.println("index: " + idx + " " + rowKey);
                            results.clear();
                            imagelist.clear();
                        }
                        rows.add(rowKey);
                    }
                    // Process the remaining images that do not fill a complete batch.
                    if (!imagelist.isEmpty()) {
                        predictor.batchPredict(imagelist);
                        imagelist.clear();
                    }
                    return rows.iterator();
                }
            });
        resultx.saveAsTextFile(resultPath);
    }
}
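Spreading the images across eight tables and then calling union is one way to increase the number of input splits: each table scan contributes its own partitions, so more executors can read from HBase in parallel. A single large table that is pre-split into multiple regions should achieve a similar effect, because TableInputFormat creates one split per region.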
Store images in OSS
In this scenario, the images are stored in an OSS bucket. The code is the same as in the HDFS scenario; only the input path that is passed to sc.binaryFiles points to an OSS directory.
// The imports are the same as in DistributedPredictHDFS.
public class DistributedPredictOSS {
    public static void main(String[] args) throws Exception {
        System.out.println("Start DistributedPredictOSS Job.");
        String ossPath = args[0];
        String modelPath = args[1];
        String resultPath = args[2];
        SparkConf conf = new SparkConf().setAppName("DistributedPredictOSSOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the images from OSS as binary files and split them into 128 partitions.
        JavaPairRDD<String, PortableDataStream> imageStream = sc.binaryFiles(ossPath, 128);
        System.out.println("Partitions: " + imageStream.getNumPartitions());
        JavaRDD<String> result = imageStream.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<String, PortableDataStream>>, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Iterator<String> call(Iterator<Tuple2<String, PortableDataStream>> iterator) throws Exception {
                    ImageClassificationTranslator translator =
                        ImageClassificationTranslator.builder()
                            .addTransform(new Resize(224, 224))
                            .addTransform(new ToTensor())
                            .build();
                    Criteria<Image, Classifications> criteria =
                        Criteria.builder()
                            .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                            .setTypes(Image.class, Classifications.class) // Defines the input and output data types.
                            .optTranslator(translator)
                            .optModelUrls(modelPath)
                            .build();
                    System.out.println("EngineName: " + Engine.getInstance().getEngineName());
                    System.out.println("ModelName: " + criteria.getModelName());
                    System.out.println("CriteriaInfo: " + criteria.toString());
                    List<String> list = new ArrayList<>();
                    // Load the model once per partition and reuse the predictor.
                    ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                    Predictor<Image, Classifications> predictor = model.newPredictor();
                    int idx = 0;
                    List<Image> imagelist = new ArrayList<>();
                    while (iterator.hasNext()) {
                        Tuple2<String, PortableDataStream> item = iterator.next();
                        String name = item._1();
                        PortableDataStream content = item._2();
                        Image img = ImageFactory.getInstance().fromInputStream(content.open());
                        imagelist.add(img);
                        idx++;
                        // Run inference in batches of 32 images.
                        if (imagelist.size() == 32) {
                            List<Classifications> results = predictor.batchPredict(imagelist);
                            System.out.println("index: " + idx + " " + name + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                            list.add(results.toString());
                            imagelist.clear();
                        }
                    }
                    // Process the remaining images that do not fill a complete batch.
                    if (!imagelist.isEmpty()) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        list.add(results.toString());
                        imagelist.clear();
                    }
                    return list.iterator();
                }
            });
        result.saveAsTextFile(resultPath);
    }
}
Merge multiple images into a large file and store the large file in HDFS
In this scenario, the I/O throughput is high because Spark reads a small number of large files instead of many small ones. However, you must first convert each image into a Base64-encoded string (see Convert images into Base64-encoded images below) and merge the encoded strings into one large text file, one image per line. A minimal sketch of the merge step is shown after this paragraph.
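The merge step can be as simple as concatenating the .base64 files produced by ConvertImageToBase64. The following is a minimal sketch; the class name and the input and output paths are hypothetical:
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MergeBase64Files {
    public static void main(String[] args) throws IOException {
        // Hypothetical paths: a directory of *.base64 files and the merged output file.
        Path inputDir = Paths.get("images_base64");
        Path merged = Paths.get("images_merged.base64");
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inputDir, "*.base64")) {
            for (Path file : files) {
                // Each .base64 file already ends with a line break, so the
                // merged file contains exactly one encoded image per line.
                Files.write(merged, Files.readAllBytes(file),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
        }
    }
}
You can then upload the merged file to HDFS, for example with hadoop fs -put, and pass its path as the first argument to DistributedPredictHDFSBigFile.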
// Same imports as DistributedPredictHDFS, plus the following:
import org.apache.spark.api.java.function.Function;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Base64;

public class DistributedPredictHDFSBigFile {
    public static void main(String[] args) throws Exception {
        System.out.println("Start DistributedPredictHDFSBigFile Job.");
        String imagesPath = args[0];
        String modelPath = args[1];
        String resultPath = args[2];
        SparkConf conf = new SparkConf().setAppName("DistributedPredictHDFSBigFileOnSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Each line of the large file is one Base64-encoded image.
        JavaRDD<String> imageStream_base64 = sc.textFile(imagesPath);
        System.out.println("Partitions: " + imageStream_base64.getNumPartitions());
        // Decode each line back into the raw image bytes.
        JavaRDD<byte[]> imageStream_bytes = imageStream_base64.map(new Function<String, byte[]>() {
            private static final long serialVersionUID = 1L;
            @Override
            public byte[] call(String in) throws Exception {
                return Base64.getDecoder().decode(in);
            }
        });
        System.out.println("Partitions: " + imageStream_bytes.getNumPartitions());
        JavaRDD<String> result = imageStream_bytes.mapPartitions(new FlatMapFunction<Iterator<byte[]>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterator<String> call(Iterator<byte[]> iterator) throws Exception {
                ImageClassificationTranslator translator =
                    ImageClassificationTranslator.builder()
                        .addTransform(new Resize(224, 224))
                        .addTransform(new ToTensor())
                        .build();
                Criteria<Image, Classifications> criteria =
                    Criteria.builder()
                        .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                        .setTypes(Image.class, Classifications.class) // Defines the input and output data types.
                        .optTranslator(translator)
                        .optModelUrls(modelPath)
                        .build();
                System.out.println("EngineName: " + Engine.getInstance().getEngineName());
                System.out.println("ModelName: " + criteria.getModelName());
                System.out.println("CriteriaInfo: " + criteria.toString());
                List<String> list = new ArrayList<>();
                // Load the model once per partition and reuse the predictor.
                ZooModel<Image, Classifications> model = ModelZoo.loadModel(criteria);
                Predictor<Image, Classifications> predictor = model.newPredictor();
                int idx = 0;
                List<Image> imagelist = new ArrayList<>();
                while (iterator.hasNext()) {
                    byte[] body = iterator.next();
                    InputStream input = new ByteArrayInputStream(body);
                    Image img = ImageFactory.getInstance().fromInputStream(input);
                    imagelist.add(img);
                    idx++;
                    // Run inference in batches of 32 images.
                    if (imagelist.size() == 32) {
                        List<Classifications> results = predictor.batchPredict(imagelist);
                        System.out.println("index: " + idx + " " + img.getWidth() + " " + img.getHeight() + " " + results.toString());
                        list.add(results.toString());
                        imagelist.clear();
                    }
                }
                // Process the remaining images that do not fill a complete batch.
                if (!imagelist.isEmpty()) {
                    List<Classifications> results = predictor.batchPredict(imagelist);
                    list.add(results.toString());
                    imagelist.clear();
                }
                return list.iterator();
            }
        });
        result.saveAsTextFile(resultPath);
    }
}
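Compared with the small-file version, this approach reads a few large, splittable text files, so each Spark task streams a long run of records from HDFS instead of opening one file per image. That is what recovers the I/O throughput mentioned above; the cost is the extra encoding step and roughly one third more storage, because Base64 encodes every 3 bytes as 4 characters.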
Convert images into Base64-encoded images
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class ConvertImageToBase64 {
    public static void main(String[] args) throws Exception {
        String filename = "car.jpg";
        // Read the image file and encode it as a Base64 string.
        byte[] fileBytes = Files.readAllBytes(Paths.get(filename));
        String encoded = Base64.getEncoder().encodeToString(fileBytes);
        /* Optional sanity check: decoding must return the original bytes. */
        byte[] decoded = Base64.getDecoder().decode(encoded);
        assert java.util.Arrays.equals(fileBytes, decoded);
        // Append a line break so that multiple encoded images can later be
        // merged into one text file, one image per line.
        encoded += '\n';
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(filename + ".base64", false))) {
            out.write(encoded.getBytes());
        }
    }
}
Import images to HBase
HBase is a high-performance key-value store that can hold a large number of images. Compared with storing each image as a small file in HDFS, HBase significantly improves I/O throughput.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ImportImageToHBase {
    public static void main(String[] args) throws Exception {
        String h_table = args[0];
        String filename = "car.jpg";
        Configuration configuration = HBaseConfiguration.create();
        // Replace with the ZooKeeper quorum address of your HBase cluster.
        configuration.set("hbase.zookeeper.quorum", "192.168.0.*:2181");
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(h_table));
        /* Put the images into HBase. */
        byte[] fileBytes = Files.readAllBytes(Paths.get(filename));
        // For testing, write the same image 100,000 times with row keys 0.jpg to 99999.jpg.
        for (int i = 0; i < 100000; i++) {
            String key = i + ".jpg";
            Put put = new Put(Bytes.toBytes(key));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("body"), fileBytes);
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("put 100000 images done!");
        /* Get an image back from HBase. The rows were written with numeric keys,
           so read row 0.jpg rather than the original file name. */
        Get get = new Get(Bytes.toBytes("0.jpg"));
        Result result = table.get(get);
        byte[] body = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("body"));
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(filename + ".bk", false))) {
            out.write(body);
        }
        table.close();
        connection.close();
    }
}
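The import code assumes that the target table already exists and has a column family named f. If you still need to create the table, the following minimal sketch shows one way to do it; it assumes the HBase 2.x client API, and the table name is hypothetical:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateImageTable {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // Replace with the ZooKeeper quorum address of your HBase cluster.
        configuration.set("hbase.zookeeper.quorum", "192.168.0.*:2181");
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("image0"); // Hypothetical table name.
            if (!admin.tableExists(name)) {
                // Create the table with a single column family f, which stores the image bytes in f:body.
                admin.createTable(
                    TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                        .build());
            }
        }
    }
}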