__all__ = ["Cursor"]
from collections import deque
from typing import Any, Deque, Optional, Sequence
from arango.connection import BaseConnection
from arango.exceptions import (
CursorCloseError,
CursorCountError,
CursorEmptyError,
CursorNextError,
CursorStateError,
)
from arango.request import Request
from arango.typings import Json
[docs]class Cursor:
"""Cursor API wrapper.
Cursors fetch query results from ArangoDB server in batches. Cursor objects
are *stateful* as they store the fetched items in-memory. They must not be
shared across threads without proper locking mechanism.
:param connection: HTTP connection.
:param init_data: Cursor initialization data.
:type init_data: dict
:param cursor_type: Cursor type ("cursor" or "export").
:type cursor_type: str
"""
__slots__ = [
"_conn",
"_type",
"_id",
"_count",
"_cached",
"_stats",
"_profile",
"_warnings",
"_has_more",
"_batch",
]
def __init__(
self,
connection: BaseConnection,
init_data: Json,
cursor_type: str = "cursor",
) -> None:
self._conn = connection
self._type = cursor_type
self._batch: Deque[Any] = deque()
self._id = None
self._count: Optional[int] = None
self._cached = None
self._stats = None
self._profile = None
self._warnings = None
self._update(init_data)
def __iter__(self) -> "Cursor":
return self
def __next__(self) -> Any: # pragma: no cover
return self.next()
def __enter__(self) -> "Cursor":
return self
def __len__(self) -> int:
if self._count is None:
raise CursorCountError("cursor count not enabled")
return self._count
def __exit__(self, *_: Any) -> None:
self.close(ignore_missing=True)
def __repr__(self) -> str:
return f"<Cursor {self._id}>" if self._id else "<Cursor>"
def _update(self, data: Json) -> Json:
"""Update the cursor using data from ArangoDB server.
:param data: Cursor data from ArangoDB server (e.g. results).
:type data: dict
:return: Update cursor data.
:rtype: dict
"""
result: Json = {}
if "id" in data:
self._id = data["id"]
result["id"] = data["id"]
if "count" in data:
self._count = data["count"]
result["count"] = data["count"]
if "cached" in data:
self._cached = data["cached"]
result["cached"] = data["cached"]
self._has_more = bool(data["hasMore"])
result["has_more"] = data["hasMore"]
self._batch.extend(data["result"])
result["batch"] = data["result"]
if "extra" in data:
extra = data["extra"]
if "profile" in extra:
self._profile = extra["profile"]
result["profile"] = extra["profile"]
if "warnings" in extra:
self._warnings = extra["warnings"]
result["warnings"] = extra["warnings"]
if "stats" in extra:
stats = extra["stats"]
if "writesExecuted" in stats:
stats["modified"] = stats.pop("writesExecuted")
if "writesIgnored" in stats:
stats["ignored"] = stats.pop("writesIgnored")
if "scannedFull" in stats:
stats["scanned_full"] = stats.pop("scannedFull")
if "scannedIndex" in stats:
stats["scanned_index"] = stats.pop("scannedIndex")
if "executionTime" in stats:
stats["execution_time"] = stats.pop("executionTime")
if "httpRequests" in stats:
stats["http_requests"] = stats.pop("httpRequests")
if "cursorsCreated" in stats:
stats["cursorsCreated"] = stats.pop("cursorsCreated")
if "cursorsRearmed" in stats:
stats["cursorsRearmed"] = stats.pop("cursorsRearmed")
if "cacheHits" in stats:
stats["cacheHits"] = stats.pop("cacheHits")
if "cacheMisses" in stats:
stats["cacheMisses"] = stats.pop("cacheMisses")
self._stats = stats
result["statistics"] = stats
return result
@property
def id(self) -> Optional[str]:
"""Return the cursor ID.
:return: Cursor ID.
:rtype: str
"""
return self._id
@property
def type(self) -> str:
"""Return the cursor type.
:return: Cursor type ("cursor" or "export").
:rtype: str
"""
return self._type
[docs] def batch(self) -> Optional[Deque[Any]]:
"""Return the current batch of results.
:return: Current batch.
:rtype: collections.deque
"""
return self._batch
[docs] def has_more(self) -> Optional[bool]:
"""Return True if more results are available on the server.
:return: True if more results are available on the server.
:rtype: bool
"""
return self._has_more
[docs] def count(self) -> Optional[int]:
"""Return the total number of documents in the entire result set.
:return: Total number of documents, or None if the count option
was not enabled during cursor initialization.
:rtype: int | None
"""
return self._count
[docs] def cached(self) -> Optional[bool]:
"""Return True if results are cached.
:return: True if results are cached.
:rtype: bool
"""
return self._cached
[docs] def statistics(self) -> Optional[Json]:
"""Return cursor statistics.
:return: Cursor statistics.
:rtype: dict
"""
return self._stats
[docs] def profile(self) -> Optional[Json]:
"""Return cursor performance profile.
:return: Cursor performance profile.
:rtype: dict
"""
return self._profile
[docs] def warnings(self) -> Optional[Sequence[Json]]:
"""Return any warnings from the query execution.
:return: Warnings, or None if there are none.
:rtype: [str]
"""
return self._warnings
[docs] def empty(self) -> bool:
"""Check if the current batch is empty.
:return: True if current batch is empty, False otherwise.
:rtype: bool
"""
return len(self._batch) == 0
[docs] def next(self) -> Any:
"""Pop the next item from the current batch.
If current batch is empty/depleted, an API request is automatically
sent to ArangoDB server to fetch the next batch and update the cursor.
:return: Next item in current batch.
:raise StopIteration: If the result set is depleted.
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self.empty():
if not self.has_more():
raise StopIteration
self.fetch()
return self.pop()
[docs] def pop(self) -> Any:
"""Pop the next item from current batch.
If current batch is empty/depleted, an exception is raised. You must
call :func:`arango.cursor.Cursor.fetch` to manually fetch the next
batch from server.
:return: Next item in current batch.
:raise arango.exceptions.CursorEmptyError: If current batch is empty.
"""
if len(self._batch) == 0:
raise CursorEmptyError("current batch is empty")
return self._batch.popleft()
[docs] def fetch(self) -> Json:
"""Fetch the next batch from server and update the cursor.
:return: New batch details.
:rtype: dict
:raise arango.exceptions.CursorNextError: If batch retrieval fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self._id is None:
raise CursorStateError("cursor ID not set")
request = Request(method="put", endpoint=f"/_api/{self._type}/{self._id}")
resp = self._conn.send_request(request)
if not resp.is_success:
raise CursorNextError(resp, request)
return self._update(resp.body)
[docs] def close(self, ignore_missing: bool = False) -> Optional[bool]:
"""Close the cursor and free any server resources tied to it.
:param ignore_missing: Do not raise exception on missing cursors.
:type ignore_missing: bool
:return: True if cursor was closed successfully, False if cursor was
missing on the server and **ignore_missing** was set to True, None
if there are no cursors to close server-side (e.g. result set is
smaller than the batch size).
:rtype: bool | None
:raise arango.exceptions.CursorCloseError: If operation fails.
:raise arango.exceptions.CursorStateError: If cursor ID is not set.
"""
if self._id is None:
return None
request = Request(method="delete", endpoint=f"/_api/{self._type}/{self._id}")
resp = self._conn.send_request(request)
if resp.is_success:
return True
if resp.status_code == 404 and ignore_missing:
return False
raise CursorCloseError(resp, request)