隨著巨量資料平台發展,現已具備處理多類型的非結構化、半結構化資料的能力。其中,將IP地址轉換為歸屬地是一種常見情境。本文介紹如何通過MaxCompute UDF實現將IPv4或IPv6地址轉換為歸屬地。
背景資訊
要實現將IPv4或IPv6地址轉換為歸屬地,必須要有IP地址庫,需要下載IP地址庫檔案並以資源形式上傳至MaxCompute專案。開發MaxCompute UDF,並基於IP地址庫檔案註冊函數,從而在SQL語句中調用函數將IP地址轉換為歸屬地。
本文提供的IP地址庫檔案,僅供驗證該最佳實務使用,請結合實際業務情況,自我維護IP地址庫檔案。
前提條件
操作流程
基於MaxCompute UDF將IPv4或IPv6地址轉換為歸屬地的操作流程如下:
-
將IP地址庫檔案作為資源上傳至MaxCompute專案,後續建立的MaxCompute UDF會依賴此資源。
-
串連MaxCompute專案,並建立MaxCompute Java Module。
-
在IntelliJ IDEA上編寫MaxCompute UDF代碼。
-
將MaxCompute UDF註冊為函數。
-
步驟五:調用MaxCompute UDF轉換IP地址為歸屬地
在SQL語句中調用註冊好的函數將IP地址轉換為歸屬地。
步驟一:上傳IP地址庫檔案
-
下載IP地址庫檔案至本地,解壓得到ipv4.txt和ipv6.txt,並放置於MaxCompute用戶端的安裝目錄
...\odpscmd_public\bin下。 -
安裝並登入MaxCompute本地用戶端,進入目標MaxCompute專案。
-
執行
add file命令,將ipv4.txt和ipv6.txt以File類型資源上傳至MaxCompute專案。ADD file ipv4.txt -f; ADD file ipv6.txt -f;添加資源資訊,請參見添加資源。
-
(用於本地調試)將ipv4.txt和ipv6.txt複製到本地專案的
warehouse/example_project/_resources_目錄下。
步驟二:建立專案串連
-
串連MaxCompute專案。操作詳情請參見管理專案串連。
-
建立MaxCompute Java Module。操作詳情請參見建立MaxCompute Java Module。
步驟三:編寫MaxCompute UDF
-
建立Java Class對象。
後續步驟中編寫的MaxCompute UDF代碼會用到此處建立的Java Class。
-
進入IntelliJ IDEA介面,在Project地區,按右鍵Module的源碼目錄(即src > main > java),選擇New > Java Class。
-
在New Java Class對話方塊,輸入Class名稱,按下Enter鍵並在代碼編輯地區輸入代碼。
需要依次建立3個Java Class對象,Class名稱及對應代碼如下,代碼可直接複製使用,無需修改。
-
IpUtils
package com.aliyun.odps.udf.utils; import java.math.BigInteger; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; public class IpUtils { /** * 將字串形式的ip地址轉換為long * * @param ipInString * 字串形式的ip地址 * @return 返回long形式的ip地址 */ public static long StringToLong(String ipInString) { ipInString = ipInString.replace(" ", ""); byte[] bytes; if (ipInString.contains(":")) bytes = ipv6ToBytes(ipInString); else bytes = ipv4ToBytes(ipInString); BigInteger bigInt = new BigInteger(bytes); // System.out.println(bigInt.toString()); return bigInt.longValue(); } /** * 將字串形式的ip地址轉換為long * * @param ipInString * 字串形式的ip地址 * @return bigint的string形式的ip地址 */ public static String StringToBigIntString(String ipInString) { ipInString = ipInString.replace(" ", ""); byte[] bytes; if (ipInString.contains(":")) bytes = ipv6ToBytes(ipInString); else bytes = ipv4ToBytes(ipInString); BigInteger bigInt = new BigInteger(bytes); return bigInt.toString(); } /** * 將整數形式的ip地址轉換為字串形式 * * @param ipInBigInt * 整數形式的ip地址 * @return 字串形式的ip地址 */ public static String BigIntToString(BigInteger ipInBigInt) { byte[] bytes = ipInBigInt.toByteArray(); byte[] unsignedBytes = Arrays.copyOfRange(bytes, 1, bytes.length); // 去除符號位 try { String ip = InetAddress.getByAddress(unsignedBytes).toString(); return ip.substring(ip.indexOf('/') + 1).trim(); } catch (UnknownHostException e) { throw new RuntimeException(e); } } /** * ipv6地址轉有符號byte[17] */ private static byte[] ipv6ToBytes(String ipv6) { byte[] ret = new byte[17]; ret[0] = 0; int ib = 16; boolean comFlag = false;// ipv4混合模式標記 if (ipv6.startsWith(":"))// 去掉開頭的冒號 ipv6 = ipv6.substring(1); String groups[] = ipv6.split(":"); for (int ig = groups.length - 1; ig > -1; ig--) {// 反向掃描 if (groups[ig].contains(".")) { // 出現ipv4混合模式 byte[] temp = ipv4ToBytes(groups[ig]); ret[ib--] = temp[4]; ret[ib--] = temp[3]; ret[ib--] = temp[2]; ret[ib--] = temp[1]; comFlag = true; } else if ("".equals(groups[ig])) { // 出現零長度壓縮,計算缺少的組數 int zlg = 9 - (groups.length + (comFlag ? 1 : 0)); while (zlg-- > 0) {// 將這些組置0 ret[ib--] = 0; ret[ib--] = 0; } } else { int temp = Integer.parseInt(groups[ig], 16); ret[ib--] = (byte) temp; ret[ib--] = (byte) (temp >> 8); } } return ret; } /** * IPv4地址轉有符號byte[5] */ private static byte[] ipv4ToBytes(String ipv4) { byte[] ret = new byte[5]; ret[0] = 0; // 先找到ip地址字串中.的位置 int position1 = ipv4.indexOf("."); int position2 = ipv4.indexOf(".", position1 + 1); int position3 = ipv4.indexOf(".", position2 + 1); // 將每個.之間的字串轉換成整型 ret[1] = (byte) Integer.parseInt(ipv4.substring(0, position1)); ret[2] = (byte) Integer.parseInt(ipv4.substring(position1 + 1, position2)); ret[3] = (byte) Integer.parseInt(ipv4.substring(position2 + 1, position3)); ret[4] = (byte) Integer.parseInt(ipv4.substring(position3 + 1)); return ret; } /** * @param ipAdress ipv4或ipv6字串 * @return 4:ipv4, 6:ipv6, 0:地址不對 * @throws Exception */ public static int isIpV4OrV6(String ipAdress) throws Exception { InetAddress address = InetAddress.getByName(ipAdress); if (address instanceof Inet4Address) return 4; else if (address instanceof Inet6Address) return 6; return 0; } /* * 驗證ip是否屬於某個IP段 * * ipSection ip段(以'-'分隔) * * ip 所驗證的ip號碼 */ public static boolean ipExistsInRange(String ip, String ipSection) { ipSection = ipSection.trim(); ip = ip.trim(); int idx = ipSection.indexOf('-'); String beginIP = ipSection.substring(0, idx); String endIP = ipSection.substring(idx + 1); return getIp2long(beginIP) <= getIp2long(ip) && getIp2long(ip) <= getIp2long(endIP); } public static long getIp2long(String ip) { ip = ip.trim(); String[] ips = ip.split("\\."); long ip2long = 0L; for (int i = 0; i < 4; ++i) { ip2long = ip2long << 8 | Integer.parseInt(ips[i]); } return ip2long; } public static long getIp2long2(String ip) { ip = ip.trim(); String[] ips = ip.split("\\."); long ip1 = Integer.parseInt(ips[0]); long ip2 = Integer.parseInt(ips[1]); long ip3 = Integer.parseInt(ips[2]); long ip4 = Integer.parseInt(ips[3]); long ip2long = 1L * ip1 * 256 * 256 * 256 + ip2 * 256 * 256 + ip3 * 256 + ip4; return ip2long; } public static void main(String[] args) { System.out.println(StringToLong("2002:7af3:f3be:ffff:ffff:ffff:ffff:ffff")); System.out.println(StringToLong("54.38.XX.XX")); } private class Invalid{ private Invalid() { } } } -
IpV4Obj
package com.aliyun.odps.udf.objects; public class IpV4Obj { public long startIp ; public long endIp ; public String city; public String province; public IpV4Obj(long startIp, long endIp, String city, String province) { this.startIp = startIp; this.endIp = endIp; this.city = city; this.province = province; } @Override public String toString() { return "IpV4Obj{" + "startIp=" + startIp + ", endIp=" + endIp + ", city='" + city + '\'' + ", province='" + province + '\'' + '}'; } public void setStartIp(long startIp) { this.startIp = startIp; } public void setEndIp(long endIp) { this.endIp = endIp; } public void setCity(String city) { this.city = city; } public void setProvince(String province) { this.province = province; } public long getStartIp() { return startIp; } public long getEndIp() { return endIp; } public String getCity() { return city; } public String getProvince() { return province; } } -
IpV6Obj
package com.aliyun.odps.udf.objects; public class IpV6Obj { public String startIp ; public String endIp ; public String city; public String province; public String getStartIp() { return startIp; } @Override public String toString() { return "IpV6Obj{" + "startIp='" + startIp + '\'' + ", endIp='" + endIp + '\'' + ", city='" + city + '\'' + ", province='" + province + '\'' + '}'; } public IpV6Obj(String startIp, String endIp, String city, String province) { this.startIp = startIp; this.endIp = endIp; this.city = city; this.province = province; } public void setStartIp(String startIp) { this.startIp = startIp; } public String getEndIp() { return endIp; } public void setEndIp(String endIp) { this.endIp = endIp; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getProvince() { return province; } public void setProvince(String province) { this.province = province; } }
-
-
-
編寫MaxCompute UDF代碼。
-
在Project地區,按右鍵Module的源碼目錄(即src > main > java),選 New > MaxCompute Java。
-
在Create new MaxCompute java class對話方塊,單擊UDF並填寫Name後,按Enter鍵並在代碼編寫地區輸入代碼。
例如Java Class名稱為IpLocation。代碼內容如下,代碼可直接複製使用,無需修改。
package com.aliyun.odps.udf.udfFunction; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.utils.IpUtils; import com.aliyun.odps.udf.objects.IpV4Obj; import com.aliyun.odps.udf.objects.IpV6Obj; import java.io.*; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; public class IpLocation extends UDF { public static IpV4Obj[] ipV4ObjsArray; public static IpV6Obj[] ipV6ObjsArray; public IpLocation() { super(); } @Override public void setup(ExecutionContext ctx) throws UDFException, IOException { //IPV4 if(ipV4ObjsArray==null) { BufferedInputStream bufferedInputStream = ctx.readResourceFileAsStream("ipv4.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(bufferedInputStream)); ArrayList<IpV4Obj> ipV4ObjArrayList=new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { String[] f = line.split("\\|", -1); if(f.length>=5) { long startIp = IpUtils.StringToLong(f[0]); long endIp = IpUtils.StringToLong(f[1]); String city=f[3]; String province=f[4]; IpV4Obj ipV4Obj = new IpV4Obj(startIp, endIp, city, province); ipV4ObjArrayList.add(ipV4Obj); } } br.close(); List<IpV4Obj> collect = ipV4ObjArrayList.stream().sorted(Comparator.comparing(IpV4Obj::getStartIp)).collect(Collectors.toList()); ArrayList<IpV4Obj> basicIpV4DataList=(ArrayList)collect; IpV4Obj[] ipV4Objs = new IpV4Obj[basicIpV4DataList.size()]; ipV4ObjsArray = basicIpV4DataList.toArray(ipV4Objs); } //IPV6 if(ipV6ObjsArray==null) { BufferedInputStream bufferedInputStream = ctx.readResourceFileAsStream("ipv6.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(bufferedInputStream)); ArrayList<IpV6Obj> ipV6ObjArrayList=new ArrayList<>(); String line = null; while ((line = br.readLine()) != null) { String[] f = line.split("\\|", -1); if(f.length>=5) { String startIp = IpUtils.StringToBigIntString(f[0]); String endIp = IpUtils.StringToBigIntString(f[1]); String city=f[3]; String province=f[4]; IpV6Obj ipV6Obj = new IpV6Obj(startIp, endIp, city, province); ipV6ObjArrayList.add(ipV6Obj); } } br.close(); List<IpV6Obj> collect = ipV6ObjArrayList.stream().sorted(Comparator.comparing(IpV6Obj::getStartIp)).collect(Collectors.toList()); ArrayList<IpV6Obj> basicIpV6DataList=(ArrayList)collect; IpV6Obj[] ipV6Objs = new IpV6Obj[basicIpV6DataList.size()]; ipV6ObjsArray = basicIpV6DataList.toArray(ipV6Objs); } } public String evaluate(String ip){ if(ip==null||ip.trim().isEmpty()||!(ip.contains(".")||ip.contains(":"))) { return null; } int ipV4OrV6=0; try { ipV4OrV6= IpUtils.isIpV4OrV6(ip); } catch (Exception e) { return null; } //如果是IPv4 if(ipV4OrV6==4) { int i = binarySearch(ipV4ObjsArray, IpUtils.StringToLong(ip)); if(i>=0) { IpV4Obj ipV4Obj = ipV4ObjsArray[i]; return ipV4Obj.city+","+ipV4Obj.province; }else{ return null; } }else if(ipV4OrV6==6)//如果是IPv6 { int i = binarySearchIPV6(ipV6ObjsArray, IpUtils.StringToBigIntString(ip)); if(i>=0) { IpV6Obj ipV6Obj = ipV6ObjsArray[i]; return ipV6Obj.city+","+ipV6Obj.province; }else{ return null; } }else{//如果不符合IPv4或IPv6格式 return null; } } @Override public void close() throws UDFException, IOException { super.close(); } private static int binarySearch(IpV4Obj[] array,long ip){ int low=0; int hight=array.length-1; while (low<=hight) { int middle=(low+hight)/2; if((ip>=array[middle].startIp)&&(ip<=array[middle].endIp)) { return middle; } if (ip < array[middle].startIp) hight = middle - 1; else { low = middle + 1; } } return -1; } private static int binarySearchIPV6(IpV6Obj[] array,String ip){ int low=0; int hight=array.length-1; while (low<=hight) { int middle=(low+hight)/2; if((ip.compareTo(array[middle].startIp)>=0)&&(ip.compareTo(array[middle].endIp)<=0)) { return middle; } if (ip.compareTo(array[middle].startIp) < 0) hight = middle - 1; else { low = middle + 1; } } return -1; } private class Invalid{ private Invalid() { } } }
-
-
準備本地調試資料。
-
在本地專案的
warehouse/example_project/__tables__/wc_in2/p1=2/p2=1/目錄下,開啟data檔案。 -
將data檔案的最後一列資料修改為ipv4.txt中的IP地址(可在ipv4.txt中任選3個IP地址填入),並儲存。
-
-
調試MaxCompute UDF,確保代碼可以運行成功。
更多調試操作,請參見通過本地運行調試UDF。
-
按右鍵編寫完成的MaxCompute UDF指令碼,選擇Run。
-
在Run/Debug Configurations對話方塊,配置運行參數,單擊OK。
運行參數配置樣本:Name 設為
IpLocation,Main class 設為com.aliyun.odps.udf.udfFunction.IpLocation。MaxCompute 專屬欄位:*MaxCompute project 選擇local和example_project,*MaxCompute table 填寫wc_in2,*Table partition 填寫p2=1,p1=2(格式如 p1=1,p2=2),*Table columns 填寫colc(格式如 c1,c2)。Download Record limit 設為100,Data Column Separator 設為逗號。配置完成後單擊 OK。返回無報錯,說明代碼運行成功,即可繼續執行後續步驟。如有報錯,請按照IntelliJ IDEA報錯資訊處理。說明運行參數可參照圖示資料填寫。
-
步驟四:註冊MaxCompute UDF
-
按右鍵已經編譯成功的MaxCompute UDF指令碼,選擇Deploy to server…。
-
在Package a jar, submit resource and register function對話方塊中,配置參數資訊。
更多參數解釋,請參見打包、上傳及註冊。MaxCompute project選擇目標專案(如
doc_test_dev),Resource file選擇已打包的JAR檔案,Resource name自動填滿為udf-1.0-SNAPSHOT.jar,Extra resources選擇所需的額外資源檔(如ipv4.txt和ipv6.txt),Main class填寫UDF主類全限定名(如com.aliyun.odps.udf.udfFunction.IpLocation),Function name填寫函數名稱(如ipv4_ipv6_aton),勾選Force update if already exists,然後單擊OK。Extra resources必須選中步驟一中上傳的IP地址庫檔案ipv4.txt和ipv6.txt。假設註冊好的函數名稱為ipv4_ipv6_aton。
步驟五:調用MaxCompute UDF轉換IP地址為歸屬地
-
執行SQL SELECT語句調用MaxCompute UDF將IPv4或IPv6地址轉換為歸屬地。
命令樣本如下。
-
轉換IPv4地址為歸屬地
select ipv4_ipv6_aton('116.11.XX.XX');返回結果如下。
北海市,廣西壯族自治區 -
轉換IPv6地址為歸屬地
select ipv4_ipv6_aton('2001:0250:080b:0:0:0:0:0');返回結果如下。
保定市,河北省
-