Commit ffe49872 authored by Albert's avatar Albert
Browse files

merged master into branch fitcloud

parents 55ebb257 38d05c2b
Pipeline #2698 failed with stage
The MIT License (MIT)
Copyright (c) 2018 onelab
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
......@@ -3,15 +3,21 @@ Base API Class
'''
import datetime
import threading
from queue import Queue
from myslicelib.util import Endpoint, Authentication
from myslicelib.api.sfa.sfaam import SfaAm
from myslicelib.api.sfa.sfareg import SfaReg
from myslicelib.api.fitcloud.fitcloud import FitCloud
from myslicelib.api.cbas.ma import MemberAuthority
from myslicelib.api.cbas.sa import SliceAuthority
from myslicelib.api.cbas.fed import FederationAuthority
# from myslicelib.util.decorators import timeit
#from myslicelib.util.certificate import Keypair, Certificate
from myslice import settings
import concurrent.futures
from pprint import pprint
from myslicelib.error import MysParamsTypeError
......@@ -57,6 +63,16 @@ class Api(object):
'authority',
'project'
]
_ma = [
'user',
]
_sa = [
'slice',
'project'
]
_fed = [
'authority',
]
_q = Queue()
......@@ -68,29 +84,67 @@ class Api(object):
raise ValueError("API needs Authentication, please check your private key or generate a new one")
self.registry = None # at least one registry endpoint must be present
registry_endpoint = None
self.ma = None # at least one Member Authority endpoint must be present
ma_endpoint = None
self.sa = None # at least one Member Authority endpoint must be present
sa_endpoint = None
self.fed = None # at least one Federation endpoint must be present
fed_endpoint = None
self.ams = [] # one or plus am must be present, this depends on the am to be present
# search for the registry
for endpoint in endpoints:
#print("endpoint = {}".format(endpoint))
if (endpoint.protocol == "SFA") and (endpoint.type == "Reg"):
self.registry = SfaReg(endpoint, authentication)
registry_endpoint = endpoint
elif (endpoint.type == "MA"):
self.ma = MemberAuthority(endpoint, authentication)
ma_endpoint = endpoint
elif (endpoint.type == "FED"):
self.fed = FederationAuthority(endpoint, authentication)
fed_endpoint = endpoint
# search for the SA that uses MA to get the user credential
for endpoint in endpoints:
if (endpoint.type == "SA"):
self.sa = SliceAuthority(endpoint, self.ma)
sa_endpoint = endpoint
break
if not self.registry:
raise ValueError("At least a Registry must be specified")
# Set SA for the MA to get user.slices & user.projects
if self.ma and self.sa:
self.ma.set_sa(self.sa)
# Set SA & MA for FED to get authority.pi_users, authority.projects
if self.fed:
self.fed.set_sa(self.sa)
self.fed.set_ma(self.ma)
if not self.registry and not self.ma:
raise ValueError("At least a Registry or a Member Authority must be specified")
# search for the AMs
for endpoint in endpoints:
if endpoint.type == "AM":
if endpoint.protocol == "SFA":
self.ams.append(SfaAm(endpoint, SfaReg(registry_endpoint, authentication)) )
if registry_endpoint:
self.ams.append(SfaAm(endpoint, self.registry))
if sa_endpoint:
self.ams.append(SfaAm(endpoint, self.sa))
elif endpoint.protocol == "FIT-Cloud":
email = settings.auth['email']
password = settings.auth['password']
self.ams.append(FitCloud(endpoint, {'email': email, 'password': password}))
def __getattr__(self, entity):
def method_handler():
if not entity in self._entities:
......@@ -118,14 +172,24 @@ class Api(object):
# res = future.result(timeout=5)
#except concurrent.futures.TimeoutError as e:
# res = {'data':[],'errors':[e]}
#print("api.init res = {}".format(res))
result['data'] += res['data']
result['errors'] += res['errors']
return result
def version(self):
threads = [self._thread_handler(self.registry.version)] + \
[self._thread_handler(am.version) for am in self.ams]
threads = []
if self.registry:
threads += [self._thread_handler(self.registry.version)]
if self.ma:
threads += [self._thread_handler(self.ma.version)]
if self.sa:
threads += [self._thread_handler(self.sa.version)]
if self.fed:
threads += [self._thread_handler(self.fed.version)]
threads += [self._thread_handler(am.version) for am in self.ams]
res = self._parallel_request(threads)
return res
......@@ -133,24 +197,36 @@ class Api(object):
def get_credentials(self, ids, delegated_to=None):
threads = []
for id in ids:
threads += [self._thread_handler(self.registry.get_credential,id,delegated_to)]
if self.registry:
threads += [self._thread_handler(self.registry.get_credential,id,delegated_to)]
if self.ma:
threads += [self._thread_handler(self.ma.get_credential,id)]
result = self._parallel_request(threads)
return result
def get(self, id=None, raw=False):
def get(self, id=None, filter=None, raw=False):
#print("api filter={}".format(filter))
result = {}
threads = []
if self._entity in self._am:
for am in self.ams:
threads += [self._thread_handler(am.get, self._entity, id, raw)]
if self._entity in self._registry:
if self.ma and self._entity in self._ma:
threads += [self._thread_handler(self.ma.get, self._entity, id, filter)]
if self.sa and self._entity in self._sa:
threads += [self._thread_handler(self.sa.get, self._entity, id, filter)]
if self.fed and self._entity in self._fed:
threads += [self._thread_handler(self.fed.get, self._entity, id, filter)]
if self.registry and self._entity in self._registry:
threads += [self._thread_handler(self.registry.get, self._entity, id)]
if self._entity not in self._am and self._entity not in self._registry:
raise NotImplementedError('Not implemented')
result = self._parallel_request(threads)
#print("API result = {}".format(result))
return result
def update(self, id, params):
......@@ -160,7 +236,7 @@ class Api(object):
result = {}
threads = []
creating = False
if self._entity in self._registry:
if self.registry and self._entity in self._registry:
res_get = self.registry.get(self._entity, id)
if len(res_get['errors']) > 0 or len(res_get['data']) == 0:
# Re-Initialize the logs
......@@ -168,8 +244,31 @@ class Api(object):
res_reg = self.registry.create(self._entity, id, params)
creating = True
else:
# XXX We should flag if Exception is raised
res_reg = self.registry.update(self._entity, id, params)
# XXX We should flag if Exception is raised
# XXX because for Slice if Registry call failed we will not call the AMs
if self.ma and self._entity in self._ma:
res_get = self.ma.get(self._entity, id)
if len(res_get['errors']) > 0 or len(res_get['data']) == 0:
# Re-Initialize the logs
self.ma.logs = []
res_reg = self.ma.create(self._entity, id, params)
creating = True
else:
res_reg = self.ma.update(self._entity, id, params)
if self.sa and self._entity in self._sa:
res_get = self.sa.get(self._entity, id)
if len(res_get['errors']) > 0 or len(res_get['data']) == 0:
# Re-Initialize the logs
self.sa.logs = []
res_reg = self.sa.create(self._entity, id, params)
creating = True
else:
if 'geni_users' not in params or len(params['geni_users'])==0:
params['geni_users'] = res_get['data'][0]['geni_users']
res_reg = self.sa.update(self._entity, id, params)
if self._entity in self._am and not creating:
for am in self.ams:
......@@ -193,13 +292,22 @@ class Api(object):
def delete(self, id):
exists = self.get(id)
threads = []
result = True
if not exists:
raise Exception('This object do not exist')
if self._entity in self._am:
for am in self.ams:
threads += [self._thread_handler(am.delete, self._entity, id)]
if self._entity in self._registry:
if self.registry and self._entity in self._registry:
threads += [self._thread_handler(self.registry.delete, self._entity, id)]
if self.ma and self._entity in self._ma:
threads += [self._thread_handler(self.ma.delete, self._entity, id)]
if self.sa and self._entity in self._sa:
threads += [self._thread_handler(self.sa.delete, self._entity, id)]
result = self._parallel_request(threads)
......
import socket
from urllib.parse import urlparse
from sfa.trust.credential import Credential
from myslicelib.api.xmlrpc import Xmlrpc
from myslicelib.util.sfa import hrn_to_urn, urn_to_hrn
from pprint import pprint
import logging
logger = logging.getLogger(__name__)
objects = {
"user": "MEMBER",
"key": "KEY",
"slice": "SLICE",
"project": "PROJECT",
"authority": "SERVICE",
}
fields = {
'user': {
'shortname': 'MEMBER_USERNAME',
'first_name': 'MEMBER_FIRSTNAME',
'last_name': 'MEMBER_LASTNAME',
'certificate': 'MEMBER_CERTIFICATE',
'email': 'MEMBER_EMAIL',
},
'slice': {
'shortname': 'SLICE_NAME',
'created': 'SLICE_CREATION',
'expiration': 'SLICE_EXPIRATION',
'description': 'SLICE_DESCRIPTION',
'project': 'SLICE_PROJECT_URN',
'authority': 'SLICE_PROJECT_URN',
'pi_users': 'SLICE_LEAD',
},
'project': {
'shortname': 'PROJECT_NAME',
'created': 'PROJECT_CREATION',
'expiration': 'PROJECT_EXPIRATION',
'description': 'PROJECT_DESCRIPTION',
'pi_users': 'PROJECT_LEAD',
},
'authority': {
'shortname': 'SERVICE_NAME',
'certificate': 'SERVICE_CERT',
'description': 'SERVICE_DESCRIPTION',
'url': 'SERVICE_URL',
'type': 'SERVICE_TYPE',
},
}
cbas_id = {
"user": "MEMBER_URN",
"key": "KEY_MEMBER",
"slice": "SLICE_URN",
"project": "PROJECT_URN",
"authority": "SERVICE_URN",
}
class Cbas(Xmlrpc):
def __init__(self, endpoint=None, authentication=None):
super(Cbas, self).__init__(endpoint, authentication)
# version call
self._version = self.version(raw=True)
self.user_credential = None
# logs
self.logs = []
def get_credential(self, urn, raw=False, credentials=None):
try:
cred = None
hrn, entity = urn_to_hrn(urn)
options = {}
result_credentials = self._proxy.get_credentials(
urn,
credentials,
options)
if 'code' in result_credentials and result_credentials['code'] == 0:
#cred = [{'geni_type': 'geni_sfa', 'geni_version':'3', 'geni_value' : result_credentials['value'][0]['geni_value']}]
cred = result_credentials['value'][0]
# Reset the proxy SSL Context
#c = Credential(string=cred['geni_value'])
#if c.get_gid_object().save_to_string() is None:
# print("cred = {}".format(cred))
# print("get_gid_object = {}".format(c.get_gid_object().save_to_string()))
# print("get_gid_caller = {}".format(c.get_gid_caller().save_to_string()))
#self.authentication.certificate = c.get_gid_caller().save_to_string()
context = self.get_context()
self.set_proxy(context)
if raw:
return cred
d = [{
'id': urn,
'type': entity,
'xml': cred['geni_value'],
'delegated_to': None,
}]
self.user_credentials += d
except Exception as e:
import traceback
traceback.print_exc()
d = []
self.logs.append({
'endpoint': self.endpoint.name,
'url': self.endpoint.url,
'protocol': self.endpoint.protocol,
'type': self.endpoint.type,
'exception': str(e)
})
return {'data':d,'errors':self.logs}
def search_credential(self, hrn=None, entity=None, urn=None):
if urn:
hrn, entity = urn_to_hrn(urn)
else:
urn = hrn_to_urn(hrn, entity)
return self.get_credential(urn, raw=True)
def version(self, raw=False):
message = None
version = None
urn = None
ret = {}
online = False
status = "offline"
try:
# ret = self._proxy.GetVersion()
ret = self._proxy.get_version()
if 'value' in ret:
# AM
if 'geni_api' in ret['value']:
version = ret['value']['geni_api']
if 'VERSION' in ret['value']:
version = ret['value']['VERSION']
if 'urn' in ret['value']:
urn = ret['value']['urn']
elif 'URN' in ret['value']:
urn = ret['value']['URN']
else:
urn = hrn_to_urn(ret['value']['hrn'], 'authority')
online = True
status = "online"
elif 'sfa' in ret:
# Registry
version = ret['sfa']
urn = ret['urn']
online = True
status = "online"
else:
message = "Error parsing returned value"
except socket.timeout:
message = "Connection timed out ({})".format(self.endpoint.url)
except socket.gaierror:
message = "Server name/address not provided or unknown ({})".format(self.endpoint.url)
except Exception as e:
message = str(e)
if raw:
if 'value' in ret:
return ret['value']
return ret
else:
return {
'data': [{
'status': status,
"connection":{
'online' : online,
'message' : message
},
'id': urn,
'version': version,
'type': self.endpoint.type,
'url' : self.endpoint.url,
'technologies' : self.endpoint.technologies,
'hostname' : urlparse(self.endpoint.url).hostname,
'name' : self.endpoint.name,
'api' : {
"type" : self.endpoint.type,
"protocol" : self.endpoint.protocol,
'version': version,
},
'hasLeases': self.endpoint.hasLeases,
}],
'errors':[message],
}
def _lookup(self, entity, filter={}, raw=False):
if entity in objects.keys():
cbas_entity = objects[entity]
else:
cbas_entity = entity
lookup_result = self._proxy.lookup(cbas_entity, [self.user_credential], filter)
result = None
if type(lookup_result) is dict :
if 'value' in lookup_result:
result = lookup_result['value']
else:
raise KeyError('no values were retuned by query ')
else:
raise TypeError('Expected dict, got something else')
# formatting the response
if raw:
return result
# result = self._extract_with_entity(entity, result)
else:
result = getattr(self, "_" + entity)(result)
return result
def get(self, entity, urn=None, filter=None, raw=False):
result = []
try:
_filter = {}
if urn:
_filter = {"match":{cbas_id[entity]:urn}}
if filter:
mapped_filter = {fields[entity][k]: v for k, v in filter.items()}
if "match" not in _filter:
_filter["match"] = {}
_filter["match"].update(mapped_filter)
result = self._lookup(entity, _filter, raw)
if raw:
return result
except Exception as e:
import traceback
traceback.print_exc()
self.logs.append({
'endpoint': self.endpoint.name,
'url': self.endpoint.url,
'protocol': self.endpoint.protocol,
'type': self.endpoint.type,
'exception': str(e)
})
return {'data':result,'errors':self.logs}
def delete(self, entity, urn):
result = []
try:
if entity in objects.keys():
cbas_entity = objects[entity]
else:
cbas_entity = entity
delete_result = self._proxy.delete(cbas_entity, urn, [self.user_credential], {})
if 'code' not in delete_result or delete_result['code'] != 0:
raise Exception(delete_result)
if entity == "user":
self.delete_key(urn)
except Exception as e:
import traceback
traceback.print_exc()
self.logs.append({
'endpoint': self.endpoint.name,
'url': self.endpoint.url,
'protocol': self.endpoint.protocol,
'type': self.endpoint.type,
'exception': str(e)
})
return {'data':result,'errors':self.logs}
from myslicelib.api.cbas import Cbas, objects, fields, cbas_id
from myslicelib.util.sfa import hrn_to_urn, urn_to_hrn
import logging
logger = logging.getLogger(__name__)
class FederationAuthority(Cbas):
def __init__(self, endpoint=None, authentication=None):
super(FederationAuthority, self).__init__(endpoint, authentication)
#print("Fed get_credentials")
#self.credential = self.get_credential(self.authentication.urn)
def get(self, entity, urn=None, filter=None, raw=False):
#print("filter = {}".format(filter))
try:
if entity in objects.keys():
cbas_entity = objects[entity]
else:
cbas_entity = entity
result = super(FederationAuthority, self).get(entity, urn, filter, raw)
if raw:
return result
except Exception as e:
import traceback
traceback.print_exc()
self.logs.append({
'endpoint': self.endpoint.name,
'url': self.endpoint.url,
'protocol': self.endpoint.protocol,
'type': self.endpoint.type,
'exception': str(e)
})
return result
def set_sa(self, sa):
self.sa = sa
def set_ma(self, ma):
self.ma = ma
def _authority(self, data):
authorities = []
for a in data:
urn = a.get('SERVICE_URN')
authority = urn
# TODO: get users from MA
users = []
# TODO: get pi_users from MA
pi_users = []
ma_users = self.ma.get('user')['data']
for user in ma_users:
users.append(user['id'])
pattern = urn.replace(urn.split("+")[-1],"")
for authority in user['pi_authorities']:
if pattern in