Ysera
Assistant Engineer
Assistant Engineer
  • UID634
  • Fans0
  • Follows0
  • Posts44
Reads:1892Replies:0

[Share]MaxCompute: Call PyODPS through dynamic loading in Shell node

Created#
More Posted time:Sep 29, 2016 14:40 PM
Application scenarios
PyODPS shouldn’t be a stranger to anyone who knows about Python. In my personal opinion, it represents the future of algorithm development. The reason is simple. We want both efficient and convenient local debugging (Python) and support from massive data and distributed machine learning algorithm platforms (MaxCompute: XLab, MPI and PAI). However, the fulfillment of the above visions must be based on such a premise: “The same set of code should be run both online and offline”.
Script advantages
There are a lot of benefits to straighten out PyODPS, just to name a few:
1. Local reads and writes of files and quick validation of algorithm logic
2. Compatibility with ODPS environment to achieve algorithm release and O&M
3. Loading custom algorithm libraries without restrictions from online environments
4. Boasting four major advantages of SHELL calls
Dynamic loading
Local running of PyODPS is very simple, and pip install is all needed. But it is very difficult to have a python environment of your own on D2. The key of running PyODPS on D2 lies in the dynamic installation of PyODPS packages. The key of dynamic installation of PyODPS packages lies in the dynamic import of packages which requires two hooks in calling the import method: find_module and load_module. The former is in charge of searching and the latter is in charge of loading. It is well known that library loading relies on recursive import calls: a normal module usually corresponds to a file, while a complicated package usually corresponds to a folder and requires initialization through the init method. Therefore, we need to implement find_module and load_module methods in the memory. The detailed process is as follows:
Import hook
class MemoryZipImport:
    def __init__(self, files):
        self.files = files

    def find_module(self, full_name, path=None):
        """find_module(full_name, path=None) -> self or None.

        zipimportx: faster zipfile imports for frozen python apps
        https://github.com/cloudmatrix/zipimportx/blob/master/zipimportx/__init__.py

        Search for a module specified by 'full_name'. 'full_name' must be the
        fully qualified (dotted) module name. It returns the zipimporter
        instance itself if the module was found, or None if it wasn't.
        The optional 'path' argument is interpreted as required by the importer
        protocol (which means you can put one of these objects in sys.meta_path
        and it will behave appropriately).

        :param full_name:
        :param path:
        :return:
        """
        print 'find_module:', full_name, path

        full_path = full_name.replace('.', '/')
        module_path = full_path + '.py'
        package_path = full_path + '/__init__.py'

        if module_path in self.files or package_path in self.files:
            return self

        return None

    def load_module(self, full_name):
        """load_module(full_name) -> module.

        PEP 302 -- New Import Hooks
        https://www.python.org/dev/peps/pep-0302/

        Load the module specified by 'full_name'. 'full_name' must be the
        fully qualified (dotted) module name. It returns the imported
        module, or raises ZipImportError if it wasn't found.

        :param full_name:
        """
        print 'load_module:', full_name

        if full_name in sys.modules:
            return sys.modules[full_name]

        full_path = full_name.replace('.', '/')
        module_path = full_path + '.py'
        package_path = full_path + '/__init__.py'

        mod = sys.modules.setdefault(full_name, imp.new_module(full_name))
        mod.__file__ = "<%s>" % self.__class__.__name__
        mod.__loader__ = self

        if module_path in self.files:
            code = self.files[module_path]
        else:
            code = self.files[package_path]
            mod.__path__ = [full_path.rsplit('/')[0]]

        exec code in mod.__dict__

        return mod


Load data
def get_module(lib_src, switch_inline=False):
    """
    Get the designated library (base64 encoding).

    lib_src:  Library name
    switch_inline:  Local storage switch
    """
    if switch_inline:
        lib_data = globals()['data_%s' % lib_src]
    else:
        cmd_get_data =\
            '''
            select
                %s:one_search_lib_data_%s() as lib_data
            from tbcdm.dual
            ''' % (
                const_project_udf, lib_src
            )
        odps_get_data = odps(cmd_get_data, switch_print=False)
        lib_data = ''.join(odps_get_data[2:]).strip()[1:-1].replace('""', '"')

    print 'data_size:',  len(lib_data)
    return lib_data


Dynamic import
def import_module(lib_src, module_src=[], ext_src='zip'):
    """
    Get the designated library (base64 encoding).

    lib_src:  Library name
    module_src:  Module name
    """
    data_object = io.BytesIO(get_module(lib_src, True).replace(b'\n', b'').decode('base64'))
    lib_files = {}
    if ext_src == 'zip':
        lib_object = zipfile.ZipFile(data_object)
        for z in lib_object.infolist():
            if not z.filename.endswith('/'):
                lib_files[z.filename] = lib_object.open(z.filename).read()
    elif ext_src == 'gz':
        lib_object = tarfile.open(mode="r:gz", fileobj=data_object)
        for t in lib_object.getmembers():
            if t.isfile():
                lib_files[t.name] = lib_object.extractfile(t.name).read()
    elif ext_src == 'bz2':
        lib_object = tarfile.open(mode="r:bz2", fileobj=data_object)
        lib_files = {}
        for t in lib_object.getmembers():
            if t.isfile():
                lib_files[t.name] = lib_object.extractfile(t.name).read()

    sys.meta_path.append(MemoryZipImport(lib_files))
    map(__import__, module_src if len(module_src) > 0 else [lib_src])


Primary method
def main():
    """
    save_module('C:/Users/Jackie/Desktop/test/pyodps.tar.bz2')
    """
    import_module('pyodps', ['threadpool', 'requests', 'ordereddict', 'odps'], 'bz2')

    from odps import ODPS

    odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
        endpoint='**your-end-point**'
    )

    t = odps.get_table('dual')
    print t.schema
    # """


Effect
Guest