Appendix A. Custom Script Samples
The sample custom scripts are written in Python 3 and contain classes and methods that are used to create a PD snapshot in a Nutanix AHV cluster and in Prism Central. The scripts also include methods that you can use to implement pre-freeze and post-thaw operations:
- doSomeChecks(pdVms)
- freezeVms(self, pdVmsList)
- unFreezeVms(self, pdVms)
Note: The sample scripts do not check whether the provided Nutanix AHV cluster certificate is valid.
#!/usr/bin/python3 import requests import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) import json import time import logging import argparse import os from urllib.parse import urlparse from urllib.parse import urlunparse from urllib.parse import urlencode
def quitScriptFailed(message):
    """
    Report an early failure and stop the script.

    Prints a JSON object with ``status`` set to ``"Failed"`` and
    ``errorMessage`` set to *message* on stdout, then exits with status 0
    (the failure is reported through the JSON document, not the exit code).

    :param message: human-readable description of the failure
    """
    # json.dumps escapes quotes, backslashes and newlines in the message, so
    # the printed document is always valid JSON.
    print(json.dumps({"status": "Failed", "errorMessage": str(message)}))
    raise SystemExit(0)
# ################################################################################################## # # Initialize # # ##################################################################################################
scriptDir = os.path.dirname(os.path.realpath(__file__))

#
# Process Command Line
#
parser = argparse.ArgumentParser(description='Create vms snapshot for Protection Domain')
parser.add_argument('--pdName', type=str, help='Protection Domain name')
parser.add_argument('--jobName', type=str, help='Job Name for displaying in header')
parser.add_argument('--logFolder', type=str, help='Logging path')
parser.add_argument('--logLevel', type=str, nargs='?', default='DEBUG',
                    help='Log level: debug, info, warning, error, critical. (default=debug)')
args = parser.parse_args()

# Every argument that is used unconditionally later on is checked here, so a
# bad invocation produces the JSON failure document instead of a traceback.
if args.pdName is None or args.pdName == "":
    quitScriptFailed("Invalid argument: pdName")

if args.jobName is None or args.jobName == "":
    quitScriptFailed("Invalid argument: jobName")

# The log folder is passed to os.makedirs() below; None would raise TypeError.
if args.logFolder is None or args.logFolder == "":
    quitScriptFailed("Invalid argument: logFolder")

# With nargs='?' a bare `--logLevel` yields None: fall back to the default.
# Unknown names would otherwise raise inside logging.basicConfig().
if (args.logLevel or 'DEBUG').upper() not in (
        'CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'):
    quitScriptFailed("Invalid argument: logLevel")

pdName = args.pdName
jobName = args.jobName
logDir = args.logFolder
logLevel = args.logLevel or 'DEBUG'
# # Init Log #
from datetime import datetime

# All messages are appended to <logFolder>/custom_script.log.
os.makedirs(logDir, exist_ok=True)
logFilePath = "%s/%s" % (logDir, "custom_script.log")
LogConsoleDbg = False

# Fixed-width, mixed-case level labels keep the log columns aligned.
for levelNumber, levelLabel in (
        (logging.FATAL, 'Fatal'),
        (logging.ERROR, 'Error'),
        (logging.WARN, 'Warn '),
        (logging.INFO, 'Info '),
        (logging.DEBUG, 'Debug'),
):
    logging.addLevelName(levelNumber, levelLabel)

logging.basicConfig(
    filename=logFilePath,
    filemode='a',
    format='%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
    datefmt='[%Y-%m-%d] [%H:%M:%S',
    level=logLevel.upper()
)

# Uncomment to mirror the log file on the console while debugging.
# LogConsoleDbg = True

if LogConsoleDbg:
    rootLogger = logging.getLogger()
    consoleHandler = logging.StreamHandler()
    # Reuse the file handler's formatter so both outputs look the same.
    consoleHandler.setFormatter(rootLogger.handlers[0].formatter)
    rootLogger.addHandler(consoleHandler)

# Header written at the start of every run.
logging.info("--------------------------------------------------------------------------------")
logging.info("Start to script execution")
logging.info(" Protection Domain name: %s", pdName)
logging.info(" Job name: %s", jobName)
logging.info("--------------------------------------------------------------------------------")
# # Init config from env variables #
# Connection settings are taken from the environment.
nutanixClusterIp = os.getenv('NUTANIX_CLUSTER_ADDRESS')
nutanixLogin = os.getenv('NUTANIX_CLUSTER_LOGIN')
nutanixPass = os.getenv('NUTANIX_CLUSTER_PASSWORD')

# os.getenv() returns None for an unset variable instead of raising, so the
# values are checked explicitly.
missingEnvVars = [
    envName
    for envName, envValue in (
        ('NUTANIX_CLUSTER_ADDRESS', nutanixClusterIp),
        ('NUTANIX_CLUSTER_LOGIN', nutanixLogin),
        ('NUTANIX_CLUSTER_PASSWORD', nutanixPass),
    )
    if envValue is None
]

if missingEnvVars:
    Error = "Failed to get environment variables"
    logging.error("%s: %s", Error, ", ".join(missingEnvVars))
    quitScriptFailed(Error)
