diff --git a/requirements.txt b/requirements.txt index 40d02ef1..1f5f75b7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,8 @@ boto3 kubernetes openshift coldfront >= 1.1.0 +pandas +pyarrow pydantic python-cinderclient # TODO: Set version for OpenStack Clients python-keystoneclient diff --git a/setup.cfg b/setup.cfg index 9a41f59b..d88cf8bf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,8 @@ install_requires = python-swiftclient requests < 3.0 simplejson < 4.0 + pandas + pyarrow pytz [options.packages.find] diff --git a/src/coldfront_plugin_cloud/management/commands/fetch_daily_billable_usage.py b/src/coldfront_plugin_cloud/management/commands/fetch_daily_billable_usage.py new file mode 100644 index 00000000..93b447d0 --- /dev/null +++ b/src/coldfront_plugin_cloud/management/commands/fetch_daily_billable_usage.py @@ -0,0 +1,290 @@ +from decimal import Decimal +from datetime import datetime, timedelta, timezone +from dataclasses import dataclass +import functools +import logging +import os +import tempfile +from typing import Optional + +from coldfront_plugin_cloud import attributes +from coldfront.core.utils.common import import_from_settings +from coldfront_plugin_cloud import usage_models +from coldfront_plugin_cloud.usage_models import UsageInfo, validate_date_str +from coldfront_plugin_cloud import utils + +import boto3 +from django.core.management.base import BaseCommand +from coldfront.core.resource.models import Resource +from coldfront.core.allocation.models import Allocation +from coldfront.core.utils import mail +import pandas +import pyarrow +from pandas.core.groupby.generic import DataFrameGroupBy + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +RESOURCES_DAILY_ENABLED = ["NERC-OCP", "NERC-EDU", "NERC"] +RESOURCE_NAME_TO_FILE = { + "NERC": "NERC OpenStack", + "NERC-OCP": "ocp-prod", + "NERC-OCP-EDU": "academic", +} +STORAGE_FILE = "NERC Storage" +ALLOCATION_STATES_TO_PROCESS = ["Active", "Active (Needs Renewal)"] + +INVOICE_COLUMN_ALLOCATION_ID = "Project - Allocation ID" +INVOICE_COLUMN_SU_TYPE = "SU Type" +INVOICE_COLUMN_COST = "Cost" + +S3_KEY_ID = os.getenv("S3_INVOICING_ACCESS_KEY_ID") +S3_SECRET = os.getenv("S3_INVOICING_SECRET_ACCESS_KEY") +S3_ENDPOINT = os.getenv( + "S3_INVOICING_ENDPOINT_URL", "https://s3.us-east-005.backblazeb2.com" +) +S3_BUCKET = os.getenv("S3_INVOICING_BUCKET", "nerc-invoicing") + +CENTER_BASE_URL = import_from_settings("CENTER_BASE_URL") +EMAIL_SENDER = import_from_settings("EMAIL_SENDER") +EMAIL_ENABLED = import_from_settings("EMAIL_ENABLED") +EMAIL_TEMPLATE = """Dear New England Research Cloud user, + +Your {resource.name} {resource.type} Allocation in project {allocation.project.title} has reached your preset Alert value. + +- As of midnight last night, your Allocation reached or exceeded your preset Alert value of {alert_value}. +- To view your Allocation information visit {url}/allocation/{allocation.id} + +Thank you, +New England Research Cloud (NERC) +https://nerc.mghpcc.org/ +""" + + +@dataclass() +class TotalByDate(object): + date: str + total: Decimal + + def __str__(self): + return f"{self.date}: {self.total} USD" + + +class Command(BaseCommand): + help = "Fetch daily billable usage." + + @property + def previous_day(self): + return datetime.now(timezone.utc) - timedelta(days=1) + + @property + def previous_day_string(self): + return self.previous_day.strftime("%Y-%m-%d") + + def add_arguments(self, parser): + parser.add_argument( + "--date", type=str, default=self.previous_day_string, help="Date." + ) + + def handle(self, *args, **options): + date = options["date"] + validate_date_str(date) + + allocations = self.get_allocations_for_daily_billing() + + for allocation in allocations: + resource = allocation.resources.first() + allocation_project_id = allocation.get_attribute( + attributes.ALLOCATION_PROJECT_ID + ) + + if not allocation_project_id: + logger.warning( + f"Allocation {allocation.id} is in an active state without a Project ID attribute. Skipping." + ) + continue + + previous_total = self.get_total_from_attribute(allocation) + + try: + # We must ensure both the cluster charges for the allocation and the storage + # charges are both processed otherwise the value will be misleading. + cluster_usage = self.get_allocation_usage( + resource.name, date, allocation_project_id + ) + storage_usage = self.get_allocation_usage( + STORAGE_FILE, date, allocation_project_id + ) + new_usage = usage_models.merge_models(cluster_usage, storage_usage) + except Exception as e: + logger.error( + f"Unable to get daily billable usage from {resource.name}, skipping {allocation_project_id}: {e}" + ) + continue + + # Only update the latest value if the processed date is newer or same date. + if not previous_total or date >= previous_total.date: + new_total = TotalByDate(date, new_usage.total_charges) + + self.set_total_on_attribute(allocation, new_total) + self.handle_alerting(allocation, previous_total, new_total) + + @staticmethod + def get_daily_location_for_prefix(prefix: str, date: str): + """Formats the S3 location for a given prefix and date. + + For example, the service invoices for the Resource of type OpenStack and name + NERC are located in /Invoices//Service Invoices/NERC OpenStack """ + return f"Invoices/{usage_models.get_invoice_month_from_date(date)}/Service Invoices/{prefix} {date}.csv" + + @staticmethod + def get_allocations_for_daily_billing(): + """Fetches all allocations of the production resources that are in the two Active states.""" + return Allocation.objects.filter( + resources__name__in=RESOURCES_DAILY_ENABLED, + status__name__in=ALLOCATION_STATES_TO_PROCESS, + ) + + @staticmethod + def set_total_on_attribute(allocation, total_by_date: TotalByDate): + """Adds the cumulative charges attribute to a resource.""" + attribute_value = str(total_by_date) + utils.set_attribute_on_allocation( + allocation, attributes.ALLOCATION_CUMULATIVE_CHARGES, attribute_value + ) + + @staticmethod + def get_total_from_attribute(allocation: Allocation) -> Optional[TotalByDate]: + """Load the total and date from the allocation attribute. + + The format is : USD""" + total = allocation.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES) + if not total: + return None + + try: + date, total = total.rstrip(" USD").split(": ") + return TotalByDate(date=date, total=Decimal(total)) + except ValueError as e: + logger.warning( + f"Unable to parse total from attribute for allocation {allocation.id}: {e}" + ) + return None + + @functools.cached_property + def s3_client(self): + if not S3_KEY_ID or not S3_SECRET: + raise Exception( + "Must provide S3_INVOICING_ACCESS_KEY_ID and" + " S3_INVOICING_SECRET_ACCESS_KEY environment variables." + ) + + s3 = boto3.client( + "s3", + endpoint_url=S3_ENDPOINT, + aws_access_key_id=S3_KEY_ID, + aws_secret_access_key=S3_SECRET, + ) + return s3 + + @staticmethod + @functools.cache + def load_csv(location) -> DataFrameGroupBy: + df = pandas.read_csv( + location, + dtype={INVOICE_COLUMN_COST: pandas.ArrowDtype(pyarrow.decimal128(12, 2))}, + ) + return df.groupby(INVOICE_COLUMN_ALLOCATION_ID) + + @functools.cache + def load_service_invoice(self, resource: str, date_str: str) -> DataFrameGroupBy: + """Fetches the dataframe of an invoice from S3.""" + if resource in RESOURCE_NAME_TO_FILE: + resource = RESOURCE_NAME_TO_FILE[resource] + + key = self.get_daily_location_for_prefix(resource, date_str) + with tempfile.TemporaryDirectory() as tmpdir: + filename = os.path.basename(key) + download_location = os.path.join(tmpdir, filename) + logger.info(f"Downloading invoice {key} to {download_location}.") + self.s3_client.download_file(S3_BUCKET, key, download_location) + return self.load_csv(download_location) + + def get_allocation_usage( + self, resource: str, date_str: str, allocation_id + ) -> UsageInfo: + """Loads the service invoice and parse UsageInfo for a specific allocation.""" + invoice = self.load_service_invoice(resource, date_str) + + try: + df = invoice.get_group(allocation_id)[ + [INVOICE_COLUMN_SU_TYPE, INVOICE_COLUMN_COST] + ] + except KeyError: + logger.debug(f"No usage for allocation {allocation_id}.") + return UsageInfo({}) + + return UsageInfo( + df.set_index(INVOICE_COLUMN_SU_TYPE)[INVOICE_COLUMN_COST].to_dict() + ) + + @classmethod + def handle_alerting( + cls, allocation, previous_total: TotalByDate, new_total: TotalByDate + ): + allocation_alerting_value = allocation.get_attribute( + attributes.ALLOCATION_ALERT + ) + already_alerted = False + + if allocation_alerting_value is None: + # Allocation alerting value attribute is not present on this allocation. + utils.set_attribute_on_allocation( + allocation, attributes.ALLOCATION_ALERT, 0 + ) + return + + if allocation_alerting_value <= 0: + # 0 is the default and does not send any alerts. + return + + if previous_total and previous_total.total > allocation_alerting_value: + if usage_models.is_date_same_month(previous_total.date, new_total.date): + # Alerting value has already been exceeded, do not alert again. + already_alerted = True + + if new_total.total > allocation_alerting_value: + logger.info( + f"{allocation.id} of {allocation.project.title} exceeded" + f"alerting value of {allocation_alerting_value}." + ) + if not already_alerted and EMAIL_ENABLED: + try: + cls.send_alert_email( + allocation, + allocation.resources.first().name, + allocation_alerting_value, + ) + logger.info( + f"Sent alert email to PI of {allocation.id} of {allocation.project.title}" + f"for exceeding alert value." + ) + except Exception as e: + logger.error( + f"Unable to send alert email to PI of {allocation.id} of {allocation.project.title}: {e}" + ) + + @staticmethod + def send_alert_email(allocation: Allocation, resource: Resource, alert_value): + mail.send_mail( + subject="Allocation Usage Alert", + message=EMAIL_TEMPLATE.format( + allocation=allocation, + resource=resource, + alert_value=alert_value, + url=CENTER_BASE_URL, + ), + from_email=EMAIL_SENDER, + recipient_list=[allocation.project.pi.email], + ) diff --git a/src/coldfront_plugin_cloud/tests/unit/test_fetch_daily_billable_usage.py b/src/coldfront_plugin_cloud/tests/unit/test_fetch_daily_billable_usage.py new file mode 100644 index 00000000..43f82e4d --- /dev/null +++ b/src/coldfront_plugin_cloud/tests/unit/test_fetch_daily_billable_usage.py @@ -0,0 +1,194 @@ +import io + +from unittest.mock import Mock, patch + + +from coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage import ( + Command, +) +from coldfront_plugin_cloud import attributes +from coldfront_plugin_cloud import usage_models +from coldfront_plugin_cloud.tests import base +from coldfront_plugin_cloud import utils + +from django.core.management import call_command + + +TEST_INVOICE = """ +Project - Allocation ID,SU Type,Cost +test-allocation-1,OpenStack CPU,100.25 +test-allocation-1,OpenStack V100 GPU,500.37 +test-allocation-2,OpenStack CPU,0.25 +""" + + +class TestFetchDailyBillableUsage(base.TestBase): + def test_get_daily_location_for_prefix(self): + self.assertEqual( + Command.get_daily_location_for_prefix("Test", "2025-11-01"), + "Invoices/2025-11/Service Invoices/Test 2025-11-01.csv", + ) + + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.Command.s3_client" + ) + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.Command.load_csv" + ) + def test_fetch_service_invoice_from_s3(self, mock_load_csv, mock_s3_client): + c = Command() + mock_s3_client.download_file = Mock() + + c.load_service_invoice("Test", "2025-11-01") + + self.assertEqual(mock_s3_client.download_file.call_count, 1) + call_args = mock_s3_client.download_file.call_args[0] + self.assertEqual(call_args[0], "nerc-invoicing") + self.assertEqual( + call_args[1], "Invoices/2025-11/Service Invoices/Test 2025-11-01.csv" + ) + + download_location = call_args[2] + mock_load_csv.assert_called_once_with(download_location) + + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.Command.load_service_invoice" + ) + def test_read_csv_and_get_allocation_usage(self, mock_load_service_invoice): + c = Command() + + # We mock the test CSV with StringIO + test_invoice_data = io.StringIO(TEST_INVOICE) + invoice = c.load_csv(test_invoice_data) + mock_load_service_invoice.return_value = invoice + + usage_info = c.get_allocation_usage("Test", "2025-01-11", "test-allocation-1") + usage_info_dict = usage_models.to_dict(usage_info) + + self.assertEqual(usage_info_dict["OpenStack CPU"], "100.25") + self.assertEqual(usage_info_dict["OpenStack V100 GPU"], "500.37") + + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.RESOURCES_DAILY_ENABLED", + ["FakeProd"], + ) + def test_get_allocations_for_daily_billing(self): + fakeprod = self.new_openstack_resource( + name="FakeProd", internal_name="FakeProd" + ) + fakedev = self.new_openstack_resource(name="FakeDev", internal_name="FakeDev") + + prod_project = self.new_project() + dev_project = self.new_project() + + prod_allocation_1 = self.new_allocation( + project=prod_project, resource=fakeprod, quantity=1, status="Active" + ) + prod_allocation_2 = self.new_allocation( + project=prod_project, + resource=fakeprod, + quantity=1, + status="Active (Needs Renewal)", + ) + prod_allocation_3 = self.new_allocation( + project=prod_project, resource=fakeprod, quantity=1, status="Denied" + ) + + dev_allocation_1 = self.new_allocation( + project=dev_project, resource=fakedev, quantity=1, status="Active" + ) + + returned_allocations = Command.get_allocations_for_daily_billing() + returned_allocation_ids = [x.id for x in returned_allocations] + + self.assertEqual(len(returned_allocations), 2) + self.assertIn(prod_allocation_1.id, returned_allocation_ids) + self.assertIn(prod_allocation_2.id, returned_allocation_ids) + self.assertNotIn(prod_allocation_3, returned_allocation_ids) + self.assertNotIn(dev_allocation_1, returned_allocation_ids) + + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.EMAIL_ENABLED", + True, + ) + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.RESOURCES_DAILY_ENABLED", + ["FakeProd"], + ) + @patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.Command.get_allocation_usage" + ) + def test_call_command(self, mock_get_allocation_usage): + mock_get_allocation_usage.side_effect = [ + usage_models.UsageInfo({"CPU": "100.00"}), + usage_models.UsageInfo({"Storage": "30.12"}), + ] + + fakeprod = self.new_openstack_resource( + name="FakeProd", internal_name="FakeProd" + ) + prod_project = self.new_project() + allocation_1 = self.new_allocation( + project=prod_project, resource=fakeprod, quantity=1, status="Active" + ) + utils.set_attribute_on_allocation( + allocation_1, attributes.ALLOCATION_PROJECT_ID, "test-allocation-1" + ) + + call_command("fetch_daily_billable_usage", date="2025-11-15") + + self.assertEqual( + allocation_1.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES), + "2025-11-15: 130.12 USD", + ) + + utils.set_attribute_on_allocation( + allocation_1, attributes.ALLOCATION_ALERT, 200 + ) + + # Testing backfill + mock_get_allocation_usage.side_effect = [ + usage_models.UsageInfo({"CPU": "50.00"}), + usage_models.UsageInfo({"CPU": "30.12"}), + ] + call_command("fetch_daily_billable_usage", date="2025-11-14") + + # Previous date doesn't update the allocation attribute + self.assertEqual( + allocation_1.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES), + "2025-11-15: 130.12 USD", + ) + + # Testing reprocessing of same date overwrites previous value. + mock_get_allocation_usage.side_effect = [ + usage_models.UsageInfo({"CPU": "40.00"}), + usage_models.UsageInfo({"Storage": "10.00"}), + ] + call_command("fetch_daily_billable_usage", date="2025-11-15") + self.assertEqual( + allocation_1.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES), + "2025-11-15: 50.00 USD", + ) + + # Future date updates the allocation attribute and triggers alerting. + mock_get_allocation_usage.side_effect = [ + usage_models.UsageInfo({"CPU": "165.00"}), + usage_models.UsageInfo({"Storage": "60.00"}), + ] + with patch( + "coldfront_plugin_cloud.management.commands.fetch_daily_billable_usage.Command.send_alert_email" + ) as mock: + call_command("fetch_daily_billable_usage", date="2025-11-16") + mock.assert_called_once_with(allocation_1, fakeprod.name, 200) + self.assertEqual( + allocation_1.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES), + "2025-11-16: 225.00 USD", + ) + + # Unable to fetch daily billable usage preserves previous values. + mock_get_allocation_usage.side_effect = ValueError + call_command("fetch_daily_billable_usage", date="2025-11-17") + self.assertEqual( + allocation_1.get_attribute(attributes.ALLOCATION_CUMULATIVE_CHARGES), + "2025-11-16: 225.00 USD", + ) diff --git a/src/coldfront_plugin_cloud/tests/unit/test_usage_models.py b/src/coldfront_plugin_cloud/tests/unit/test_usage_models.py index 7638a38d..d1f3e81f 100644 --- a/src/coldfront_plugin_cloud/tests/unit/test_usage_models.py +++ b/src/coldfront_plugin_cloud/tests/unit/test_usage_models.py @@ -63,3 +63,33 @@ def test_previous_charges_dict(self): # Invalid month format should raise ValidationError with self.assertRaises(ValidationError): usage_models.PreviousChargesDict(root={"2025-11-01": {"su": 1.0}}) + + def test_get_month_from_date(self): + self.assertEqual( + usage_models.get_invoice_month_from_date("2025-11-30"), "2025-11" + ) + self.assertEqual( + usage_models.get_invoice_month_from_date("2025-07-30"), "2025-07" + ) + + def test_is_same_month(self): + self.assertTrue(usage_models.is_date_same_month("2025-01-01", "2025-01-15")) + self.assertFalse(usage_models.is_date_same_month("2025-01-01", "2025-02-15")) + + def test_merge_models(self): + ui1 = usage_models.UsageInfo({"OpenStack CPU": "100.00"}) + ui2 = usage_models.UsageInfo({"OpenStack NESE Storage": "35.00"}) + ui_merged = usage_models.merge_models(ui1, ui2) + + self.assertEqual( + ui_merged.model_dump(mode="json"), + {"OpenStack CPU": "100.00", "OpenStack NESE Storage": "35.00"}, + ) + + def test_merge_models_different_types_fails(self): + ui1 = usage_models.UsageInfo({"OpenStack CPU": "100.00"}) + cc1 = usage_models.CumulativeChargesDict( + {"2025-11-01": {"OpenStack CPU": "100.00"}} + ) + + self.assertRaises(ValueError, usage_models.merge_models, ui1, cc1) diff --git a/src/coldfront_plugin_cloud/usage_models.py b/src/coldfront_plugin_cloud/usage_models.py index 9d53f0bb..3020d9d0 100644 --- a/src/coldfront_plugin_cloud/usage_models.py +++ b/src/coldfront_plugin_cloud/usage_models.py @@ -16,12 +16,44 @@ def validate_month_str(v: str) -> str: return v +def get_invoice_month_from_date(date_str: str) -> str: + return datetime.datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m") + + +def is_date_same_month(date1_str: str, date2_str: str) -> bool: + """Compares that two dates are within the same month.""" + return get_invoice_month_from_date(date1_str) == get_invoice_month_from_date( + date2_str + ) + + +def to_dict(model): + """Exports the pydantic model to dictionary format. + + The JSON mode argument ensures Decimals are converted to strings.""" + + return model.model_dump(mode="json") + + +def merge_models(model1, model2): + """Merges two instances of the same pydantic model.""" + if type(model1) is type(model2): + return model1.__class__(to_dict(model1) | to_dict(model2)) + else: + raise ValueError("Different types of models can't be merged.") + + DateField = Annotated[str, pydantic.AfterValidator(validate_date_str)] MonthField = Annotated[str, pydantic.AfterValidator(validate_month_str)] class UsageInfo(pydantic.RootModel[dict[str, Decimal]]): - pass + @functools.cached_property + def total_charges(self) -> Decimal: + total = Decimal("0.00") + for su_charge in self.root.values(): + total += su_charge + return total T = TypeVar("T", bound=str) @@ -41,9 +73,7 @@ def check_month(self): if self.root: months = set() for date_str in self.root.keys(): - months.add( - datetime.datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m") - ) + months.add(get_invoice_month_from_date(date_str)) if len(months) != 1: raise ValueError("All dates must be within the same month") @@ -53,8 +83,7 @@ def check_month(self): def total_charges(self) -> Decimal: total = Decimal("0.00") if most_recent_charges := self.root.get(self.most_recent_date): - for su_charge in most_recent_charges.root.values(): - total += su_charge + total = most_recent_charges.total_charges return total