[Working Blog] ONTAP Snapshot Reporting in Python - Part 9 - The Finished Script

The complete script is below.

Save it as, say, ontapSnapshotReportGenerator.py

Then run it (say, from a Windows command prompt) as:

ontapSnapshotReportGenerator.py --cluster CLUSTER -u USERNAME -p PASSWORD -o OUTPUTFILE

And an example of the CSV output is below:


#######################################################
## ONTAP SNAPSHOTS (BACKUP) REPORT GENERATOR v 0.001 ##
#######################################################

## IMPORT STATEMENTS ##

import argparse,csv,datetime,pytz,sys
from datetime import timedelta
from netapp_ontap import config, HostConnection
from netapp_ontap.resources import Cluster,Svm,Volume
from netapp_ontap.resources import SnapshotPolicy,Schedule,Snapshot

## FUNCTIONS ##

def cronPeriod(x,period):
    '''Return one field of a schedule's cron definition.

    x      : object expected to expose a ``cron`` attribute.
    period : one of "minutes", "hours", "days", "weekdays" or "months".

    Returns ``x.cron.<period>``.  The placeholder list ['X'] is returned
    instead when ``period`` is not one of the five names above, or when the
    attribute lookup raises AttributeError.
    '''
    knownPeriods = ("minutes", "hours", "days", "weekdays", "months")
    # Unknown period names never touch x at all.
    if period not in knownPeriods:
        return ['X']
    try:
        return getattr(x.cron, period)
    except AttributeError:
        return ['X']

def getSnapshots(localtime,prefix,count,minutes,hours,days,weekdays,months):
    '''Generate the names of the snapshots a cron schedule should have taken.

    Walks backwards in time from ``localtime`` (inclusive) and collects the
    ``count`` most recent moments at which the schedule fires, newest first.

    localtime : datetime to start from; only its calendar fields are used.
    prefix    : str, snapshot name prefix.
    count     : int, number of snapshot names wanted.
    minutes, hours, days, weekdays, months :
                lists of ints, or ['X'] meaning "unrestricted".  Weekdays
                use the Python convention (Monday = 0), because they are
                compared with datetime.weekday().

    When both ``days`` and ``weekdays`` are restricted, a day qualifies if
    EITHER matches (the original skip test demanded both, contradicting its
    own match test).  ['X'] is treated as unrestricted for every field.

    Returns a list of names formatted PREFIX.YYYY-MM-DD_HHmm.  The list is
    shorter than ``count`` (possibly empty) when ``count`` <= 0 or when the
    schedule can never fire (e.g. 30 February, or a minute value of 75).
    '''
    if count <= 0:
        return []

    def allowed(values, validRange):
        # None means "unrestricted"; otherwise keep only values that can occur.
        if values == ['X']:
            return None
        return set(v for v in values if v in validRange)

    # Build the lookup sets once instead of on every loop iteration.
    minuteSet  = allowed(minutes,  range(0, 60))
    hourSet    = allowed(hours,    range(0, 24))
    daySet     = allowed(days,     range(1, 32))
    weekdaySet = allowed(weekdays, range(0, 7))
    monthSet   = allowed(months,   range(1, 13))
    dayRestricted = daySet is not None or weekdaySet is not None

    # A restricted field with no usable value means the schedule never fires.
    if any(s is not None and not s for s in (minuteSet, hourSet, monthSet)):
        return []
    if dayRestricted and not (daySet or weekdaySet):
        return []

    oneMinute = datetime.timedelta(minutes=1)
    # Start one minute ahead so the first step back lands on localtime itself.
    xTime     = localtime + oneMinute
    snapList  = []
    while len(snapCountGuard(snapList)) < count:
        try:
            xTime = xTime - oneMinute
        except OverflowError:
            # Reached the earliest representable date: nothing more to find.
            break
        # Each skip rewinds to the start of the rejected month/day/hour; the
        # next step back then lands on the last minute of the previous one.
        if monthSet is not None and xTime.month not in monthSet:
            xTime = datetime.datetime(xTime.year,xTime.month,1,0,0)
            continue
        if dayRestricted:
            dayHit     = xTime.day in (daySet or ())
            weekdayHit = xTime.weekday() in (weekdaySet or ())
            if not (dayHit or weekdayHit):
                xTime = datetime.datetime(xTime.year,xTime.month,xTime.day,0,0)
                continue
        if hourSet is not None and xTime.hour not in hourSet:
            xTime = datetime.datetime(xTime.year,xTime.month,xTime.day,xTime.hour,0)
            continue
        if minuteSet is not None and xTime.minute not in minuteSet:
            continue
        # Snap name at time x (PREFIX.YYYY-MM-DD_HHmm)
        snapList.append(prefix + ".{}-{:02d}-{:02d}_{:02d}{:02d}".format(
            xTime.year,xTime.month,xTime.day,xTime.hour,xTime.minute))
    return snapList


