Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/aleph/db/accessors/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,30 @@ def count_address_credit_history(
)

return session.execute(query).scalar_one()


def get_resource_consumed_credits(
session: DbSession,
item_hash: str,
) -> int:
"""
Calculate the total credits consumed by a specific resource.

Aggregates all credit_history entries where:
- payment_method = 'credit_expense'
- origin = item_hash (the resource identifier)

Args:
session: Database session
item_hash: The item hash of the resource (message hash)

Returns:
Total credits consumed by the resource
"""
query = select(func.sum(func.abs(AlephCreditHistoryDb.amount))).where(
(AlephCreditHistoryDb.payment_method == "credit_expense")
& (AlephCreditHistoryDb.origin == item_hash)
)

result = session.execute(query).scalar()
return result or 0
5 changes: 5 additions & 0 deletions src/aleph/schemas/api/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,8 @@ class GetAccountCreditHistoryResponse(BaseModel):
pagination_page: int
pagination_total: int
pagination_per_page: int


class GetResourceConsumedCreditsResponse(BaseModel):
item_hash: str
consumed_credits: int
24 changes: 24 additions & 0 deletions src/aleph/web/controllers/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_balances_by_chain,
get_credit_balance,
get_credit_balances,
get_resource_consumed_credits,
get_total_detailed_balance,
)
from aleph.db.accessors.cost import get_total_cost_for_address
Expand All @@ -33,9 +34,11 @@
GetAccountQueryParams,
GetBalancesChainsQueryParams,
GetCreditBalancesQueryParams,
GetResourceConsumedCreditsResponse,
)
from aleph.types.db_session import DbSessionFactory
from aleph.web.controllers.app_state_getters import get_session_factory_from_request
from aleph.web.controllers.utils import get_item_hash_str_from_request


def make_stats_dict(stats) -> Dict[str, Any]:
Expand Down Expand Up @@ -268,3 +271,24 @@ async def get_account_credit_history(request: web.Request) -> web.Response:
)

return web.json_response(text=response.model_dump_json())


async def get_resource_consumed_credits_controller(
request: web.Request,
) -> web.Response:
"""Returns the total credits consumed by a specific resource (item_hash)."""
item_hash = get_item_hash_str_from_request(request)

session_factory: DbSessionFactory = get_session_factory_from_request(request)

with session_factory() as session:
consumed_credits = get_resource_consumed_credits(
session=session, item_hash=item_hash
)

response = GetResourceConsumedCreditsResponse(
item_hash=item_hash,
consumed_credits=consumed_credits,
)

return web.json_response(text=response.model_dump_json())
36 changes: 9 additions & 27 deletions src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
get_node_cache_from_request,
get_session_factory_from_request,
)
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
from aleph.web.controllers.utils import (
get_item_hash_from_request,
mq_make_aleph_message_topic_queue,
)

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -408,14 +411,7 @@ def _get_message_with_status(


async def view_message(request: web.Request):
item_hash_str = request.match_info.get("item_hash")
if not item_hash_str:
raise web.HTTPUnprocessableEntity(text=f"Invalid message hash: {item_hash_str}")

try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")
item_hash = get_item_hash_from_request(request)

session_factory: DbSessionFactory = request.app["session_factory"]
with session_factory() as session:
Expand All @@ -430,14 +426,7 @@ async def view_message(request: web.Request):


async def view_message_content(request: web.Request):
item_hash_str = request.match_info.get("item_hash")
if not item_hash_str:
raise web.HTTPUnprocessableEntity(text=f"Invalid message hash: {item_hash_str}")

try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")
item_hash = get_item_hash_from_request(request)

session_factory: DbSessionFactory = request.app["session_factory"]
with session_factory() as session:
Expand All @@ -455,28 +444,21 @@ async def view_message_content(request: web.Request):
or not isinstance(message_with_status.message, PostMessage)
):
raise web.HTTPUnprocessableEntity(
text=f"Invalid message hash status {status} for hash {item_hash_str}"
text=f"Invalid message hash status {status} for hash {item_hash}"
)

message_type = message_with_status.message.type
if message_type != MessageType.post:
raise web.HTTPUnprocessableEntity(
text=f"Invalid message hash type {message_type} for hash {item_hash_str}"
text=f"Invalid message hash type {message_type} for hash {item_hash}"
)

content = message_with_status.message.content.content
return web.json_response(text=json.dumps(content))


async def view_message_status(request: web.Request):
item_hash_str = request.match_info.get("item_hash")
if not item_hash_str:
raise web.HTTPUnprocessableEntity(text=f"Invalid message hash: {item_hash_str}")

try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")
item_hash = get_item_hash_from_request(request)

