Appendix A. Custom Script Sample

The sample custom script is written in Python 3 and contains the classes and methods used to create a protection domain (PD) snapshot. The script also includes placeholder methods in which you can implement your own checks and the pre-freeze and post-thaw operations:

  • doSomeChecks(pdVms)
  • freezeVms(self, pdVmsList)
  • unFreezeVms(self, pdVms)

Note

The sample script does not check whether the provided Nutanix AHV cluster certificate is valid.

#!/usr/bin/python3

import requests

import urllib3

 

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

import json

import time

import logging

import argparse

import os

from urllib.parse import urlparse

from urllib.parse import urlunparse

from urllib.parse import urlencode

 

def quitScriptFailed(message):
    """
    Print a "Failed" result as a JSON document on stdout and stop the script.

    The document is produced with json.dumps so that quotes, backslashes and
    line breaks inside the message are escaped and the output stays valid JSON.

    :param message: human readable failure description (converted with str())
    :raises SystemExit: always, with exit code 0
    """
    print(json.dumps({"status": "Failed", "errorMessage": str(message)}))

    # Exit code stays 0 as before: the failure is reported through the JSON
    # "status" field. SystemExit is raised directly because the exit() builtin
    # is injected by the 'site' module and is meant for interactive sessions.
    raise SystemExit(0)

 

# ##################################################################################################

#

# Initialize

#

# ##################################################################################################

 

# Folder that contains this script file.
scriptDir = os.path.dirname(os.path.realpath(__file__))

#
# Process Command Line
#

parser = argparse.ArgumentParser(description='Create vms snapshot for Protection Domain')
parser.add_argument('--pdName', type=str, help='Protection Domain name')
parser.add_argument('--jobName', type=str, help='Job Name for displaying in header')
parser.add_argument('--logFolder', type=str, help='Logging path')
parser.add_argument('--logLevel', type=str, nargs='?', default='DEBUG', help='Log level: debug, info, warning, error, critical. (default=debug)')
args = parser.parse_args()

# A missing mandatory argument is reported through the JSON result instead of
# letting a later statement fail with a traceback.
if args.pdName is None or args.pdName == "":
    quitScriptFailed("Invalid argument: pdName")

if args.jobName is None or args.jobName == "":
    quitScriptFailed("Invalid argument: jobName")

# The log folder is used unconditionally when logging is initialised, so an
# absent or empty value is rejected here like the other arguments.
if args.logFolder is None or args.logFolder == "":
    quitScriptFailed("Invalid argument: logFolder")

pdName = args.pdName
jobName = args.jobName
logDir = args.logFolder

# '--logLevel' passed without a value yields None (nargs='?' with no const);
# fall back to the documented default in that case.
logLevel = args.logLevel or 'DEBUG'

 

#

# Init Log

#

 

from datetime import datetime

 

LogConsoleDbg = False

# Short, fixed-width level names used in the log file.
logging.addLevelName(logging.FATAL, 'Fatal')
logging.addLevelName(logging.ERROR, 'Error')
logging.addLevelName(logging.WARN,  'Warn ')
logging.addLevelName(logging.INFO,  'Info ')
logging.addLevelName(logging.DEBUG, 'Debug')

# logging.getLevelName() maps a known level name to its integer value and
# returns a "Level xxx" string for anything else, so an int result means the
# requested name can be handed to basicConfig safely.
logLevelName = str(logLevel).upper()
logLevelIsKnown = isinstance(logging.getLevelName(logLevelName), int)

try:
    logFilePath = os.path.join(logDir, "custom_script.log")
    os.makedirs(logDir, exist_ok=True)
    logging.basicConfig(
        filename=logFilePath,
        filemode='a',
        format='%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
        datefmt='[%Y-%m-%d] [%H:%M:%S',
        level=logLevelName if logLevelIsKnown else 'DEBUG'
    )
except (OSError, TypeError):
    # OSError: the folder or file cannot be created/opened.
    # TypeError: no log folder was supplied at all (logDir is None).
    quitScriptFailed("Failed to initialize logging")

# LogConsoleDbg = True

if LogConsoleDbg:
    # Mirror the file log on the console, reusing the file handler's formatter.
    Formater = logging.getLogger().handlers[0].formatter
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(Formater)
    logging.getLogger().addHandler(consoleHandler)