def snapCountGuard(snapList):
    # Identity helper kept trivial on purpose: returns the list it is given.
    return snapList

## MAIN PROGRAM ##

def main() -> None:
    '''Build the snapshot report for one ONTAP cluster and write it as CSV.

    Command-line arguments (all required): -c/--cluster, -u/--username,
    -p/--password and -o/--outputFile.

    Steps: connect to the cluster; read snapshot policies, cron schedules,
    volumes and their snapshots; work out, per snapshot policy, which
    snapshot names the schedules should have produced (via getSnapshots);
    then write one CSV row per expected snapshot of every FlexVol, marked
    SUCCESS if a snapshot of that name exists on the volume, else FAILURE.

    Exits with status 1 if the initial cluster request fails.
    '''
    print('ONTAP Snapshot (Backup) Report Generator')
    parser = argparse.ArgumentParser(description="ONTAP Snapshot (Backup) Report.")
    parser.add_argument('-c', '--cluster',    type=str, required=True)
    parser.add_argument('-u', '--username',   type=str, required=True)
    parser.add_argument('-p', '--password',   type=str, required=True)
    parser.add_argument('-o', '--outputFile', type=str, required=True)
    args       = parser.parse_args()
    cluster    = args.cluster
    outputFile = args.outputFile
    print('Cluster:', cluster, 'Output File:', outputFile)

    ## 1) CONNECT TO CLUSTER ##
    # NOTE(review): verify=False is passed as in the original script; it
    # presumably turns off TLS certificate checking - confirm this is intended.
    config.CONNECTION = HostConnection(
        host=cluster,
        username=args.username,
        password=args.password,
        verify=False
    )
    clus = Cluster()

    try:
        clus.get()
    except Exception as e:
        # Report the cause and signal failure to the calling shell instead of
        # returning a string from a function annotated "-> None".
        print("Failed to connect to cluster!", e)
        sys.exit(1)

    ## 2) GET SNAPSHOT POLICIES AND SNAPSHOT POLICY SCHEDULES ##
    snapPoliciesByUUID   = {}
    snapPolicyNameToUUID = {}

    for policy in SnapshotPolicy.get_collection():
        policy.get()
        snapPoliciesByUUID[policy.uuid]   = policy
        snapPolicyNameToUUID[policy.name] = policy.uuid

    ## 3) GET CRON SCHEDULES ##
    schedByUUID = {}

    for s in Schedule.get_collection(type="cron"):
        sched = {}
        for period in ("minutes", "hours", "days", "weekdays", "months"):
            sched[period] = cronPeriod(s, period)
        # Handling some conditions: a month-only schedule is pinned to day 1,
        # and a day/weekday schedule without hours is pinned to hour 0.
        if sched["months"] != ['X'] and sched["days"] == ['X'] and sched["weekdays"] == ['X']:
            sched["days"] = [1]
        if (sched["days"] != ['X'] or sched["weekdays"] != ['X']) and sched["hours"] == ['X']:
            sched["hours"] = [0]
        # Changing ONTAP day 0 = Sunday, to Python datetime day 0 = Monday
        if sched["weekdays"] != ['X']:
            sched["weekdays"] = [(w - 1) % 7 for w in sched["weekdays"]]
        schedByUUID[s.uuid] = sched

    ## 4) GET VOLUMES ##
    volUUIDs  = []
    volByUUID = {}

    for v in Volume.get_collection():
        v.get()
        # Presumably not every volume reports every field (e.g. a snapshot
        # policy); a missing one is stored as None instead of aborting the run.
        policyRef = getattr(v, "snapshot_policy", None)
        volUUIDs += [v.uuid]
        volByUUID[v.uuid] = {
            "name":                 v.name,
            "create_time":          getattr(v, "create_time", None),
            "snapshot_policy":      getattr(policyRef, "name", None),
            "snapshot_policy_uuid": getattr(policyRef, "uuid", None),
            "state":                getattr(v, "state", None),
            "style":                getattr(v, "style", None),
            "svm":                  v.svm.name,
        }

    ## 5) GET UTC TIME AND CONVERT TO LOCAL TIMEZONE ##
    clus.get()
    utcDateTime = clus.statistics.timestamp
    local_tz    = pytz.timezone(clus.timezone.name)
    localDT     = utcDateTime.astimezone(local_tz)

    ## 6) ACQUIRE SNAPSHOTS ##
    snapDataByVolUUID = {}

    for u in volUUIDs:
        snapDataByVolUUID[u] = {}
        for s in Snapshot.get_collection(u,fields="create_time"):
            snapDataByVolUUID[u][s.name] = {'create_time': s.create_time}

    ## 7) PER SNAPSHOT POLICY CREATE LIST OF EXPECTED SNAPSHOTS ##
    snapPolSnapList = {}

    for UUID, policy in snapPoliciesByUUID.items():
        snapPolSnapList[UUID] = []
        # A policy without schedules has no "copies" to walk.
        for c in getattr(policy, "copies", None) or []:
            try:
                prefix = c.prefix
                count  = c.count
                sched  = schedByUUID[c.schedule.uuid]
            except (AttributeError, KeyError):
                # Incomplete entry, or a schedule that was not returned by the
                # cron query above: skip this entry only, keep the others.
                print('Skipping a schedule entry of snapshot policy', UUID)
                continue
            snapPolSnapList[UUID] += getSnapshots(
                localDT, prefix, count,
                sched['minutes'], sched['hours'], sched['days'],
                sched['weekdays'], sched['months'])

    ## 8) OUTPUT TO CSV ##
    fieldnames = ['SVM','Volume','Snapshot Name','Snapshot Policy','Success (exists)','Create Time']
    outputCSV  = []

    for u in volUUIDs:
        vol = volByUUID[u]
        if vol['style'] != 'flexvol':
            # Only considering FlexVols (the idea is to skip FlexGroups)
            continue
        # Prefer the policy UUID; fall back to the name lookup the script
        # used originally. NOTE(review): policy names are presumably not
        # unique across SVMs, which is why the UUID is tried first - confirm.
        spUUID = vol['snapshot_policy_uuid']
        if spUUID not in snapPolSnapList:
            spUUID = snapPolicyNameToUUID.get(vol['snapshot_policy'])
        existing = snapDataByVolUUID[u]
        for s in snapPolSnapList.get(spUUID, []):
            exists = s in existing
            outputCSV.append({
                'SVM':              vol['svm'],
                'Volume':           vol['name'],
                'Snapshot Name':    s,
                'Snapshot Policy':  vol['snapshot_policy'],
                'Success (exists)': "SUCCESS" if exists else "FAILURE",
                'Create Time':      str(existing[s]['create_time']) if exists else "",
            })

    with open(outputFile, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(outputCSV)

# Run the report only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

THE END (I'll likely need to tweak it and develop it further)

Comments