# ################################################################################################## # # Result's # # ##################################################################################################
class ScriptResult:
    """
    Outcome of the custom script.

    Holds a status code, an error message and the id of the out-of-band
    snapshot schedule, and serialises them to a JSON string.
    """

    SUCCESS = 0
    WARNING = 1
    FAILED = 2

    def __init__(self, status = SUCCESS, errorMessage = ""):
        self.oob_schedule_id = ""
        self.errorMessage = errorMessage
        self.status = status

    def SetFailed(self, message):
        """Mark the result as failed and remember *message*."""
        self.status, self.errorMessage = ScriptResult.FAILED, message

    def SetWarning(self, message):
        """Mark the result as a warning and remember *message*."""
        self.status, self.errorMessage = ScriptResult.WARNING, message

    def makeJsonString(self):
        """
        Serialise the result.

        :return: JSON string with the keys ``status`` (only for a known
                 status code), ``errorMessage`` and ``oob_schedule_id``
        """
        statusNames = {
            ScriptResult.SUCCESS: "Success",
            ScriptResult.WARNING: "Warning",
            ScriptResult.FAILED: "Failed",
        }

        jResult = {}
        if self.status in statusNames:
            jResult['status'] = statusNames[self.status]
        jResult['errorMessage'] = self.errorMessage
        jResult['oob_schedule_id'] = self.oob_schedule_id

        return json.dumps(jResult)
def generateResult(result):
    """
    Log the final result, print it as JSON on stdout and exit with status 0.

    :param result: ScriptResult describing the outcome
    """
    jsonString = result.makeJsonString()

    finishedWithError = result.status in (ScriptResult.FAILED, ScriptResult.WARNING)
    if finishedWithError:
        logging.error("Custom script execution finished with error. Result: %s" % jsonString)
    else:
        logging.info("Custom script execution finished. Result: %s" % jsonString)

    print(jsonString)
    exit(0)
def generateResultFailed(errorMessage, exception = None):
    """
    Finish the script with a failed result.

    A short detail derived from *exception* is appended to *errorMessage*
    when the exception type is recognised; the result is then passed to
    generateResult().

    :param errorMessage: description of the step that failed
    :param exception: exception that caused the failure, if any
    """
    if isinstance(exception, NutanixException):
        # Message of the Nutanix API call, e.g.:
        # "Specified protection domain AgentTest-ProtectionDomain does not exist"
        detail = " Error: %s" % str(exception)
    elif isinstance(exception, AuthenticationException):
        detail = " Error: Authentication failed."
    elif isinstance(exception, requests.exceptions.ConnectionError):
        detail = " Error: Connect failed."
    else:
        detail = ""

    generateResult(ScriptResult(ScriptResult.FAILED, errorMessage + detail))
def getFullUrl(url, params):
    """
    Build the complete URL of a request by appending query parameters.

    :param url: URL that may already carry a query string
    :param params: mapping or sequence of pairs accepted by urlencode();
                   None and empty values mean "no extra parameters"
    :return: URL whose query string is the original query followed by the
             encoded *params*
    """
    # urlencode(None) raises TypeError, so "no parameters" is short-circuited.
    paramsStr = urlencode(params) if params else ''

    urlObj = urlparse(url)
    urlQuery = urlObj.query

    if paramsStr:
        urlQuery = '%s&%s' % (urlQuery, paramsStr) if urlQuery else paramsStr

    return urlunparse(urlObj._replace(query=urlQuery))
# ################################################################################################## # # Nutanix # # ##################################################################################################
class AuthenticationException(Exception):
    """Raised when the Nutanix API answers a request with HTTP 401."""

    def __init__(self):
        super().__init__()
class NutanixException(Exception):
    """Error reported by the Nutanix API; the text is kept in ``message``."""

    def __init__(self, message=""):
        super().__init__(message)
        self.message = message
class NutanixExceptionNotFound(NutanixException):
    """Nutanix API error for a request that was answered with HTTP 404."""

    def __init__(self, message=""):
        super().__init__(message)
