Skip to content
125 changes: 125 additions & 0 deletions examples/sdk_examples/action_report/action_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Enterprise Action Report SDK Example."""

import getpass
import sqlite3

from keepersdk.authentication import configuration, endpoint, keeper_auth, login_auth
from keepersdk.constants import KEEPER_PUBLIC_HOSTS
from keepersdk.enterprise import action_report, enterprise_loader, sqlite_enterprise_storage
from keepersdk.errors import KeeperApiError

TARGET_STATUS = 'no-logon'
DAYS_SINCE = 30


def login():
config = configuration.JsonConfigurationStorage()

if not config.get().last_server:
print("Available server options:")
for region, host in KEEPER_PUBLIC_HOSTS.items():
print(f" {region}: {host}")
server = input('Enter server (default: keepersecurity.com): ').strip() or 'keepersecurity.com'
config.get().last_server = server
else:
server = config.get().last_server

keeper_endpoint = endpoint.KeeperEndpoint(config, server)
login_auth_context = login_auth.LoginAuth(keeper_endpoint)
username = config.get().last_login or input('Enter username: ')

login_auth_context.resume_session = True
login_auth_context.login(username)

while not login_auth_context.login_step.is_final():
step = login_auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print("Device approval request sent. Approve this device and press Enter.")
input()
elif isinstance(step, login_auth.LoginStepPassword):
step.verify_password(getpass.getpass('Enter password: '))
elif isinstance(step, login_auth.LoginStepTwoFactor):
channel = step.get_channels()[0]
step.send_code(channel.channel_uid, getpass.getpass(f'Enter 2FA code for {channel.channel_name}: '))
else:
raise NotImplementedError(f"Unsupported login step: {type(step).__name__}")

if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected):
return login_auth_context.login_step.take_keeper_auth()
return None


def print_report(entries, target_status, days_since):
action_text = (
'\tCOMMAND: NONE (No action specified)\n'
'\tSTATUS: n/a\n'
'\tSERVER MESSAGE: n/a\n'
'\tAFFECTED: 0'
)
status_display = target_status[0].upper() + target_status[1:]

print(f'\nAdmin Action Taken:\n{action_text}\n')
print('Note: the following reflects data prior to any administrative action being applied')
print(f'{len(entries)} User(s) With "{status_display}" Status Older Than {days_since} Day(s):\n')

if not entries:
return

headers = ['User ID', 'Email', 'Name', 'Status', 'Transfer Status', 'Node']
col_widths = [14, 31, 22, 8, 17, 19]

print(' '.join(f'{h:<{w}}' for h, w in zip(headers, col_widths)))
print(' '.join('-' * w for w in col_widths))

for entry in entries:
row = [
str(entry.enterprise_user_id), entry.email, entry.full_name,
entry.status, entry.transfer_status, entry.node_path
]
print(' '.join(f'{str(v)[:w]:<{w}}' for v, w in zip(row, col_widths)))


def generate_action_report(keeper_auth_context: keeper_auth.KeeperAuth):
if not keeper_auth_context.auth_context.is_enterprise_admin:
print("ERROR: This operation requires enterprise admin privileges.")
keeper_auth_context.close()
return

enterprise = None
try:
conn = sqlite3.Connection('file::memory:', uri=True)
enterprise_id = keeper_auth_context.auth_context.enterprise_id or 0
storage = sqlite_enterprise_storage.SqliteEnterpriseStorage(lambda: conn, enterprise_id)
enterprise = enterprise_loader.EnterpriseLoader(keeper_auth_context, storage)

config = action_report.ActionReportConfig(
target_user_status=TARGET_STATUS,
days_since=DAYS_SINCE
)
generator = action_report.ActionReportGenerator(
enterprise.enterprise_data, keeper_auth_context, loader=enterprise, config=config
)
entries = generator.generate_report()
print_report(entries, TARGET_STATUS, DAYS_SINCE)

except KeeperApiError as e:
print(f"\nAPI Error: {e}")
except Exception as e:
print(f"\nError: {e}")
finally:
if enterprise:
enterprise.close()
keeper_auth_context.close()


def main():
auth = login()
if auth:
generate_action_report(auth)
else:
print("Login failed.")


if __name__ == "__main__":
main()
123 changes: 123 additions & 0 deletions examples/sdk_examples/aging_report/aging_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Example: Password aging report using Keeper SDK."""

