Source code for wialon.api

#!/usr/bin/env python
# -*- coding: utf-8 -*-

try:
    from builtins import str
except:
    str = lambda x: "%s" % x

try:
    from urllib import urlencode
    from urlparse import urljoin
except Exception:
    from urllib.parse import urlencode, urljoin

try:
    from urllib2 import Request, urlopen, HTTPError, URLError
except ImportError:
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError

try:
    import simplejson as json
    assert json  # Silence potential warnings from static analysis tools
except ImportError:
    import json

import gzip
import io


[docs] class WialonError(Exception): """ Exception raised when an Wialon Remote API call fails due to a network related error or for a Wialon specific reason. """ errors = { 1: 'Invalid session', 2: 'Invalid service', 3: 'Invalid result', 4: 'Invalid input', 5: 'Error performing request', 6: 'Unknown error', 7: 'Access denied', 8: 'Invalid user name or password', 9: 'Authorization server is unavailable, please try again later', 1001: 'No message for selected interval', 1002: 'Item with such unique property already exists', 1003: 'Only one request of given time is allowed at the moment' } def __init__(self, code, text): self._text = text self._code = code try: self._code = int(code) except ValueError: pass def __unicode__(self): explanation = self._text if (self._code in WialonError.errors): explanation = " ".join([WialonError.errors[self._code], self._text]) message = u'{error} ({code})'.format(error=explanation, code=self._code) return u'WialonError({message})'.format(message=message) def __str__(self): return self.__unicode__() def __repr__(self): return str(self)
class Wialon(object): request_headers = { 'Accept-Encoding': 'gzip, deflate' } def __init__(self, scheme='https', host="hst-api.wialon.com", port=443, sid=None, **extra_params): """ Created the Wialon API object. """ self._sid = sid self.__default_params = {} self.__default_params.update(extra_params) self.__base_url = ( '{scheme}://{host}:{port}'.format( scheme=scheme, host=host, port=port ) ) self.__base_api_url = urljoin(self.__base_url, 'wialon/ajax.html?') @property def sid(self): return self._sid @sid.setter def sid(self, value): self._sid = value def update_extra_params(self, **params): """ Updated the Wialon API default parameters. """ self.__default_params.update(params) def avl_evts(self): """ Call avl_event request """ url = urljoin(self.__base_url, 'avl_evts') params = { 'sid': self.sid } return self.request('avl_evts', url, params) def call(self, action_name, *argc, **kwargs): """ Call the API method provided with the parameters supplied. """ if (not kwargs): # List params for batch if isinstance(argc, tuple) and len(argc) == 1: params = json.dumps(argc[0], ensure_ascii=False) else: params = json.dumps(argc, ensure_ascii=False) else: params = json.dumps(kwargs, ensure_ascii=False) if action_name.startswith('unit_group'): if len(action_name) < 12: raise ValueError(f'Invalid action name {action_name}') action_name = 'unit_group/' + action_name[11:] else: action_name = action_name.replace('_', '/', 1) params = { 'svc': action_name, 'params': params.encode("utf-8"), 'sid': self.sid } all_params = self.__default_params.copy() all_params.update(params) return self.request(action_name, self.__base_api_url, all_params) def token_login(self, *args, **kwargs): kwargs['appName'] = 'python-wialon' return self.call('token_login', *args, **kwargs) def request(self, action_name, url, params): url_params = urlencode(params) data = url_params.encode('utf-8') try: request = Request(url, data, headers=self.request_headers) response = urlopen(request) response_content = response.read() except HTTPError as e: raise WialonError(0, u"HTTP {code}".format(code=e.code)) except URLError as e: raise WialonError(0, str(e)) response_info = response.info() content_type = response_info.get('Content-Type') content_encoding = response_info.get('Content-Encoding') if content_encoding == 'gzip': buffer = io.BytesIO(response_content) f = gzip.GzipFile(fileobj=buffer) try: result = f.read() finally: f.close() buffer.close() else: result = response_content try: if content_type == 'application/json': result = result.decode('utf-8', errors='ignore') result = json.loads(result) except ValueError as e: raise WialonError( 0, u"Invalid response from Wialon: {0}".format(e), ) if (isinstance(result, dict) and 'error' in result and result['error'] > 0): raise WialonError(result['error'], action_name) errors = [] if isinstance(result, list): # Check for batch errors for elem in result: if (not isinstance(elem, dict)): continue if "error" in elem: errors.append("%s (%d)" % (WialonError.errors[elem["error"]], elem["error"])) if (errors): errors.append(action_name) raise WialonError(0, " ".join(errors)) return result def __getattr__(self, action_name): """ Enable the calling of Wialon API methods through Python method calls of the same name. """ def get(self, *args, **kwargs): return self.call(action_name, *args, **kwargs) return get.__get__(self) if __name__ == '__main__': try: wialon_api = Wialon() # token/login request token = 'TEST TOKEN HERE' result = wialon_api.token_login(token=token) wialon_api.sid = result['eid'] # get events result = wialon_api.avl_evts() # core/logout request wialon_api.core_logout() except WialonError: pass