class NutanixCluster:
    """
    Small client for the Nutanix cluster REST API (v1, v2.0 and v3, port 9440).

    Provides what the sample needs: listing the VMs of a Protection Domain,
    reading VM details, creating an out-of-band Protection Domain snapshot,
    and the freeze/unfreeze hooks that are meant to be filled in.
    """

    def __init__(self, nutanixClusterIp, nutanixLogin, nutanixPass):
        """Store the credentials and build the base URL of each API version."""
        self.nutanixClusterIp = nutanixClusterIp
        self.nutanixLogin = nutanixLogin
        self.nutanixPass = nutanixPass
        self.nutanixApi1 = "https://%s:9440/PrismGateway/services/rest/v1" % self.nutanixClusterIp
        self.nutanixApi2 = "https://%s:9440/PrismGateway/services/rest/v2.0" % self.nutanixClusterIp
        self.nutanixApi3 = "https://%s:9440/api/nutanix/v3" % self.nutanixClusterIp

    @staticmethod
    def getErrorMessage(jResponse):
        """
        Extract the error text from a decoded JSON error response.

        :param jResponse: decoded JSON body of a failed request
        :return: the ``message`` value (API v1/v2) or all ``message_list``
                 messages joined with "; " (API v3)
        :raises requests.exceptions.RequestException: if the body has
                neither form or a list entry has no ``message``
        """
        if "message" in jResponse:  # Nutanix API v1 & v2
            return jResponse["message"]

        if "message_list" in jResponse:  # Nutanix API v3
            messages = []
            for jError in jResponse["message_list"]:
                if "message" not in jError:
                    raise requests.exceptions.RequestException
                messages.append(jError["message"])
            return "; ".join(messages)

        raise requests.exceptions.RequestException

    @staticmethod
    def logRequest(level, httpMethod, urlFull, body=''):
        """Log an outgoing request; the body is logged only for POST/PUT/PATCH."""
        method = httpMethod.upper()

        if method in ('POST', 'PUT', 'PATCH'):
            logging.log(level, "HTTP Request (%s %s): %s", method, urlFull, "" if body else "none")
            if body:
                logging.log(level, body)
        else:
            logging.log(level, "HTTP Request (%s %s)", method, urlFull)

    @staticmethod
    def logResponse(level, response):
        """Log status code, reason and (when present) the text of a response."""
        logging.log(level, "HTTP Response (%s %s): %s", response.status_code, response.reason,
                    "" if response.text else "none")
        if response.text:
            logging.log(level, response.text)

    def executeHttpRequest(self, httpMethod, url, body='', params=''):
        """
        Execute an HTTPS request with basic authentication.

        The server certificate is not verified (verify=False) and the request
        times out after 10 seconds.

        :param httpMethod: 'get' or 'post'
        :param url: absolute URL of the endpoint
        :param body: message body
        :param params: query string parameters in the URL
        :return: decoded JSON body of a 200/201/202 response
        :raises NotImplementedError: for any other HTTP method
        :raises AuthenticationException: on HTTP 401
        :raises NutanixExceptionNotFound: on HTTP 404 with a JSON error body
        :raises NutanixException: on any other failure with a JSON error body
        :raises requests.exceptions.RequestException: on a failure without a
                usable JSON error body
        """
        # Reject unsupported methods up front, with the method name in the message.
        if httpMethod not in ('get', 'post'):
            raise NotImplementedError("HTTP Method '%s' not supported" % httpMethod)

        urlFull = getFullUrl(url, params)
        reqRespLogged = logging.root.level <= logging.DEBUG

        #
        # Do request
        #
        self.logRequest(logging.DEBUG, httpMethod, urlFull, body)

        # The context manager closes the session (and its connections).
        with requests.Session() as s:
            s.auth = (self.nutanixLogin, self.nutanixPass)
            s.headers.update({'Content-Type': 'application/json; charset=utf-8'})
            sendRequest = s.get if httpMethod == 'get' else s.post
            response = sendRequest(url, data=body, params=params, verify=False, timeout=10)

        self.logResponse(logging.DEBUG, response)

        #
        # Check to failed request & log
        #
        isSuccess = response.status_code in (200, 201, 202)

        # At DEBUG level the exchange is already in the log; otherwise a
        # failed exchange is logged now, at ERROR level.
        if not isSuccess and not reqRespLogged:
            self.logRequest(logging.ERROR, httpMethod, urlFull, body)
            self.logResponse(logging.ERROR, response)

        #
        # Process response
        #
        if isSuccess:
            return response.json()

        if response.status_code == 401:  # Unauthorized
            raise AuthenticationException()

        # A response without a Content-Type header is treated as non-JSON.
        isJson = "application/json" in response.headers.get('Content-Type', '').lower()
        if isJson:
            message = self.getErrorMessage(response.json())
            logging.error("Nutanix request failed. Nutanix error: %s" % message)

            if response.status_code == 404:  # Not Found
                raise NutanixExceptionNotFound(message)
            raise NutanixException(message)

        logging.error("Nutanix request failed.")
        raise requests.exceptions.RequestException

    def executeHttpRequestV1(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v1 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi1, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV2(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v2.0 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi2, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV3(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v3 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi3, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def getPdVms(self, pdName):
        """
        Get vm_id for the Protection Domain.

        On any error the script is finished through generateResultFailed().

        :param pdName: Protection Domain name
        :return pdVms: list of vm_id values of the Protection Domain
        """
        pdVms = []
        try:
            logging.info("Getting vm_id list for Protection Domain='%s'", pdName)
            data = self.executeHttpRequestV2('get', "protection_domains/%s" % pdName)
            pdVms = [vm["vm_id"] for vm in data["vms"]]
        except Exception as e:
            logging.error("Getting vm_id list from Protection Domain='%s' failed. Error: %s.", pdName, str(e))
            logging.exception(str(e))
            generateResultFailed("Getting vm_id list for Protection Domain='%s' failed." % pdName, e)

        logging.info("Getting vm_id list vm_id='%s' success", pdVms)
        return pdVms

    def getVmsInfoEx(self, vmsUuids, mustExist=True):
        """
        Obtain information about vms (this method is created as an example).

        The full v3 "vms/<uuid>" response is stored per VM, so ip address,
        cpu, memory, etc. can be read from it.

        :param vmsUuids: list of vm_id
        :param mustExist: when False, a VM that the API reports as not found
                          is returned with "vm" set to None instead of
                          failing the script
        :return vmsInfo: list of dicts {"uuid": <vm_id>, "vm": <response or None>}
        """
        vmsInfo = []
        logging.info("Getting vms started. UUIDs: %s", vmsUuids)

        for vmUuid in vmsUuids:
            vmInfo = {"uuid": vmUuid, "vm": None}

            try:
                vmInfo["vm"] = self.executeHttpRequestV3('get', "vms/%s" % vmUuid)
                vmsInfo.append(vmInfo)
            except Exception as e:
                if isinstance(e, NutanixExceptionNotFound) and not mustExist:
                    vmsInfo.append(vmInfo)
                    continue

                logging.error("Getting vms failed. VM UUID: %s", vmUuid)
                logging.exception(str(e))
                generateResultFailed("Getting vms failed.", e)

        logging.info("Getting vms success. UUIDs: %s", vmsUuids)

        return vmsInfo

    def getVmsInfo(self, vmsUuids, mustExist=True):
        """Like getVmsInfoEx(), but return only the API responses of VMs that exist."""
        return [vmInfo["vm"] for vmInfo in self.getVmsInfoEx(vmsUuids, mustExist) if vmInfo["vm"]]

    def createPdSnapshot(self, pdName, pollAttempts=10, pollIntervalSec=5):
        """
        Create vms snapshot for the Protection Domain and wait until it is available.

        On every failure path the VMs are unfrozen and the script is finished
        through generateResultFailed().

        :param pdName: Protection Domain name
        :param pollAttempts: how many times the snapshot state is queried
        :param pollIntervalSec: seconds to wait before each query
        :return: dict with the keys 'schedule_id', 'pdSnapshotId' and 'pdSnapshotUuid'
        """
        schedule_id = None
        body = json.dumps({"app_consistent": "false"})

        try:
            logging.info("Creating vms snapshot for Protection Domain='%s'", pdName)
            data = self.executeHttpRequestV2('post', "protection_domains/%s/oob_schedules" % pdName, body)
            schedule_id = data["schedule_id"]
        except Exception as e:
            logging.error("Creating vms snapshot for Protection Domain='%s' failed" % pdName)
            logging.exception(str(e))
            # Same clean-up as the other failure paths: do not leave VMs frozen.
            # NOTE: pdVms is the module-level list filled in by the main section.
            logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
            self.unFreezeVms(pdVms)
            generateResultFailed("Creating vms snapshot for Protection Domain='%s' failed." % pdName, e)

        params = {'oob_schedule_ids': schedule_id}
        state = ''
        isStateAvailable = False

        for _ in range(pollAttempts):
            time.sleep(pollIntervalSec)  # waiting when information about snapshot be available
            try:
                logging.info("Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s'",
                             schedule_id, pdName)
                data = self.executeHttpRequestV2('get', "protection_domains/%s/dr_snapshots/" % pdName, '', params)
                try:
                    state = data["entities"][0]["state"]
                except (KeyError, IndexError, TypeError):
                    # The snapshot is not listed yet.
                    state = None
                if state == 'AVAILABLE':
                    isStateAvailable = True
                    break
            except Exception as e:
                logging.error("Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s' failed",
                              schedule_id, pdName)
                logging.exception(str(e))
                logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
                self.unFreezeVms(pdVms)
                generateResultFailed(
                    "Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s' failed."
                    % (schedule_id, pdName), e)

        if not isStateAvailable:
            logging.error("Unexpected snapshot status='%s' oob_schedule_ids='%s' for Protection Domain='%s'",
                          state, schedule_id, pdName)
            logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
            self.unFreezeVms(pdVms)
            # Arguments are in placeholder order: schedule id first, then state.
            generateResultFailed(
                "Snapshot oob_schedule_ids='%s' status error (status='%s') for Protection Domain='%s'."
                % (schedule_id, state, pdName))

        logging.info("Creating vms snapshot for Protection Domain='%s' success", pdName)
        snapshotEntity = data["entities"][0]
        return {
            'schedule_id': schedule_id,
            'pdSnapshotId': snapshotEntity["snapshot_id"],
            'pdSnapshotUuid': snapshotEntity["snapshot_uuid"],
        }

    def freezeVms(self, pdVmsList):
        """
        Freezing vms (placeholder for the pre-freeze operation).

        :param pdVmsList: vms data needed to freeze
        """
        logging.info("Freezing vms for Protection Domain='%s'", pdName)
        # here should be added code for freezing vms
        logging.info("Freezing vms for Protection Domain='%s' success", pdName)

    def unFreezeVms(self, pdVms):
        """
        Unfreezing vms (placeholder for the post-thaw operation).

        :param pdVms: vms data needed to unfreeze
        """
        logging.info("Unfreezing vms for Protection Domain='%s'", pdName)
        # here should be added code for unfreezing vms
        logging.info("Unfreezing vms for Protection Domain='%s' success", pdName)
def doSomeChecks(pdVms):
    """
    Example pre-backup check of the Protection Domain VMs.

    Requests the details of every VM and logs a warning for each VM that
    the cluster does not know. Add your own checks here.

    :param pdVms: list of vm_id values of the Protection Domain
    """
    # here should be added code for PD VMs check
    # for example get vms ip addresses
    logging.info("Do some checks")

    vmsInfo = []
    for entry in cluster.getVmsInfoEx(pdVms, False):
        vmDetails = entry["vm"]
        if not vmDetails:
            logging.warning("VM with UUID='%s' will not backup. VM in PD does not exist in cluster.", entry["uuid"])
            continue
        # if exist on Nutanix --> add to list
        vmsInfo.append(vmDetails)

    logging.info("Do some checks success")
    # here should be added error message (in case of failed checks)
# ################################################################################################## # # Main # # ##################################################################################################
if __name__ == "__main__":
    # `cluster` and `pdVms` stay module-level names: other parts of the
    # script read them as globals.
    cluster = NutanixCluster(nutanixClusterIp, nutanixLogin, nutanixPass)
    pdVms = cluster.getPdVms(pdName)

    if not pdVms:
        generateResultFailed("Protection Domain '%s' doesn't have vms." % pdName)

    # Order matters: check, freeze, snapshot, unfreeze.
    doSomeChecks(pdVms)
    cluster.freezeVms(pdVms)
    snapshotInfo = cluster.createPdSnapshot(pdName)
    cluster.unFreezeVms(pdVms)

    logging.info("All steps succeeded")

    # Report success together with the id of the created snapshot schedule.
    result = ScriptResult(ScriptResult.SUCCESS, "")
    result.oob_schedule_id = snapshotInfo['schedule_id']

    generateResult(result)
|
#!/usr/bin/python3 import requests import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) import json import time import logging import argparse import os from urllib.parse import urlparse from urllib.parse import urlunparse from urllib.parse import urlencode
def quitScriptFailed(message):
    """
    Report an early failure and stop the script.

    Prints a JSON object with ``status`` set to ``"Failed"`` and
    ``errorMessage`` set to *message* on stdout, then exits with status 0
    (the failure is reported through the JSON document, not the exit code).

    :param message: human-readable description of the failure
    """
    # json.dumps escapes quotes, backslashes and newlines in the message, so
    # the printed document is always valid JSON.
    print(json.dumps({"status": "Failed", "errorMessage": str(message)}))
    raise SystemExit(0)
# ################################################################################################## # # Initialize # # ##################################################################################################
scriptDir = os.path.dirname(os.path.realpath(__file__))

#
# Process Command Line
#
parser = argparse.ArgumentParser(description='Create vms snapshot for Protection Domain')
parser.add_argument('--clusterId', type=str, help='Id of a cluster with the corresponding protection domain')
parser.add_argument('--pdName', type=str, help='Protection domain name')
parser.add_argument('--jobName', type=str, help='Job name for displaying in header')
parser.add_argument('--logFolder', type=str, help='Logging path')
# The help text now states the real default (DEBUG).
parser.add_argument('--logLevel', type=str, nargs='?', default='DEBUG',
                    help='Log level: debug, info, warning, error, critical. (default=debug)')
args = parser.parse_args()

# Every argument that is used unconditionally later on is checked here, so a
# bad invocation produces the JSON failure document instead of a traceback.
if args.pdName is None or args.pdName == "":
    quitScriptFailed("Invalid argument: pdName")

if args.jobName is None or args.jobName == "":
    quitScriptFailed("Invalid argument: jobName")

# The log folder is passed to os.makedirs() below; None would raise TypeError.
if args.logFolder is None or args.logFolder == "":
    quitScriptFailed("Invalid argument: logFolder")

# With nargs='?' a bare `--logLevel` yields None: fall back to the default.
# Unknown names would otherwise raise inside logging.basicConfig().
if (args.logLevel or 'DEBUG').upper() not in (
        'CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'):
    quitScriptFailed("Invalid argument: logLevel")

clusterId = args.clusterId
pdName = args.pdName
jobName = args.jobName
logDir = args.logFolder
logLevel = args.logLevel or 'DEBUG'
# # Init Log #
from datetime import datetime

# All messages are appended to <logFolder>/custom_script.log.
os.makedirs(logDir, exist_ok=True)
logFilePath = f"{logDir}/custom_script.log"
LogConsoleDbg = False

# Fixed-width, mixed-case level labels keep the log columns aligned.
for levelNumber, levelLabel in (
        (logging.FATAL, 'Fatal'),
        (logging.ERROR, 'Error'),
        (logging.WARN, 'Warn '),
        (logging.INFO, 'Info '),
        (logging.DEBUG, 'Debug'),
):
    logging.addLevelName(levelNumber, levelLabel)

logging.basicConfig(
    filename=logFilePath,
    filemode='a',
    format='%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
    datefmt='[%Y-%m-%d] [%H:%M:%S',
    level=logLevel.upper()
)

# Uncomment to mirror the log file on the console while debugging.
# LogConsoleDbg = True

if LogConsoleDbg:
    rootLogger = logging.getLogger()
    consoleHandler = logging.StreamHandler()
    # Reuse the file handler's formatter so both outputs look the same.
    consoleHandler.setFormatter(rootLogger.handlers[0].formatter)
    rootLogger.addHandler(consoleHandler)

# Header written at the start of every run.
logging.info("--------------------------------------------------------------------------------")
logging.info("Start to script execution")
logging.info(f" Protection Domain name: {pdName}")
logging.info(f" Job name: {jobName}")
logging.info("--------------------------------------------------------------------------------")
# # Init config from env variables #
# Connection settings are taken from the environment.
nutanixPrismCentralIp = os.getenv('NUTANIX_PRISM_CENTRAL_ADDRESS')
nutanixLogin = os.getenv('NUTANIX_PRISM_CENTRAL_LOGIN')
nutanixPass = os.getenv('NUTANIX_PRISM_CENTRAL_PASSWORD')

# os.getenv() returns None for an unset variable instead of raising, so the
# values are checked explicitly.
missingEnvVars = [
    envName
    for envName, envValue in (
        ('NUTANIX_PRISM_CENTRAL_ADDRESS', nutanixPrismCentralIp),
        ('NUTANIX_PRISM_CENTRAL_LOGIN', nutanixLogin),
        ('NUTANIX_PRISM_CENTRAL_PASSWORD', nutanixPass),
    )
    if envValue is None
]

if missingEnvVars:
    Error = "Failed to get environment variables"
    logging.error("%s: %s", Error, ", ".join(missingEnvVars))
    quitScriptFailed(Error)
# ################################################################################################## # # Result's # # ##################################################################################################
class ScriptResult:
    """
    Outcome of the custom script.

    Holds a status code, an error message and the id of the out-of-band
    snapshot schedule, and serialises them to a JSON string.
    """

    SUCCESS = 0
    WARNING = 1
    FAILED = 2

    def __init__(self, status=SUCCESS, errorMessage=""):
        self.oob_schedule_id = ""
        self.errorMessage = errorMessage
        self.status = status

    def SetFailed(self, message):
        """Mark the result as failed and remember *message*."""
        self.status, self.errorMessage = ScriptResult.FAILED, message

    def SetWarning(self, message):
        """Mark the result as a warning and remember *message*."""
        self.status, self.errorMessage = ScriptResult.WARNING, message

    def makeJsonString(self):
        """
        Serialise the result.

        :return: JSON string with the keys ``status`` (only for a known
                 status code), ``errorMessage`` and ``oob_schedule_id``
        """
        statusNames = {
            ScriptResult.SUCCESS: "Success",
            ScriptResult.WARNING: "Warning",
            ScriptResult.FAILED: "Failed",
        }

        jResult = {}
        if self.status in statusNames:
            jResult['status'] = statusNames[self.status]
        jResult['errorMessage'] = self.errorMessage
        jResult['oob_schedule_id'] = self.oob_schedule_id

        return json.dumps(jResult)
def generateResult(result):
    """
    Log the final result, print it as JSON on stdout and exit with status 0.

    :param result: ScriptResult describing the outcome
    """
    jsonString = result.makeJsonString()

    finishedWithError = result.status in (ScriptResult.FAILED, ScriptResult.WARNING)
    if finishedWithError:
        logging.error(f"Custom script execution finished with error. Result: {jsonString}")
    else:
        logging.info(f"Custom script execution finished. Result: {jsonString}")

    print(jsonString)
    exit(0)
def generateResultFailed(errorMessage, exception=None):
    """
    Finish the script with a failed result.

    A short detail derived from *exception* is appended to *errorMessage*
    when the exception type is recognised; the result is then passed to
    generateResult().

    :param errorMessage: description of the step that failed
    :param exception: exception that caused the failure, if any
    """
    if isinstance(exception, NutanixException):
        # Message of the Nutanix API call, e.g.:
        # "Specified protection domain AgentTest-ProtectionDomain does not exist"
        detail = f" Error: {str(exception)}"
    elif isinstance(exception, AuthenticationException):
        detail = " Error: Authentication failed."
    elif isinstance(exception, requests.exceptions.ConnectionError):
        detail = " Error: Connect failed."
    else:
        detail = ""

    generateResult(ScriptResult(ScriptResult.FAILED, errorMessage + detail))
def getFullUrl(url, params):
    """
    Build the complete URL of a request by appending query parameters.

    :param url: URL that may already carry a query string
    :param params: mapping or sequence of pairs accepted by urlencode();
                   None and empty values mean "no extra parameters"
    :return: URL whose query string is the original query followed by the
             encoded *params*
    """
    # urlencode(None) raises TypeError, so "no parameters" is short-circuited.
    paramsStr = urlencode(params) if params else ''

    urlObj = urlparse(url)
    urlQuery = urlObj.query

    if paramsStr:
        urlQuery = f"{urlQuery}&{paramsStr}" if urlQuery else paramsStr

    return urlunparse(urlObj._replace(query=urlQuery))
# ################################################################################################## # # Nutanix # # ##################################################################################################
class AuthenticationException(Exception):
    """Raised when the Nutanix API answers a request with HTTP 401."""

    def __init__(self):
        super().__init__()
class NutanixException(Exception):
    """Error reported by the Nutanix API; the text is kept in ``message``."""

    def __init__(self, message=""):
        super().__init__(message)
        self.message = message
class NutanixExceptionNotFound(NutanixException):
    """Nutanix API error for a request that was answered with HTTP 404."""

    def __init__(self, message=""):
        super().__init__(message)
class NutanixCluster:
    """
    Small client for the Nutanix REST API (v1, v2.0 and v3, port 9440) of Prism Central.

    Provides what the sample needs: listing the VMs of a Protection Domain,
    reading VM details, creating an out-of-band Protection Domain snapshot,
    and the freeze/unfreeze hooks that are meant to be filled in. Requests
    carry the module-level ``clusterId`` as the ``proxyClusterUuid`` query
    parameter.
    """

    def __init__(self, nutanixPrismCentralIp, nutanixLogin, nutanixPass):
        """Store the credentials and build the base URL of each API version."""
        self.nutanixPrismCentralIp = nutanixPrismCentralIp
        self.nutanixLogin = nutanixLogin
        self.nutanixPass = nutanixPass
        self.nutanixApi1 = f"https://{self.nutanixPrismCentralIp}:9440/PrismGateway/services/rest/v1"
        self.nutanixApi2 = f"https://{self.nutanixPrismCentralIp}:9440/PrismGateway/services/rest/v2.0"
        self.nutanixApi3 = f"https://{self.nutanixPrismCentralIp}:9440/api/nutanix/v3"

    @staticmethod
    def getErrorMessage(jResponse):
        """
        Extract the error text from a decoded JSON error response.

        :param jResponse: decoded JSON body of a failed request
        :return: the ``message`` value (API v1/v2) or all ``message_list``
                 messages joined with "; " (API v3)
        :raises requests.exceptions.RequestException: if the body has
                neither form or a list entry has no ``message``
        """
        if "message" in jResponse:  # Nutanix API v1 & v2
            return jResponse["message"]

        if "message_list" in jResponse:  # Nutanix API v3
            messages = []
            for jError in jResponse["message_list"]:
                if "message" not in jError:
                    raise requests.exceptions.RequestException
                messages.append(jError["message"])
            return "; ".join(messages)

        raise requests.exceptions.RequestException

    @staticmethod
    def logRequest(level, httpMethod, urlFull, body=''):
        """Log an outgoing request; the body is logged only for POST/PUT/PATCH."""
        method = httpMethod.upper()

        if method in ('POST', 'PUT', 'PATCH'):
            logging.log(level, "HTTP Request (%s %s): %s", method, urlFull, "" if body else "none")
            if body:
                logging.log(level, body)
        else:
            logging.log(level, "HTTP Request (%s %s)", method, urlFull)

    @staticmethod
    def logResponse(level, response):
        """Log status code, reason and (when present) the text of a response."""
        logging.log(level, "HTTP Response (%s %s): %s", response.status_code, response.reason,
                    "" if response.text else "none")
        if response.text:
            logging.log(level, response.text)

    def executeHttpRequest(self, httpMethod, url, body='', params=''):
        """
        Execute an HTTPS request with basic authentication.

        The server certificate is not verified (verify=False) and the request
        times out after 10 seconds.

        :param httpMethod: 'get' or 'post'
        :param url: absolute URL of the endpoint
        :param body: message body
        :param params: query string parameters in the URL
        :return: decoded JSON body of a 200/201/202 response
        :raises NotImplementedError: for any other HTTP method
        :raises AuthenticationException: on HTTP 401
        :raises NutanixExceptionNotFound: on HTTP 404 with a JSON error body
        :raises NutanixException: on any other failure with a JSON error body
        :raises requests.exceptions.RequestException: on a failure without a
                usable JSON error body
        """
        # Reject unsupported methods up front, with the method name in the message.
        if httpMethod not in ('get', 'post'):
            raise NotImplementedError(f"HTTP Method '{httpMethod}' not supported")

        urlFull = getFullUrl(url, params)
        reqRespLogged = logging.root.level <= logging.DEBUG

        #
        # Do request
        #
        self.logRequest(logging.DEBUG, httpMethod, urlFull, body)

        # The context manager closes the session (and its connections).
        with requests.Session() as s:
            s.auth = (self.nutanixLogin, self.nutanixPass)
            s.headers.update({'Content-Type': 'application/json; charset=utf-8'})
            sendRequest = s.get if httpMethod == 'get' else s.post
            response = sendRequest(url, data=body, params=params, verify=False, timeout=10)

        self.logResponse(logging.DEBUG, response)

        #
        # Check to failed request & log
        #
        isSuccess = response.status_code in (200, 201, 202)

        # At DEBUG level the exchange is already in the log; otherwise a
        # failed exchange is logged now, at ERROR level.
        if not isSuccess and not reqRespLogged:
            self.logRequest(logging.ERROR, httpMethod, urlFull, body)
            self.logResponse(logging.ERROR, response)

        #
        # Process response
        #
        if isSuccess:
            return response.json()

        if response.status_code == 401:  # Unauthorized
            raise AuthenticationException()

        # A response without a Content-Type header is treated as non-JSON.
        isJson = "application/json" in response.headers.get('Content-Type', '').lower()
        if isJson:
            message = self.getErrorMessage(response.json())
            logging.error("Nutanix request failed. Nutanix error: %s" % message)

            if response.status_code == 404:  # Not Found
                raise NutanixExceptionNotFound(message)
            raise NutanixException(message)

        logging.error("Nutanix request failed.")
        raise requests.exceptions.RequestException

    def executeHttpRequestV1(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v1 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi1, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV2(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v2.0 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi2, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV3(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v3 API; see executeHttpRequest()."""
        url = "%s/%s" % (self.nutanixApi3, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def getPdVms(self, pdName):
        """
        Get vm_id for the Protection Domain.

        On any error the script is finished through generateResultFailed().

        :param pdName: Protection Domain name
        :return pdVms: list of vm_id values of the Protection Domain
        """
        pdVms = []
        params = {"proxyClusterUuid": clusterId}

        try:
            logging.info(f"Getting vm_id list for Protection Domain='{pdName}'")
            data = self.executeHttpRequestV2('get', f"protection_domains/{pdName}", params=params)
            pdVms = [vm["vm_id"] for vm in data["vms"]]
        except Exception as e:
            logging.error(f"Getting vm_id list from Protection Domain='{pdName}' failed. Error: {str(e)}.")
            logging.exception(str(e))
            generateResultFailed(f"Getting vm_id list for Protection Domain='{pdName}' failed.", e)

        logging.info(f"Getting vm_id list vm_id='{pdVms}' success")
        return pdVms

    def getVmsInfoEx(self, vmsUuids, mustExist=True):
        """
        Obtain information about vms (this method is created as an example).

        The full v3 "vms/<uuid>" response is stored per VM, so ip address,
        cpu, memory, etc. can be read from it.

        :param vmsUuids: list of vm_id
        :param mustExist: when False, a VM that the API reports as not found
                          is returned with "vm" set to None instead of
                          failing the script
        :return vmsInfo: list of dicts {"uuid": <vm_id>, "vm": <response or None>}
        """
        vmsInfo = []
        params = {"proxyClusterUuid": clusterId}
        logging.info(f"Getting vms started. UUIDs: {vmsUuids}")

        for vmUuid in vmsUuids:
            vmInfo = {"uuid": vmUuid, "vm": None}

            try:
                vmInfo["vm"] = self.executeHttpRequestV3('get', f"vms/{vmUuid}", params=params)
                vmsInfo.append(vmInfo)
            except Exception as e:
                if isinstance(e, NutanixExceptionNotFound) and not mustExist:
                    vmsInfo.append(vmInfo)
                    continue

                logging.error(f"Getting vms failed. VM UUID: {vmUuid}")
                logging.exception(str(e))
                generateResultFailed("Getting vms failed.", e)

        logging.info(f"Getting vms success. UUIDs: {vmsUuids}")

        return vmsInfo

    def getVmsInfo(self, vmsUuids, mustExist=True):
        """Like getVmsInfoEx(), but return only the API responses of VMs that exist."""
        return [vmInfo["vm"] for vmInfo in self.getVmsInfoEx(vmsUuids, mustExist) if vmInfo["vm"]]

    def createPdSnapshot(self, pdName, pollAttempts=10, pollIntervalSec=5):
        """
        Create vms snapshot for the Protection Domain and wait until it is available.

        On every failure path the VMs are unfrozen and the script is finished
        through generateResultFailed().

        :param pdName: Protection Domain name
        :param pollAttempts: how many times the snapshot state is queried
        :param pollIntervalSec: seconds to wait before each query
        :return: dict with the keys 'schedule_id', 'pdSnapshotId' and 'pdSnapshotUuid'
        """
        schedule_id = None
        body = json.dumps({"app_consistent": "false"})

        try:
            logging.info(f"Creating vms snapshot for Protection Domain='{pdName}'")
            data = self.executeHttpRequestV2('post', f"protection_domains/{pdName}/oob_schedules", body,
                                             params={"proxyClusterUuid": clusterId})
            schedule_id = data["schedule_id"]
        except Exception as e:
            logging.error(f"Creating vms snapshot for Protection Domain='{pdName}' failed")
            logging.exception(str(e))
            # Same clean-up as the other failure paths: do not leave VMs frozen.
            # NOTE: pdVms is the module-level list filled in by the main section.
            logging.info(f"Unfreeze vms for Protection Domain='{pdName}'")
            self.unFreezeVms(pdVms)
            generateResultFailed(f"Creating vms snapshot for Protection Domain='{pdName}' failed.", e)

        params = {
            'oob_schedule_ids': schedule_id,
            "proxyClusterUuid": clusterId
        }

        state = ''
        isStateAvailable = False

        for _ in range(pollAttempts):
            time.sleep(pollIntervalSec)  # waiting when information about snapshot be available
            try:
                logging.info(
                    f"Getting snapshot status oob_schedule_ids='{schedule_id}' for Protection Domain='{pdName}'")
                data = self.executeHttpRequestV2('get', f"protection_domains/{pdName}/dr_snapshots/", '', params)
                try:
                    state = data["entities"][0]["state"]
                except (KeyError, IndexError, TypeError):
                    # The snapshot is not listed yet.
                    state = None
                if state == 'AVAILABLE':
                    isStateAvailable = True
                    break
            except Exception as e:
                logging.error(
                    f"Getting snapshot status oob_schedule_ids='{schedule_id}' for Protection Domain='{pdName}' failed")
                logging.exception(str(e))
                logging.info(f"Unfreeze vms for Protection Domain='{pdName}'")
                self.unFreezeVms(pdVms)
                generateResultFailed(
                    f"Getting snapshot status oob_schedule_ids='{schedule_id}' "
                    f"for Protection Domain='{pdName}' failed.", e)

        if not isStateAvailable:
            logging.error(
                f"Unexpected snapshot status='{state}' oob_schedule_ids='{schedule_id}' "
                f"for Protection Domain='{pdName}'")
            logging.info(f"Unfreeze vms for Protection Domain='{pdName}'")
            self.unFreezeVms(pdVms)
            # Each placeholder shows its own value: schedule id first, then state.
            generateResultFailed(
                f"Snapshot oob_schedule_ids='{schedule_id}' status error (status='{state}') "
                f"for Protection Domain='{pdName}'.")

        logging.info(f"Creating vms snapshot for Protection Domain='{pdName}' success")
        snapshotEntity = data["entities"][0]
        return {
            'schedule_id': schedule_id,
            'pdSnapshotId': snapshotEntity["snapshot_id"],
            'pdSnapshotUuid': snapshotEntity["snapshot_uuid"],
        }

    def freezeVms(self, pdVmsList):
        """
        Freezing vms (placeholder for the pre-freeze operation).

        :param pdVmsList: vms data needed to freeze
        """
        logging.info(f"Freezing vms for Protection Domain='{pdName}'")
        # here should be added code for freezing vms
        logging.info(f"Freezing vms for Protection Domain='{pdName}' success")

    def unFreezeVms(self, pdVms):
        """
        Unfreezing vms (placeholder for the post-thaw operation).

        :param pdVms: vms data needed to unfreeze
        """
        logging.info(f"Unfreezing vms for Protection Domain='{pdName}'")
        # here should be added code for unfreezing vms
        logging.info(f"Unfreezing vms for Protection Domain='{pdName}' success")
def doSomeChecks(pdVms):
    """
    Example pre-backup check of the Protection Domain VMs.

    Requests the details of every VM and logs a warning for each VM that
    the cluster does not know. Add your own checks here.

    :param pdVms: list of vm_id values of the Protection Domain
    """
    # here should be added code for PD VMs check
    # for example get vms ip addresses
    logging.info("Do some checks")

    vmsInfo = []
    for entry in cluster.getVmsInfoEx(pdVms, False):
        vmDetails = entry["vm"]
        if not vmDetails:
            logging.warning(f"VM with UUID='{entry['uuid']}' will not backup. VM in PD does not exist in cluster.")
            continue
        # if exist on Nutanix --> add to list
        vmsInfo.append(vmDetails)

    logging.info("Do some checks success")
    # here should be added error message (in case of failed checks)
# ################################################################################################## # # Main # # ##################################################################################################
if __name__ == "__main__":
    # `cluster` and `pdVms` stay module-level names: other parts of the
    # script read them as globals.
    cluster = NutanixCluster(nutanixPrismCentralIp, nutanixLogin, nutanixPass)
    pdVms = cluster.getPdVms(pdName)

    if not pdVms:
        generateResultFailed(f"Protection Domain '{pdName}' doesn't have vms.")

    # Order matters: check, freeze, snapshot, unfreeze.
    doSomeChecks(pdVms)
    cluster.freezeVms(pdVms)
    snapshotInfo = cluster.createPdSnapshot(pdName)
    cluster.unFreezeVms(pdVms)

    logging.info("All steps succeeded")

    # Report success together with the id of the created snapshot schedule.
    result = ScriptResult(ScriptResult.SUCCESS, "")
    result.oob_schedule_id = snapshotInfo['schedule_id']

    generateResult(result)
|