session_factory: DbSessionFactory = request.app["session_factory"]
with session_factory() as session:
Expand Down
25 changes: 14 additions & 11 deletions src/aleph/web/controllers/prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
get_session_factory_from_request,
get_storage_service_from_request,
)
from aleph.web.controllers.utils import get_item_hash_from_request

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -67,25 +68,19 @@ class MessagePrice(DataClassJsonMixin):
required_tokens: Optional[Decimal] = None


async def get_executable_message(session: DbSession, item_hash_str: str) -> MessageDb:
async def get_executable_message(session: DbSession, item_hash: ItemHash) -> MessageDb:
"""Attempt to get an executable message from the database.
Raises an HTTP exception if the message is not found, not processed or is not an executable message.
"""

# Parse the item_hash_str into an ItemHash object
try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")

# Get the message status from the database
message_status_db = get_message_status(session=session, item_hash=item_hash)
if not message_status_db:
raise web.HTTPNotFound(body=f"Message not found with hash: {item_hash}")
# Loop through the status_exceptions to find a match and raise the corresponding exception
if message_status_db.status in MESSAGE_STATUS_EXCEPTIONS:
exception, error_message = MESSAGE_STATUS_EXCEPTIONS[message_status_db.status]
raise exception(body=f"{error_message}: {item_hash_str}")
raise exception(body=f"{error_message}: {item_hash}")
assert message_status_db.status == MessageStatus.PROCESSED

# Get the message from the database
Expand All @@ -98,7 +93,7 @@ async def get_executable_message(session: DbSession, item_hash_str: str) -> Mess
MessageType.store,
):
raise web.HTTPBadRequest(
body=f"Message is not an executable or store message: {item_hash_str}"
body=f"Message is not an executable or store message: {item_hash}"
)

return message
Expand All @@ -109,7 +104,7 @@ async def message_price(request: web.Request):

session_factory = get_session_factory_from_request(request)
with session_factory() as session:
item_hash = request.match_info["item_hash"]
item_hash = get_item_hash_from_request(request)
message = await get_executable_message(session, item_hash)
content: ExecutableContent = message.parsed_content

Expand Down Expand Up @@ -201,7 +196,15 @@ async def recalculate_message_costs(request: web.Request):
if item_hash_param:
# Recalculate costs for a specific message
try:
message = await get_executable_message(session, item_hash_param)
# Parse the item_hash_param into an ItemHash object
try:
item_hash = ItemHash(item_hash_param)
except ValueError:
raise web.HTTPBadRequest(
body=f"Invalid message hash: {item_hash_param}"
)

message = await get_executable_message(session, item_hash)
messages_to_recalculate = [message]
except HTTPException:
raise
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/web/controllers/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def register_routes(app: web.Application):
"/api/v0/addresses/{address}/credit_history",
accounts.get_account_credit_history,
)
app.router.add_get(
"/api/v0/messages/{item_hash}/consumed_credits",
accounts.get_resource_consumed_credits_controller,
)

app.router.add_post("/api/v0/ipfs/add_json", storage.add_ipfs_json_controller)
app.router.add_post("/api/v0/storage/add_json", storage.add_storage_json_controller)
Expand Down
31 changes: 31 additions & 0 deletions src/aleph/web/controllers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import aiohttp_jinja2
from aiohttp import web
from aiohttp.web_request import FileField
from aleph_message.models import ItemHash
from aleph_p2p_client import AlephP2PServiceClient
from configmanager import Config
from pydantic import BaseModel
Expand Down Expand Up @@ -411,3 +412,33 @@ def add_grace_period_for_file(session: DbSession, file_hash: str, hours: int):
created=utc_now(),
delete_by=delete_by,
)


def get_item_hash_str_from_request(request: web.Request) -> str:
"""
Extract and validate item_hash string from request path parameters.
Raises HTTPUnprocessableEntity if item_hash is missing.
"""
item_hash_str = request.match_info.get("item_hash")
if not item_hash_str:
raise web.HTTPUnprocessableEntity(text="Item hash must be specified.")
return item_hash_str


def get_item_hash_from_request(request: web.Request) -> ItemHash:
"""
Extract and validate item_hash from request path parameters.
Returns an ItemHash object.
Raises HTTPUnprocessableEntity if item_hash is missing.
Raises HTTPBadRequest if item_hash format is invalid.
"""
item_hash_str = request.match_info.get("item_hash")
if not item_hash_str:
raise web.HTTPUnprocessableEntity(text=f"Invalid message hash: {item_hash_str}")

try:
item_hash = ItemHash(item_hash_str)
except ValueError:
raise web.HTTPBadRequest(body=f"Invalid message hash: {item_hash_str}")

return item_hash
Loading
Loading