if not logLevelIsKnown:
    logging.warning("Unknown log level '%s', using DEBUG", logLevel)

logging.info("--------------------------------------------------------------------------------")
logging.info("Start to script execution")
logging.info("    Protection Domain name: %s" % pdName)
logging.info("    Job name:               %s" % jobName)
logging.info("--------------------------------------------------------------------------------")

 

#

# Init config from env variables

#

 

 

nutanixClusterIp = os.getenv('NUTANIX_CLUSTER_ADDRESS')
nutanixLogin = os.getenv('NUTANIX_CLUSTER_LOGIN')
nutanixPass = os.getenv('NUTANIX_CLUSTER_PASSWORD')

# os.getenv() returns None for an unset variable instead of raising, so
# missing settings have to be detected explicitly. Only the variable names
# are logged, never their values.
missingEnvVars = [
    envName
    for envName, envValue in (
        ('NUTANIX_CLUSTER_ADDRESS', nutanixClusterIp),
        ('NUTANIX_CLUSTER_LOGIN', nutanixLogin),
        ('NUTANIX_CLUSTER_PASSWORD', nutanixPass),
    )
    if not envValue
]

if missingEnvVars:
    Error = "Failed to get environment variables"
    logging.error("%s: %s", Error, ", ".join(missingEnvVars))
    quitScriptFailed(Error)

 

 

 

# ##################################################################################################

#

# Results

#

# ##################################################################################################

 

class ScriptResult:
    """
    Outcome of the custom script run.

    Holds a status code, an error message and the id of the out-of-band
    snapshot schedule, and serialises them to the JSON string that the
    script prints as its result.
    """

    SUCCESS = 0
    WARNING = 1
    FAILED = 2

    # Text written to the "status" field for each status code.
    _STATUS_NAMES = {
        SUCCESS: "Success",
        WARNING: "Warning",
        FAILED: "Failed",
    }

    def __init__(self, status = SUCCESS, errorMessage = ""):
        """
        :param status: one of SUCCESS, WARNING or FAILED
        :param errorMessage: description of the problem, empty on success
        """
        self.status = status
        self.errorMessage = errorMessage

        # Filled in by the caller once the snapshot schedule is known.
        self.oob_schedule_id = ""

    def SetFailed(self, message):
        """Mark the result as failed and store the failure description."""
        self.status = ScriptResult.FAILED
        self.errorMessage = message

    def SetWarning(self, message):
        """Mark the result as a warning and store the warning description."""
        self.status = ScriptResult.WARNING
        self.errorMessage = message

    def makeJsonString(self):
        """
        Serialise the result to a JSON object string.

        :return: JSON text with the keys "status", "errorMessage" and
                 "oob_schedule_id"
        """
        jResult = {
            # An unrecognised status code is reported as "Failed" so that the
            # "status" key is always present in the output.
            'status': self._STATUS_NAMES.get(self.status, "Failed"),
            'errorMessage': self.errorMessage,
            'oob_schedule_id': self.oob_schedule_id,
        }

        return json.dumps(jResult)

 

def generateResult(result):
    """
    Log the final result, print it to stdout as JSON and stop the script.

    :param result: ScriptResult describing the outcome of the run
    :raises SystemExit: always, with exit code 0
    """
    jsonString = result.makeJsonString()

    # Log at the severity that matches the outcome; a warning is not an error.
    if result.status == ScriptResult.FAILED:
        logging.error("Custom script execution finished with error. Result: %s", jsonString)
    elif result.status == ScriptResult.WARNING:
        logging.warning("Custom script execution finished with warning. Result: %s", jsonString)
    else:
        logging.info("Custom script execution finished. Result: %s", jsonString)

    print(jsonString)

    # Exit code stays 0 as before; the outcome is carried by the JSON "status"
    # field. SystemExit is raised directly because the exit() builtin is
    # injected by the 'site' module and is meant for interactive sessions.
    raise SystemExit(0)

 

def generateResultFailed(errorMessage, exception = None):
    """
    Report a failed run through generateResult().

    For the exception kinds recognised below a short cause is appended to
    errorMessage; any other exception (or None) leaves the message as is.

    :param errorMessage: description of the step that failed
    :param exception: exception that caused the failure, if any
    """
    if isinstance(exception, NutanixException):
        # Text returned by the Nutanix API call, e.g.:
        # "Specified protection domain AgentTest-ProtectionDomain does not exist"
        cause = " Error: %s" % str(exception)
    elif isinstance(exception, AuthenticationException):
        cause = " Error: Authentication failed."
    elif isinstance(exception, requests.exceptions.ConnectionError):
        cause = " Error: Connect failed."
    else:
        cause = None

    if cause is not None:
        errorMessage = errorMessage + cause

    failure = ScriptResult(ScriptResult.FAILED, errorMessage)
    generateResult(failure)

 