import datetime
import getpass
import sqlite3
import sys
import traceback

from keepersdk.authentication import login_auth, configuration, endpoint, keeper_auth
from keepersdk.enterprise import enterprise_loader, sqlite_enterprise_storage, aging_report
from keepersdk.errors import KeeperApiError


TABLE_WIDTH = 140
COL_WIDTHS = (30, 30, 25, 10, 45)
HEADERS = ['Owner', 'Title', 'Password Changed', 'Shared', 'Record URL']


def login():
config = configuration.JsonConfigurationStorage()
server = config.get().last_server or 'keepersecurity.com'

keeper_endpoint = endpoint.KeeperEndpoint(config, server)
auth_context = login_auth.LoginAuth(keeper_endpoint)
auth_context.resume_session = True

username = config.get().last_login
if not username:
print("Error: No saved login found. Please run with interactive login first.")
return None, None

auth_context.login(username)

while not auth_context.login_step.is_final():
step = auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print("Device approval required. Approve and press Enter.")
input()
elif isinstance(step, login_auth.LoginStepPassword):
step.verify_password(getpass.getpass('Enter password: '))
elif isinstance(step, login_auth.LoginStepTwoFactor):
channel = step.get_channels()[0]
code = getpass.getpass(f'Enter 2FA code for {channel.channel_name}: ')
step.send_code(channel.channel_uid, code)
else:
raise NotImplementedError(f"Unsupported login step: {type(step).__name__}")

if isinstance(auth_context.login_step, login_auth.LoginStepConnected):
return auth_context.login_step.take_keeper_auth(), server
return None, None


def format_row(values):
return ' '.join(
f"{str(val or '')[:w-1]:<{w}}"
for val, w in zip(values, COL_WIDTHS + (20,) * (len(values) - len(COL_WIDTHS)))
)


def print_report(rows, title):
print(f"\n{title}")
print('=' * TABLE_WIDTH)
print(format_row(HEADERS))
print('-' * TABLE_WIDTH)

for row in rows:
display_row = list(row)
display_row[3] = 'True' if display_row[3] else 'False'
print(format_row(display_row))

print('=' * TABLE_WIDTH)
print(f"\nFound {len(rows)} record(s) with aging passwords")


def generate_report(auth: keeper_auth.KeeperAuth, server: str):
if not auth.auth_context.is_enterprise_admin:
print("ERROR: This operation requires enterprise admin privileges.")
return 1

enterprise = None
try:
conn = sqlite3.Connection('file::memory:', uri=True)
enterprise_id = auth.auth_context.enterprise_id or 0
storage = sqlite_enterprise_storage.SqliteEnterpriseStorage(lambda: conn, enterprise_id)
enterprise = enterprise_loader.EnterpriseLoader(auth, storage)

print('\nThe default password aging period is 3 months\n')
print('Loading record password change information...')

config = aging_report.AgingReportConfig(server=server)
generator = aging_report.AgingReportGenerator(enterprise.enterprise_data, auth, config)
rows = list(generator.generate_report_rows())

cutoff_dt = datetime.datetime.now() - datetime.timedelta(days=aging_report.DEFAULT_PERIOD_DAYS)
title = f'Aging Report: Records With Passwords Last Modified Before {cutoff_dt.strftime("%Y/%m/%d %H:%M:%S")}'

print_report(rows, title)
return 0

except KeeperApiError as e:
print(f"API Error: {e}")
return 1
except Exception as e:
print(f"Error generating aging report: {e}")
traceback.print_exc()
return 1
finally:
if enterprise:
enterprise.close()
auth.close()


def main():
auth, server = login()
if not auth:
print("Login failed. Unable to generate aging report.")
return 1
return generate_report(auth, server)


if __name__ == "__main__":
sys.exit(main() or 0)
Loading