def getFullUrl(url, params):
    """
    Return url with params appended to its query string.

    :param url: absolute URL, with or without an existing query string
    :param params: query parameters in any form accepted by urlencode()
                   (an empty value adds nothing)
    :return: the resulting URL as a string
    """
    encodedParams = urlencode(params)
    parts = urlparse(url)

    # Keep whichever of the two query fragments is non-empty, joined by '&'.
    fragments = [piece for piece in (parts.query, encodedParams) if piece]
    merged = parts._replace(query='&'.join(fragments))

    return urlunparse(merged)

 

# ##################################################################################################

#

# Nutanix

#

# ##################################################################################################

 

class AuthenticationException(Exception):
    """Raised when a request to the Nutanix cluster is answered with HTTP 401."""

    def __init__(self):
        # Carries no message; callers supply their own wording.
        Exception.__init__(self)

 

 

class NutanixException(Exception):
    """Error reported by the Nutanix API; the API text is kept in .message."""

    def __init__(self, message=""):
        self.message = message
        Exception.__init__(self, message)

 

class NutanixExceptionNotFound(NutanixException):
    """Nutanix API error for a request answered with HTTP 404."""

    def __init__(self, message=""):
        NutanixException.__init__(self, message)

 

class NutanixCluster:
    """
    Describes the attributes and methods required to work with the Nutanix cluster API

    Error handling convention: getPdVms, getVmsInfoEx and createPdSnapshot do
    not propagate API errors. They log the problem and call the module-level
    generateResultFailed(), which reports the failure as the script result.
    """

    # HTTP status codes that count as a successful API call.
    SUCCESS_STATUS_CODES = (200, 201, 202)

    # Timeout, in seconds, applied to every HTTP request.
    REQUEST_TIMEOUT = 10

    # Snapshot status polling: number of attempts and pause (seconds) before each.
    SNAPSHOT_POLL_ATTEMPTS = 10
    SNAPSHOT_POLL_INTERVAL = 5

    def __init__(self, nutanixClusterIp, nutanixLogin, nutanixPass):
        """
        Store the credentials and build the base URLs of the three API versions.

        :param nutanixClusterIp: cluster address used as the host part of the URLs
        :param nutanixLogin: user name for HTTP authentication
        :param nutanixPass: password for HTTP authentication
        """
        self.nutanixClusterIp = nutanixClusterIp
        self.nutanixLogin = nutanixLogin
        self.nutanixPass = nutanixPass
        self.nutanixApi1 = "https://%s:9440/PrismGateway/services/rest/v1" % self.nutanixClusterIp
        self.nutanixApi2 = "https://%s:9440/PrismGateway/services/rest/v2.0" % self.nutanixClusterIp
        self.nutanixApi3 = "https://%s:9440/api/nutanix/v3" % self.nutanixClusterIp

    @staticmethod
    def getErrorMessage(jResponse):
        """
        Extract the error text from a decoded JSON error response.

        :param jResponse: decoded JSON body of a failed request
        :return: the "message" value (API v1 & v2) or all "message_list"
                 entries joined with "; " (API v3)
        :raises requests.exceptions.RequestException: the body has neither form
        """
        if "message" in jResponse:              # for Nutanix API v1 & v2
            return jResponse["message"]

        if "message_list" in jResponse:         # for Nutanix API v3
            messages = []
            for jError in jResponse["message_list"]:
                if "message" not in jError:
                    raise requests.exceptions.RequestException("Unexpected Nutanix error response format")
                messages.append(jError["message"])
            return "; ".join(messages)

        raise requests.exceptions.RequestException("Unexpected Nutanix error response format")

    @staticmethod
    def logRequest(level, httpMethod, urlFull, body=''):
        """
        Log an outgoing HTTP request at the given logging level.

        :param level: logging level (e.g. logging.DEBUG)
        :param httpMethod: HTTP method name, any letter case
        :param urlFull: URL including the query string
        :param body: request body; logged only for methods that can carry one
        """
        method = httpMethod.upper()

        if method in ('POST', 'PUT', 'PATCH'):
            logging.log(level, "HTTP Request (%s %s): %s", method, urlFull, "" if body else "none")
            if body:
                logging.log(level, body)
        else:
            logging.log(level, "HTTP Request (%s %s)", method, urlFull)

    @staticmethod
    def logResponse(level, response):
        """
        Log the status line and, when present, the body of an HTTP response.

        :param level: logging level (e.g. logging.DEBUG)
        :param response: response object returned by the requests library
        """
        logging.log(level, "HTTP Response (%s %s): %s", response.status_code, response.reason, "" if response.text else "none")
        if response.text:
            logging.log(level, response.text)

    def executeHttpRequest(self, httpMethod, url, body='', params=''):
        """
        Execute http/https requests

        :param httpMethod: 'get' or 'post' (any letter case)
        :param url: absolute request URL
        :param body: message body
        :param params: query string parameters in the URL
        :return: decoded JSON body of a successful response
        :raises NotImplementedError: httpMethod is neither get nor post
        :raises AuthenticationException: the cluster answered with HTTP 401
        :raises NutanixExceptionNotFound: HTTP 404 with a JSON error body
        :raises NutanixException: any other failure with a JSON error body
        :raises requests.exceptions.RequestException: failure without a JSON body
        """
        method = httpMethod.lower()
        if method not in ('get', 'post'):
            raise NotImplementedError("HTTP Method '%s' not supported" % httpMethod)

        urlFull = getFullUrl(url, params)
        reqRespLogged = logging.root.level <= logging.DEBUG

        #
        # Do request
        #

        self.logRequest(logging.DEBUG, method, urlFull, body)

        # The session is used as a context manager so its connections are
        # released as soon as the response has been read.
        with requests.Session() as s:
            s.auth = (self.nutanixLogin, self.nutanixPass)
            s.headers.update({'Content-Type': 'application/json; charset=utf-8'})

            # verify=False: the cluster certificate is intentionally not validated.
            if method == 'get':
                response = s.get(url, data=body, params=params, verify=False, timeout=self.REQUEST_TIMEOUT)
            else:
                response = s.post(url, data=body, params=params, verify=False, timeout=self.REQUEST_TIMEOUT)

        self.logResponse(logging.DEBUG, response)

        #
        # Process response
        #

        if response.status_code in self.SUCCESS_STATUS_CODES:
            return response.json()

        # The DEBUG records above were dropped by the logger; repeat the
        # exchange at ERROR level so a failure is always traceable.
        if not reqRespLogged:
            self.logRequest(logging.ERROR, method, urlFull, body)
            self.logResponse(logging.ERROR, response)

        if response.status_code == 401:                     # Unauthorized
            raise AuthenticationException()

        # The header may be missing on error responses, hence .get().
        contentType = response.headers.get('Content-Type', '')

        if "application/json" in contentType.lower():
            message = self.getErrorMessage(response.json())
            logging.error("Nutanix request failed. Nutanix error: %s", message)

            if response.status_code == 404:                 # Not Found
                raise NutanixExceptionNotFound(message)
            raise NutanixException(message)

        logging.error("Nutanix request failed.")
        raise requests.exceptions.RequestException(
            "Nutanix request failed with HTTP status %s" % response.status_code)

    def executeHttpRequestV1(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v1 API; relativeUrl is appended to the base URL."""
        url = "%s/%s" % (self.nutanixApi1, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV2(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v2.0 API; relativeUrl is appended to the base URL."""
        url = "%s/%s" % (self.nutanixApi2, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def executeHttpRequestV3(self, httpMethod, relativeUrl, body='', params=''):
        """Execute a request against the v3 API; relativeUrl is appended to the base URL."""
        url = "%s/%s" % (self.nutanixApi3, relativeUrl)
        return self.executeHttpRequest(httpMethod, url, body, params)

    def getPdVms(self, pdName):
        """
        Get vm_id for the Protection Domain

        On any error the failure is reported through generateResultFailed().

        :param pdName: Protection Domain name
        :return pdVms: returns a list of vms_id for the Protection Domain
        """
        pdVms = []
        try:
            logging.info("Getting vm_id list for Protection Domain='%s'", pdName)
            data = self.executeHttpRequestV2('get', "protection_domains/%s" % pdName)
            pdVms = [vm["vm_id"] for vm in data["vms"]]

        except Exception as e:
            logging.error("Getting vm_id list from Protection Domain='%s' failed. Error: %s.", pdName, str(e))
            logging.exception(str(e))
            generateResultFailed("Getting vm_id list for Protection Domain='%s' failed." % pdName, e)

        logging.info("Getting vm_id list vm_id='%s' success", pdVms)
        return pdVms

    def getVmsInfoEx(self, vmsUuids, mustExist=True):
        """
        Obtaining information about vms through the v3 API.
        The returned vm data can be used to read, for example, ip address,
        cpu or memory settings. This method is created as an example.

        :param vmsUuids: list of vm_id
        :param mustExist: when False, a vm that the cluster reports as not
                          found is returned with "vm" set to None instead of
                          failing the script
        :return vmsInfo: list of dicts {"uuid": <vm_id>, "vm": <API response or None>}
        """
        vmsInfo = []
        logging.info("Getting vms started. UUIDs: %s", vmsUuids)

        for vmUuid in vmsUuids:

            vmInfo = {"uuid": vmUuid, "vm": None}

            try:
                vmInfo["vm"] = self.executeHttpRequestV3('get', "vms/%s" % vmUuid)
                vmsInfo.append(vmInfo)
            except Exception as e:

                # A missing vm is tolerated only when the caller allows it.
                if isinstance(e, NutanixExceptionNotFound) and not mustExist:
                    vmsInfo.append(vmInfo)
                    continue

                logging.error("Getting vms failed. VM UUID: %s", vmUuid)
                logging.exception(str(e))
                generateResultFailed("Getting vms failed.", e)

        logging.info("Getting vms success. UUIDs: %s", vmsUuids)

        return vmsInfo

    def getVmsInfo(self, vmsUuids, mustExist=True):
        """
        Same as getVmsInfoEx() but returns only the vm data of existing vms.

        :param vmsUuids: list of vm_id
        :param mustExist: see getVmsInfoEx()
        :return: list of API responses, one per vm that exists on the cluster
        """
        vmsInfoEx = self.getVmsInfoEx(vmsUuids, mustExist)

        # if exist on Nutanix --> add to list
        return [vmInfo["vm"] for vmInfo in vmsInfoEx if vmInfo["vm"]]

    def createPdSnapshot(self, pdName, pdVms=None):
        """
        Creating vms snapshot for the Protection Domain

        Requests an out-of-band snapshot schedule, then polls the snapshot
        list until the snapshot state is 'AVAILABLE'. On every failure path
        the vms are unfrozen and the failure is reported through
        generateResultFailed().

        :param pdName: Protection Domain name
        :param pdVms: vms to unfreeze if the snapshot fails; when omitted the
                      module-level 'pdVms' list is used, as before
        :return: dict with the keys 'schedule_id', 'pdSnapshotId' and 'pdSnapshotUuid'
        """
        if pdVms is None:
            # Backward compatibility: existing callers pass only pdName and
            # rely on the module-level 'pdVms' set by the main section.
            pdVms = globals().get('pdVms')

        schedule_id = None

        body = json.dumps({"app_consistent": "false"})

        try:
            logging.info("Creating vms snapshot for Protection Domain='%s'", pdName)
            data = self.executeHttpRequestV2('post', "protection_domains/%s/oob_schedules" % pdName, body)
            schedule_id = data["schedule_id"]
        except Exception as e:
            logging.error("Creating vms snapshot for Protection Domain='%s' failed" % pdName)
            logging.exception(str(e))
            # Unfreeze here as well, like the other failure paths below do.
            logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
            self.unFreezeVms(pdVms)
            generateResultFailed("Creating vms snapshot for Protection Domain='%s' failed." % pdName,  e)

        params = {'oob_schedule_ids': schedule_id}
        state = ''
        data = None
        isStateAvailable = False

        for _ in range(self.SNAPSHOT_POLL_ATTEMPTS):
            time.sleep(self.SNAPSHOT_POLL_INTERVAL)  # waiting when information about snapshot be available
            try:
                logging.info("Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s'", schedule_id, pdName)
                data = self.executeHttpRequestV2('get', "protection_domains/%s/dr_snapshots/" % pdName, '', params)
            except Exception as e:
                logging.error("Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s' failed", schedule_id, pdName)
                logging.exception(str(e))
                logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
                self.unFreezeVms(pdVms)
                generateResultFailed("Getting snapshot status oob_schedule_ids='%s' for Protection Domain='%s' failed." % (schedule_id, pdName), e)

            # The snapshot may not be listed yet; treat that as "no state".
            try:
                state = data["entities"][0]["state"]
            except (KeyError, IndexError, TypeError):
                state = None

            if state == 'AVAILABLE':
                isStateAvailable = True
                break

        if not isStateAvailable:
            logging.error("Unexpected snapshot status='%s' oob_schedule_ids='%s' for Protection Domain='%s'", state, schedule_id, pdName)
            logging.info("Unfreeze vms for Protection Domain='%s'", pdName)
            self.unFreezeVms(pdVms)
            generateResultFailed("Snapshot oob_schedule_ids='%s' status error (status='%s') for Protection Domain='%s'." % (schedule_id, state, pdName))

        logging.info("Creating vms snapshot for Protection Domain='%s' success", pdName)

        snapshot = data["entities"][0]
        return {
            'schedule_id': schedule_id,
            'pdSnapshotId': snapshot["snapshot_id"],
            'pdSnapshotUuid': snapshot["snapshot_uuid"],
        }

    def freezeVms(self, pdVmsList):
        """
        Freezing vms (placeholder: add the pre-freeze code here)

        :param pdVmsList: vms data needed to freeze
        :return:
        """
        # pdName is the module-level Protection Domain name from the command line.
        logging.info("Freezing vms for Protection Domain='%s'", pdName)
        # here should be added code for freezing vms
        logging.info("Freezing vms for Protection Domain='%s' success", pdName)

    def unFreezeVms(self, pdVms):
        """
        unfreezing vms (placeholder: add the post-thaw code here)

        :param pdVms:  vms data needed to unfreeze
        :return:
        """
        # pdName is the module-level Protection Domain name from the command line.
        logging.info("Unfreezing vms for Protection Domain='%s'", pdName)
        # here should be added code for unfreezing vms
        logging.info("Unfreezing vms for Protection Domain='%s' success", pdName)

 

def doSomeChecks(pdVms):
    """
    Placeholder for checks on the Protection Domain vms before the snapshot.

    As an example it queries every vm through the module-level cluster object
    and logs a warning for each vm that the cluster does not return.

    :param pdVms: list of vm_id of the Protection Domain
    """
    # here should be added code for PD VMs check
    # for example get vms ip addresses
    logging.info("Do some checks")

    existingVms = []

    for entry in cluster.getVmsInfoEx(pdVms, False):
        vmData = entry["vm"]
        if not vmData:
            logging.warning("VM with UUID='%s' will not backup. VM in PD does not exist in cluster.", entry["uuid"])
            continue
        # if exist on Nutanix --> add to list
        existingVms.append(vmData)

    logging.info("Do some checks success")
    # here should be added error message (in case of failed checks)

 

 

# ##################################################################################################

#

# Main

#

# ##################################################################################################

 

if __name__ == "__main__":

    # Workflow: list the vms of the Protection Domain, run the checks,
    # freeze the vms, take the snapshot, unfreeze the vms, report the result.
    # 'cluster' and 'pdVms' are module-level names; other code in this file
    # reads them.
    cluster = NutanixCluster(nutanixClusterIp, nutanixLogin, nutanixPass)

    pdVms = cluster.getPdVms(pdName)
    if not pdVms:
        generateResultFailed("Protection Domain '%s' doesn't have vms." % pdName)

    doSomeChecks(pdVms)
    cluster.freezeVms(pdVms)

    try:
        snapshotInfo = cluster.createPdSnapshot(pdName)
    except Exception as e:
        # Failures that createPdSnapshot handles itself end the script via
        # SystemExit, which is not an Exception subclass and is not caught
        # here. This branch covers only unexpected errors, so the vms are
        # never left frozen and the script still reports a JSON result.
        logging.exception(str(e))
        cluster.unFreezeVms(pdVms)
        generateResultFailed("Creating vms snapshot for Protection Domain='%s' failed." % pdName, e)

    cluster.unFreezeVms(pdVms)

    logging.info("All steps succeeded")

    result = ScriptResult()
    result.oob_schedule_id = snapshotInfo['schedule_id']

    generateResult(result)