__all__ = [
"StandardDatabase",
"AsyncDatabase",
"BatchDatabase",
"OverloadControlDatabase",
"TransactionDatabase",
]
from datetime import datetime
from numbers import Number
from typing import Any, Dict, List, Optional, Sequence, Union
from warnings import warn
from arango.api import ApiGroup
from arango.aql import AQL
from arango.backup import Backup
from arango.cluster import Cluster
from arango.collection import StandardCollection
from arango.connection import Connection
from arango.exceptions import (
AnalyzerCreateError,
AnalyzerDeleteError,
AnalyzerGetError,
AnalyzerListError,
AsyncJobClearError,
AsyncJobListError,
CollectionCreateError,
CollectionDeleteError,
CollectionListError,
DatabaseCompactError,
DatabaseCreateError,
DatabaseDeleteError,
DatabaseListError,
DatabasePropertiesError,
DatabaseSupportInfoError,
GraphCreateError,
GraphDeleteError,
GraphListError,
JWTSecretListError,
JWTSecretReloadError,
PermissionGetError,
PermissionListError,
PermissionResetError,
PermissionUpdateError,
ServerAvailableOptionsGetError,
ServerCurrentOptionsGetError,
ServerDetailsError,
ServerEchoError,
ServerEncryptionError,
ServerEngineError,
ServerExecuteError,
ServerLicenseGetError,
ServerLicenseSetError,
ServerLogLevelError,
ServerLogLevelResetError,
ServerLogLevelSetError,
ServerLogSettingError,
ServerLogSettingSetError,
ServerMetricsError,
ServerModeError,
ServerModeSetError,
ServerReadLogError,
ServerReloadRoutingError,
ServerRequiredDBVersionError,
ServerRoleError,
ServerRunTestsError,
ServerShutdownError,
ServerShutdownProgressError,
ServerStatisticsError,
ServerStatusError,
ServerTimeError,
ServerTLSError,
ServerTLSReloadError,
ServerVersionError,
TaskCreateError,
TaskDeleteError,
TaskGetError,
TaskListError,
TransactionExecuteError,
TransactionListError,
UserCreateError,
UserDeleteError,
UserGetError,
UserListError,
UserReplaceError,
UserUpdateError,
ViewCreateError,
ViewDeleteError,
ViewGetError,
ViewListError,
ViewRenameError,
ViewReplaceError,
ViewUpdateError,
)
from arango.executor import (
AsyncApiExecutor,
BatchApiExecutor,
DefaultApiExecutor,
OverloadControlApiExecutor,
TransactionApiExecutor,
)
from arango.formatter import (
format_body,
format_database,
format_server_status,
format_tls,
format_view,
)
from arango.foxx import Foxx
from arango.graph import Graph
from arango.job import BatchJob
from arango.pregel import Pregel
from arango.replication import Replication
from arango.request import Request
from arango.response import Response
from arango.result import Result
from arango.typings import Json, Jsons, Params
from arango.utils import get_col_name
from arango.wal import WAL
class Database(ApiGroup):
"""Base class for Database API wrappers."""
def __getitem__(self, name: str) -> StandardCollection:
    """Support subscript access (``db[name]``) to a collection wrapper.

    Delegates to :meth:`collection`.

    :param name: Collection name.
    :type name: str
    :return: Collection API wrapper.
    :rtype: arango.collection.StandardCollection
    """
    col_wrapper: StandardCollection = self.collection(name)
    return col_wrapper
def _get_col_by_doc(self, document: Union[str, Json]) -> StandardCollection:
    """Resolve the collection wrapper that a document belongs to.

    The collection name is derived from the document via ``get_col_name``
    and then looked up with :meth:`collection`.

    :param document: Document ID or body with "_id" field.
    :type document: str | dict
    :return: Collection API wrapper.
    :rtype: arango.collection.StandardCollection
    :raise arango.exceptions.DocumentParseError: On malformed document.
    """
    col_name = get_col_name(document)
    return self.collection(col_name)
@property
def name(self) -> str:
    """Name of this database (read from ``self.db_name``).

    :return: Database name.
    :rtype: str
    """
    db_name: str = self.db_name
    return db_name
@property
def aql(self) -> AQL:
    """Build an AQL (ArangoDB Query Language) API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: AQL API wrapper.
    :rtype: arango.aql.AQL
    """
    wrapper = AQL(self._conn, self._executor)
    return wrapper
@property
def wal(self) -> WAL:
    """Build a WAL (Write-Ahead Log) API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: WAL API wrapper.
    :rtype: arango.wal.WAL
    """
    wrapper = WAL(self._conn, self._executor)
    return wrapper
@property
def foxx(self) -> Foxx:
    """Build a Foxx API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: Foxx API wrapper.
    :rtype: arango.foxx.Foxx
    """
    wrapper = Foxx(self._conn, self._executor)
    return wrapper
@property
def pregel(self) -> Pregel:
    """Build a Pregel API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: Pregel API wrapper.
    :rtype: arango.pregel.Pregel
    """
    wrapper = Pregel(self._conn, self._executor)
    return wrapper
@property
def replication(self) -> Replication:
    """Build a Replication API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: Replication API wrapper.
    :rtype: arango.replication.Replication
    """
    wrapper = Replication(self._conn, self._executor)
    return wrapper
@property
def cluster(self) -> Cluster:  # pragma: no cover
    """Build a Cluster API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: Cluster API wrapper.
    :rtype: arango.cluster.Cluster
    """
    wrapper = Cluster(self._conn, self._executor)
    return wrapper
@property
def backup(self) -> Backup:
    """Build a Backup API wrapper.

    A new wrapper bound to this object's connection and executor is
    created on every access.

    :return: Backup API wrapper.
    :rtype: arango.backup.Backup
    """
    wrapper = Backup(self._conn, self._executor)
    return wrapper
def properties(self) -> Result[Json]:
    """Fetch the properties of the current database.

    Issues ``GET /_api/database/current`` and passes the ``result`` field
    of the response body through ``format_database``.

    :return: Database properties.
    :rtype: dict
    :raise arango.exceptions.DatabasePropertiesError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_api/database/current")

    def parse(resp: Response) -> Json:
        if resp.is_success:
            return format_database(resp.body["result"])
        raise DatabasePropertiesError(resp, req)

    return self._execute(req, parse)
def execute(self, command: str) -> Result[Any]:
    """Run a raw Javascript command on the server.

    The JavaScript code is sent as the request body of
    ``POST /_admin/execute`` and executed on the server as the body of a
    function with no arguments. If the code contains a return statement,
    the returned value is delivered as 'application/json'.

    NOTE: this endpoint is only usable if the server was started with
    the option `--javascript.allow-admin-execute true`. The default value
    of this option is false, which disables the execution of user-defined
    code and disables this API endpoint entirely. This is also the
    recommended setting for production.

    :param command: Javascript command to execute.
    :type command: str
    :return: Return value of **command**, if any.
    :rtype: Any
    :raise arango.exceptions.ServerExecuteError: If execution fails.
    """
    req = Request(method="post", endpoint="/_admin/execute", data=command)

    def parse(resp: Response) -> Any:
        if resp.is_success:
            return resp.body
        raise ServerExecuteError(resp, req)

    return self._execute(req, parse)
def execute_transaction(
    self,
    command: str,
    params: Optional[Json] = None,
    read: Optional[Sequence[str]] = None,
    write: Optional[Sequence[str]] = None,
    sync: Optional[bool] = None,
    timeout: Optional[Number] = None,
    max_size: Optional[int] = None,
    allow_implicit: Optional[bool] = None,
    intermediate_commit_count: Optional[int] = None,
    intermediate_commit_size: Optional[int] = None,
    allow_dirty_read: bool = False,
) -> Result[Any]:
    """Run a raw Javascript command inside a server-side transaction.

    Sends ``POST /_api/transaction``. Optional settings are only included
    in the request body when they are not None, except ``allowImplicit``,
    which is always sent (as null when **allow_implicit** is None).

    :param command: Javascript command to execute.
    :type command: str
    :param params: Optional parameters passed into the Javascript command.
    :type params: dict | None
    :param read: Names of collections read during transaction. If parameter
        **allow_implicit** is set to True, any undeclared read collections
        are loaded lazily.
    :type read: [str] | None
    :param write: Names of collections written to during transaction.
        Transaction fails on undeclared write collections.
    :type write: [str] | None
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param timeout: Timeout for waiting on collection locks. If set to 0,
        ArangoDB server waits indefinitely. If not set, system default
        value is used.
    :type timeout: int | None
    :param max_size: Max transaction size limit in bytes.
    :type max_size: int | None
    :param allow_implicit: If set to True, undeclared read collections are
        loaded lazily. If set to False, transaction fails on any undeclared
        collections.
    :type allow_implicit: bool | None
    :param intermediate_commit_count: Max number of operations after which
        an intermediate commit is performed automatically.
    :type intermediate_commit_count: int | None
    :param intermediate_commit_size: Max size of operations in bytes after
        which an intermediate commit is performed automatically.
    :type intermediate_commit_size: int | None
    :param allow_dirty_read: Allow reads from followers in a cluster. When
        true, the ``x-arango-allow-dirty-read`` header is added.
    :type allow_dirty_read: bool
    :return: Return value of **command** (the ``result`` field of the
        response body, or None if that field is absent).
    :rtype: Any
    :raise arango.exceptions.TransactionExecuteError: If execution fails.
    """
    collections: Json = {"allowImplicit": allow_implicit}
    for access, col_names in (("read", read), ("write", write)):
        if col_names is not None:
            collections[access] = col_names

    # ``collections`` always holds "allowImplicit", so it is never empty
    # and is therefore always part of the payload.
    data: Json = {"action": command, "collections": collections}

    optional_fields = (
        ("params", params),
        ("lockTimeout", timeout),
        ("waitForSync", sync),
        ("maxTransactionSize", max_size),
        ("intermediateCommitCount", intermediate_commit_count),
        ("intermediateCommitSize", intermediate_commit_size),
    )
    for field, value in optional_fields:
        if value is not None:
            data[field] = value

    headers = None
    if allow_dirty_read:
        headers = {"x-arango-allow-dirty-read": "true"}

    req = Request(
        method="post",
        endpoint="/_api/transaction",
        data=data,
        headers=headers,
    )

    def parse(resp: Response) -> Any:
        if resp.is_success:
            return resp.body.get("result")
        raise TransactionExecuteError(resp, req)

    return self._execute(req, parse)
def list_transactions(self) -> Result[Jsons]:
    """List the running stream transactions.

    Issues ``GET /_api/transaction`` and returns the ``transactions``
    field of the response body.

    :return: The list of transactions, with each transaction
        containing an "id" and a "state" field.
    :rtype: List[Dict[str, Any]]
    :raise arango.exceptions.TransactionListError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_api/transaction")

    def parse(resp: Response) -> Jsons:
        if resp.is_success:
            transactions: Jsons = resp.body["transactions"]
            return transactions
        raise TransactionListError(resp, req)

    return self._execute(req, parse)
def version(self, details: bool = False) -> Result[Any]:
    """Return ArangoDB server version.

    Issues ``GET /_api/version`` with the ``details`` query parameter.

    :param details: If True, return the complete response body (more
        detailed version output) instead of just the version string.
    :type details: bool
    :return: Server version string, or the full response body if
        **details** is True.
    :rtype: str | dict
    :raise arango.exceptions.ServerVersionError: If retrieval fails.
    """
    request = Request(
        method="get", endpoint="/_api/version", params={"details": details}
    )

    def response_handler(resp: Response) -> Any:
        if not resp.is_success:
            raise ServerVersionError(resp, request)
        if details:
            # Caller asked for everything the server reported.
            return resp.body
        return str(resp.body["version"])

    return self._execute(request, response_handler)
def details(self) -> Result[Json]:
    """Fetch ArangoDB server details.

    Issues ``GET /_api/version`` with ``details`` enabled and returns the
    ``details`` field of the response body.

    :return: Server details.
    :rtype: dict
    :raise arango.exceptions.ServerDetailsError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_api/version", params={"details": True})

    def parse(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerDetailsError(resp, req)
        info: Json = resp.body["details"]
        return info

    return self._execute(req, parse)
def license(self) -> Result[Json]:
    """View the license information and status of an Enterprise Edition
    instance.

    Can be called on single servers, Coordinators, and DB-Servers.
    Issues ``GET /_admin/license`` and returns the response body as is.

    :return: Server license.
    :rtype: dict
    :raise arango.exceptions.ServerLicenseGetError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/license")

    def parse(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerLicenseGetError(resp, req)
        info: Json = resp.body
        return info

    return self._execute(req, parse)
def set_license(self, license: str, force: bool = False) -> Result[Json]:
    """Set a new license for an Enterprise Edition instance.

    Can be called on single servers, Coordinators, and DB-Servers.
    Issues ``PUT /_admin/license`` with the license string as the request
    body and returns the response body as is.

    :param license: The Base64-encoded license string.
    :type license: str
    :param force: If set to True, the new license will be set even if
        it expires sooner than the current license.
    :type force: bool
    :return: Server response to the license update.
    :rtype: dict
    :raise arango.exceptions.ServerLicenseSetError: If the license could
        not be set.
    """
    request = Request(
        method="put",
        endpoint="/_admin/license",
        params={"force": force},
        data=license,
    )

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerLicenseSetError(resp, request)
        result: Json = resp.body
        return result

    return self._execute(request, response_handler)
def status(self) -> Result[Json]:
    """Fetch the ArangoDB server status.

    Issues ``GET /_admin/status`` and passes the response body through
    ``format_server_status``.

    :return: Server status.
    :rtype: dict
    :raise arango.exceptions.ServerStatusError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/status")

    def parse(resp: Response) -> Json:
        if resp.is_success:
            return format_server_status(resp.body)
        raise ServerStatusError(resp, req)

    return self._execute(req, parse)
def compact(
    self,
    change_level: Optional[bool] = None,
    compact_bottom_most_level: Optional[bool] = None,
) -> Result[Json]:
    """Compact all databases.

    NOTE: This command can cause a full rewrite of all data in all
    databases, which may take very long for large databases. It should
    thus only be used with care and only when additional I/O load can be
    tolerated for a prolonged time.

    This method can be used to reclaim disk space after substantial data
    deletions have taken place, by compacting the entire database system
    data.

    This method requires superuser access.

    :param change_level: Whether or not compacted data should be moved to
        the minimum possible level. Default value is False.
    :type change_level: bool | None
    :param compact_bottom_most_level: Whether or not to compact the
        bottom-most level of data. Default value is False.
    :type compact_bottom_most_level: bool | None
    :return: Server response to the compaction request.
    :rtype: dict
    :raise arango.exceptions.DatabaseCompactError: If the compaction
        request fails.
    """
    # Only send the options the caller set explicitly so that the server
    # applies its own defaults for the rest.
    data: Json = {}
    if change_level is not None:
        data["changeLevel"] = change_level
    if compact_bottom_most_level is not None:
        data["compactBottomMostLevel"] = compact_bottom_most_level

    request = Request(method="put", endpoint="/_admin/compact", data=data)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise DatabaseCompactError(resp, request)
        return format_body(resp.body)

    return self._execute(request, response_handler)
def required_db_version(self) -> Result[str]:
    """Fetch the required version of the target database.

    Issues ``GET /_admin/database/target-version`` and returns the
    ``version`` field of the response body as a string.

    :return: Required version of target database.
    :rtype: str
    :raise arango.exceptions.ServerRequiredDBVersionError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/database/target-version")

    def parse(resp: Response) -> str:
        if not resp.is_success:
            raise ServerRequiredDBVersionError(resp, req)
        return str(resp.body["version"])

    return self._execute(req, parse)
def engine(self) -> Result[Json]:
    """Fetch the database engine details.

    Issues ``GET /_api/engine`` and passes the response body through
    ``format_body``.

    :return: Database engine details.
    :rtype: dict
    :raise arango.exceptions.ServerEngineError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_api/engine")

    def parse(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerEngineError(resp, req)
        return format_body(resp.body)

    return self._execute(req, parse)
def statistics(self, description: bool = False) -> Result[Json]:
    """Fetch server statistics.

    :param description: If True, query ``/_admin/statistics-description``
        instead of ``/_admin/statistics``.
    :type description: bool
    :return: Server statistics.
    :rtype: dict
    :raise arango.exceptions.ServerStatisticsError: If retrieval fails.
    """
    endpoint = (
        "/_admin/statistics-description" if description else "/_admin/statistics"
    )
    req = Request(method="get", endpoint=endpoint)

    def parse(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerStatisticsError(resp, req)
        return format_body(resp.body)

    return self._execute(req, parse)
def role(self) -> Result[str]:
    """Fetch the server role.

    Issues ``GET /_admin/server/role`` and returns the ``role`` field of
    the response body as a string.

    :return: Server role. Possible values are "SINGLE" (server which is not
        in a cluster), "COORDINATOR" (cluster coordinator), "PRIMARY",
        "SECONDARY", "AGENT" (Agency node in a cluster) or "UNDEFINED".
    :rtype: str
    :raise arango.exceptions.ServerRoleError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/server/role")

    def parse(resp: Response) -> str:
        if not resp.is_success:
            raise ServerRoleError(resp, req)
        return str(resp.body["role"])

    return self._execute(req, parse)
def mode(self) -> Result[str]:
    """Fetch the server mode (default or read-only).

    In a read-only server, all write operations will fail with an error
    code of 1004 (ERROR_READ_ONLY). Creating or dropping databases and
    collections will also fail with error code 11 (ERROR_FORBIDDEN).

    Issues ``GET /_admin/server/mode`` and returns the ``mode`` field of
    the response body as a string.

    :return: Server mode. Possible values are "default" or "readonly".
    :rtype: str
    :raise arango.exceptions.ServerModeError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/server/mode")

    def parse(resp: Response) -> str:
        if not resp.is_success:
            raise ServerModeError(resp, req)
        return str(resp.body["mode"])

    return self._execute(req, parse)
def set_mode(self, mode: str) -> Result[Json]:
    """Set the server mode to read-only or default.

    Update mode information about a server. The JSON response will
    contain a field mode with the value readonly or default.
    In a read-only server all write operations will fail with an error
    code of 1004 (ERROR_READ_ONLY). Creating or dropping of databases
    and collections will also fail with error code 11 (ERROR_FORBIDDEN).

    This is a protected API. It requires authentication and administrative
    server rights.

    :param mode: Server mode. Possible values are "default" or "readonly".
    :type mode: str
    :return: Response body (passed through ``format_body``) containing
        the new server mode.
    :rtype: dict
    :raise arango.exceptions.ServerModeSetError: If set fails.
    """
    request = Request(
        method="put", endpoint="/_admin/server/mode", data={"mode": mode}
    )

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerModeSetError(resp, request)
        return format_body(resp.body)

    return self._execute(request, response_handler)
def time(self) -> Result[datetime]:
    """Fetch the server system time.

    Issues ``GET /_admin/time`` and converts the ``time`` field of the
    response body with ``datetime.fromtimestamp`` (no timezone is passed,
    so the result is a naive datetime in the client's local time).

    :return: Server system time.
    :rtype: datetime.datetime
    :raise arango.exceptions.ServerTimeError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/time")

    def parse(resp: Response) -> datetime:
        if resp.is_success:
            timestamp = resp.body["time"]
            return datetime.fromtimestamp(timestamp)
        raise ServerTimeError(resp, req)

    return self._execute(req, parse)
def echo(self, body: Optional[Any] = None) -> Result[Json]:
    """Return details of the last request (e.g. headers, payload),
    or echo the given request body.

    A ``GET /_admin/echo`` is sent when **body** is None; otherwise the
    body is sent with ``POST /_admin/echo``.

    :param body: The body of the request. Can be of any type
        and is simply forwarded. If not set, the details of the last
        request are returned.
    :type body: dict | list | str | int | float | None
    :return: Details of the last request.
    :rtype: dict
    :raise arango.exceptions.ServerEchoError: If retrieval fails.
    """
    if body is None:
        req = Request(method="get", endpoint="/_admin/echo")
    else:
        req = Request(method="post", endpoint="/_admin/echo", data=body)

    def parse(resp: Response) -> Json:
        if resp.is_success:
            echoed: Json = resp.body
            return echoed
        raise ServerEchoError(resp, req)

    return self._execute(req, parse)
def shutdown(self, soft: bool = False) -> Result[bool]:  # pragma: no cover
    """Start the server shutdown sequence.

    Issues ``DELETE /_admin/shutdown`` with the ``soft`` query parameter.

    :param soft: If set to true, this initiates a soft shutdown. This is only
        available on Coordinators. When issued, the Coordinator tracks a number
        of ongoing operations, waits until all have finished, and then shuts
        itself down normally. It will still accept new operations.
    :type soft: bool
    :return: True if the server was shutdown successfully.
    :rtype: bool
    :raise arango.exceptions.ServerShutdownError: If shutdown fails.
    """
    req = Request(method="delete", endpoint="/_admin/shutdown", params={"soft": soft})

    def parse(resp: Response) -> bool:
        if resp.is_success:
            return True
        raise ServerShutdownError(resp, req)

    return self._execute(req, parse)
def shutdown_progress(self) -> Result[Json]:  # pragma: no cover
    """Query the soft shutdown progress.

    This call reports progress about a soft Coordinator shutdown
    (DELETE /_admin/shutdown?soft=true). This API is only available on
    Coordinators.

    Issues ``GET /_admin/shutdown`` and returns the response body as is.

    :return: Information about the shutdown progress.
    :rtype: dict
    :raise arango.exceptions.ServerShutdownProgressError: If the progress
        could not be retrieved.
    """
    request = Request(method="get", endpoint="/_admin/shutdown")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerShutdownProgressError(resp, request)
        result: Json = resp.body
        return result

    return self._execute(request, response_handler)
def run_tests(self, tests: Sequence[str]) -> Result[Json]:  # pragma: no cover
    """Run available unittests on the server.

    Issues ``POST /_admin/test`` with the test files in the request body
    and returns the response body as is.

    :param tests: List of files containing the test suites.
    :type tests: [str]
    :return: Test results.
    :rtype: dict
    :raise arango.exceptions.ServerRunTestsError: If execution fails.
    """
    payload = {"tests": tests}
    req = Request(method="post", endpoint="/_admin/test", data=payload)

    def parse(resp: Response) -> Json:
        if resp.is_success:
            outcome: Json = resp.body
            return outcome
        raise ServerRunTestsError(resp, req)

    return self._execute(req, parse)
def read_log(
    self,
    upto: Optional[Union[int, str]] = None,
    level: Optional[Union[int, str]] = None,
    start: Optional[int] = None,
    size: Optional[int] = None,
    offset: Optional[int] = None,
    search: Optional[str] = None,
    sort: Optional[str] = None,
) -> Result[Json]:
    """Read the global log from server.

    This method is deprecated in ArangoDB 3.8 and will be removed in a
    future version of the driver. Use
    :func:`arango.database.Database.read_log_entries` instead. A
    ``DeprecationWarning`` is emitted on every call.

    :param upto: Return the log entries up to the given level (mutually
        exclusive with parameter **level**). Allowed values are "fatal",
        "error", "warning", "info" (default) and "debug".
    :type upto: int | str
    :param level: Return the log entries of only the given level (mutually
        exclusive with **upto**). Allowed values are "fatal", "error",
        "warning", "info" (default) and "debug".
    :type level: int | str
    :param start: Return the log entries whose ID is greater or equal to
        the given value.
    :type start: int
    :param size: Restrict the size of the result to the given value. This
        can be used for pagination.
    :type size: int
    :param offset: Number of entries to skip (e.g. for pagination).
    :type offset: int
    :param search: Return only the log entries containing the given text.
    :type search: str
    :param sort: Sort the log entries according to the given fashion, which
        can be "asc" or "desc".
    :type sort: str
    :return: Server log entries. The server's "totalAmount" key, if
        present, is renamed to "total_amount".
    :rtype: dict
    :raise arango.exceptions.ServerReadLogError: If read fails.
    """
    m = (
        "read_log() is deprecated in ArangoDB 3.8 and will be removed in a "
        "future version of the driver. Use read_log_entries() instead."
    )
    # stacklevel=2 attributes the warning to the caller of read_log().
    warn(m, DeprecationWarning, stacklevel=2)

    # Forward only the filters the caller actually supplied.
    candidates = (
        ("upto", upto),
        ("level", level),
        ("start", start),
        ("size", size),
        ("offset", offset),
        ("search", search),
        ("sort", sort),
    )
    params: Params = {
        key: value for key, value in candidates if value is not None
    }

    request = Request(method="get", endpoint="/_admin/log", params=params)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerReadLogError(resp, request)
        result: Json = resp.body
        if "totalAmount" in result:
            # Expose the count under a snake_case key.
            result["total_amount"] = result.pop("totalAmount")
        return result

    return self._execute(request, response_handler)
def read_log_entries(
    self,
    upto: Optional[Union[int, str]] = None,
    level: Optional[Union[int, str]] = None,
    start: Optional[int] = None,
    size: Optional[int] = None,
    offset: Optional[int] = None,
    search: Optional[str] = None,
    sort: Optional[str] = None,
    server_id: Optional[str] = None,
) -> Result[Json]:
    """Read the global log from server.

    Issues ``GET /_admin/log/entries`` and returns the response body as
    is. Only filters that are not None are sent as query parameters.

    :param upto: Return the log entries up to the given level (mutually
        exclusive with parameter **level**). Allowed values are "fatal",
        "error", "warning", "info" (default) and "debug".
    :type upto: int | str
    :param level: Return the log entries of only the given level (mutually
        exclusive with **upto**). Allowed values are "fatal", "error",
        "warning", "info" (default) and "debug".
    :type level: int | str
    :param start: Return the log entries whose ID is greater or equal to
        the given value.
    :type start: int
    :param size: Restrict the size of the result to the given value. This
        can be used for pagination.
    :type size: int
    :param offset: Number of entries to skip (e.g. for pagination).
    :type offset: int
    :param search: Return only the log entries containing the given text.
    :type search: str
    :param sort: Sort the log entries according to the given fashion, which
        can be "asc" or "desc".
    :type sort: str
    :param server_id: Returns all log entries of the specified server.
        All other query parameters remain valid. If no serverId is given,
        the asked server will reply. This parameter is only meaningful
        on Coordinators.
    :type server_id: str
    :return: Server log entries.
    :rtype: dict
    :raise arango.exceptions.ServerReadLogError: If read fails.
    """
    # Forward only the filters the caller actually supplied.
    candidates = (
        ("upto", upto),
        ("level", level),
        ("start", start),
        ("size", size),
        ("offset", offset),
        ("search", search),
        ("sort", sort),
        ("serverId", server_id),
    )
    params: Params = {
        key: value for key, value in candidates if value is not None
    }

    request = Request(method="get", endpoint="/_admin/log/entries", params=params)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerReadLogError(resp, request)
        result: Json = resp.body
        return result

    return self._execute(request, response_handler)
def log_settings(self) -> Result[Json]:
    """Fetch the structured log settings.

    Issues ``GET /_admin/log/structured`` and returns the response body
    as is.

    :return: Current log settings. False values are not returned.
    :rtype: dict
    :raise arango.exceptions.ServerLogSettingError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/log/structured")

    def parse(resp: Response) -> Json:
        if resp.is_success:
            settings: Json = resp.body
            return settings
        raise ServerLogSettingError(resp, req)

    return self._execute(req, parse)
def set_log_settings(self, **kwargs: Any) -> Result[Json]:
    """Set the structured log settings.

    This method takes arbitrary keyword arguments where the keys are the
    structured log parameters and the values are true or false, for either
    enabling or disabling the parameters.

    .. code-block:: python

        arango.set_log_settings(
            database=True,
            url=True,
            username=False,
        )

    The keyword arguments are sent unchanged as the body of
    ``PUT /_admin/log/structured``.

    :param kwargs: Structured log parameters.
    :type kwargs: Dict[str, Any]
    :return: New log settings. False values are not returned.
    :rtype: dict
    :raise arango.exceptions.ServerLogSettingSetError: If the settings
        could not be updated.
    """
    request = Request(method="put", endpoint="/_admin/log/structured", data=kwargs)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerLogSettingSetError(resp, request)
        result: Json = resp.body
        return result

    return self._execute(request, response_handler)
def log_levels(
    self, server_id: Optional[str] = None, with_appenders: Optional[bool] = None
) -> Result[Json]:
    """Fetch the current logging levels.

    Issues ``GET /_admin/log/level``; each argument is sent as a query
    parameter only when it is not None.

    :param server_id: Forward log level to a specific server. This makes it
        easier to adjust the log levels in clusters because DB-Servers require
        JWT authentication whereas Coordinators also support authentication
        using usernames and passwords.
    :type server_id: str
    :param with_appenders: Include appenders in the response.
    :type with_appenders: bool
    :return: Current logging levels.
    :rtype: dict
    :raise arango.exceptions.ServerLogLevelError: If retrieval fails.
    """
    query: Params = {}
    for key, value in (("serverId", server_id), ("withAppenders", with_appenders)):
        if value is not None:
            query[key] = value

    req = Request(method="get", endpoint="/_admin/log/level", params=query)

    def parse(resp: Response) -> Json:
        if resp.is_success:
            levels: Json = resp.body
            return levels
        raise ServerLogLevelError(resp, req)

    return self._execute(req, parse)
def set_log_levels(
    self,
    server_id: Optional[str] = None,
    with_appenders: Optional[bool] = None,
    **kwargs: Any,
) -> Result[Json]:
    """Set the logging levels.

    This method takes arbitrary keyword arguments where the keys are the
    logger names and the values are the logging levels. For example:

    .. code-block:: python

        arango.set_log_levels(
            agency='DEBUG',
            collector='INFO',
            threads='WARNING'
        )

    Keys that are not valid logger names are ignored.

    The keyword arguments are sent unchanged as the body of
    ``PUT /_admin/log/level``.

    :param server_id: Forward log level to a specific server. This makes it
        easier to adjust the log levels in clusters because DB-Servers require
        JWT authentication whereas Coordinators also support authentication
        using usernames and passwords.
    :type server_id: str | None
    :param with_appenders: Include appenders in the request.
    :type with_appenders: bool | None
    :param kwargs: Logging levels.
    :type kwargs: Dict[str, Any]
    :return: New logging levels.
    :rtype: dict
    :raise arango.exceptions.ServerLogLevelSetError: If the levels could
        not be updated.
    """
    # Query parameters are only sent when explicitly provided.
    params: Params = {}
    if server_id is not None:
        params["serverId"] = server_id
    if with_appenders is not None:
        params["withAppenders"] = with_appenders

    request = Request(
        method="put", endpoint="/_admin/log/level", params=params, data=kwargs
    )

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ServerLogLevelSetError(resp, request)
        result: Json = resp.body
        return result

    return self._execute(request, response_handler)
def reset_log_levels(self, server_id: Optional[str] = None) -> Result[Json]:
    """Reset the logging levels.

    Revert the server's log level settings to the values they had at
    startup, as determined by the startup options specified on the
    command-line, a configuration file, and the factory defaults.

    Issues ``DELETE /_admin/log/level`` and returns the response body as
    is.

    :param server_id: Forward log level to a specific server. This makes it
        easier to adjust the log levels in clusters because DB-Servers require
        JWT authentication whereas Coordinators also support authentication
        using usernames and passwords.
    :type server_id: str | None
    :return: New logging levels.
    :rtype: dict
    :raise arango.exceptions.ServerLogLevelResetError: If the reset fails.
    """
    query: Params = {} if server_id is None else {"serverId": server_id}
    req = Request(method="delete", endpoint="/_admin/log/level", params=query)

    def parse(resp: Response) -> Json:
        if resp.is_success:
            levels: Json = resp.body
            return levels
        raise ServerLogLevelResetError(resp, req)

    return self._execute(req, parse)
def reload_routing(self) -> Result[bool]:
    """Reload the routing information.

    Issues ``POST /_admin/routing/reload``.

    :return: True if routing was reloaded successfully.
    :rtype: bool
    :raise arango.exceptions.ServerReloadRoutingError: If reload fails.
    """
    req = Request(method="post", endpoint="/_admin/routing/reload")

    def parse(resp: Response) -> bool:
        if resp.is_success:
            return True
        raise ServerReloadRoutingError(resp, req)

    return self._execute(req, parse)
def metrics(self) -> Result[str]:
    """Fetch server metrics in Prometheus format.

    Issues ``GET /_admin/metrics/v2`` and returns the raw (unparsed)
    response body.

    :return: Server metrics in Prometheus format.
    :rtype: str
    :raise arango.exceptions.ServerMetricsError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/metrics/v2")

    def parse(resp: Response) -> str:
        if not resp.is_success:
            raise ServerMetricsError(resp, req)
        return resp.raw_body

    return self._execute(req, parse)
def jwt_secrets(self) -> Result[Json]:  # pragma: no cover
    """Fetch information on currently loaded JWT secrets.

    Issues ``GET /_admin/server/jwt`` and returns the ``result`` field of
    the response body.

    :return: Information on currently loaded JWT secrets.
    :rtype: dict
    :raise arango.exceptions.JWTSecretListError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/server/jwt")

    def parse(resp: Response) -> Json:
        if resp.is_success:
            secrets_info: Json = resp.body["result"]
            return secrets_info
        raise JWTSecretListError(resp, req)

    return self._execute(req, parse)
def reload_jwt_secrets(self) -> Result[Json]:  # pragma: no cover
    """Hot-reload JWT secrets.

    Calling this without payload reloads JWT secrets from disk. Only files
    specified via arangod startup option ``--server.jwt-secret-keyfile`` or
    ``--server.jwt-secret-folder`` are used. It is not possible to change
    the location where files are loaded from without restarting the server.

    Issues ``POST /_admin/server/jwt`` and returns the ``result`` field of
    the response body.

    :return: Information on reloaded JWT secrets.
    :rtype: dict
    :raise arango.exceptions.JWTSecretReloadError: If reload fails.
    """
    req = Request(method="post", endpoint="/_admin/server/jwt")

    def parse(resp: Response) -> Json:
        if resp.is_success:
            secrets_info: Json = resp.body["result"]
            return secrets_info
        raise JWTSecretReloadError(resp, req)

    return self._execute(req, parse)
def tls(self) -> Result[Json]:
    """Fetch the TLS data (server key, client-auth CA).

    :return: TLS data, passed through ``format_tls``.
    :rtype: dict
    :raise arango.exceptions.ServerTLSError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_admin/server/tls")

    def handle_response(response: Response) -> Json:
        if response.is_success:
            return format_tls(response.body["result"])
        raise ServerTLSError(response, request)

    return self._execute(request, handle_response)
def reload_tls(self) -> Result[Json]:
    """Trigger a reload of the TLS data (server key, client-auth CA).

    :return: New TLS data, passed through ``format_tls``.
    :rtype: dict
    :raise arango.exceptions.ServerTLSReloadError: If reload fails.
    """
    request = Request(method="post", endpoint="/_admin/server/tls")

    def handle_response(response: Response) -> Json:
        if response.is_success:
            return format_tls(response.body["result"])
        raise ServerTLSReloadError(response, request)

    return self._execute(request, handle_response)
def encryption(self) -> Result[Json]:
    """Rotate the user-supplied keys for encryption.

    This method is available only for enterprise edition of ArangoDB.

    :return: The "result" field of the server response.
        NOTE(review): presumably describes the encryption keys after
        rotation - confirm against the ArangoDB documentation.
    :rtype: dict
    :raise arango.exceptions.ServerEncryptionError: If the request fails.
    """
    request = Request(method="post", endpoint="/_admin/server/encryption")

    def handle_response(response: Response) -> Json:
        if not response.is_success:
            raise ServerEncryptionError(response, request)
        rotated: Json = response.body["result"]  # pragma: no cover
        return rotated  # pragma: no cover

    return self._execute(request, handle_response)
def options(self) -> Result[Json]:
    """Fetch the currently-set server options (ArangoDB 3.12+).

    As this API may reveal sensitive data about the deployment, it can only
    be accessed from inside the _system database. In addition, the policy
    control startup option --server.options-api determines if and to whom
    the API is made available. That option can have the following values:

    - disabled: API is disabled.
    - jwt: API can only be accessed via superuser JWT.
    - admin: API can be accessed by admin users in the _system database only.
    - public: everyone with access to _system database can access the API.

    :return: Server options (the full response body).
    :rtype: dict
    :raise arango.exceptions.ServerCurrentOptionsGetError: If retrieval
        fails.
    """
    request = Request(method="get", endpoint="/_admin/options")

    def handle_response(response: Response) -> Json:
        if not response.is_success:
            raise ServerCurrentOptionsGetError(response, request)
        current: Json = response.body
        return current

    return self._execute(request, handle_response)
def options_available(self) -> Result[Json]:
    """Fetch a description of all available server options (ArangoDB 3.12+).

    As this API may reveal sensitive data about the deployment, it can only
    be accessed from inside the _system database. In addition, the policy
    control startup option --server.options-api determines if and to whom
    the API is made available. That option can have the following values:

    - disabled: API is disabled.
    - jwt: API can only be accessed via superuser JWT.
    - admin: API can be accessed by admin users in the _system database only.
    - public: everyone with access to _system database can access the
      options API.

    :return: Description of the available server options (the full
        response body).
    :rtype: dict
    :raise arango.exceptions.ServerAvailableOptionsGetError: If retrieval
        fails.
    """
    request = Request(method="get", endpoint="/_admin/options-description")

    def handle_response(response: Response) -> Json:
        if not response.is_success:
            raise ServerAvailableOptionsGetError(response, request)
        described: Json = response.body
        return described

    return self._execute(request, handle_response)
#######################
# Database Management #
#######################
def databases(self) -> Result[List[str]]:
    """List the names of all databases.

    :return: Database names.
    :rtype: [str]
    :raise arango.exceptions.DatabaseListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/database")

    def handle_response(response: Response) -> List[str]:
        if response.is_success:
            names: List[str] = response.body["result"]
            return names
        raise DatabaseListError(response, request)

    return self._execute(request, handle_response)
def databases_accessible_to_user(self) -> Result[List[str]]:
    """List the names of all databases accessible by the user.

    :return: Database names accessible by the current user.
    :rtype: List[str]
    :raise arango.exceptions.DatabaseListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/database/user")

    def handle_response(response: Response) -> List[str]:
        if response.is_success:
            names: List[str] = response.body["result"]
            return names
        raise DatabaseListError(response, request)

    return self._execute(request, handle_response)
def has_database(self, name: str) -> Result[bool]:
    """Tell whether a database with the given name exists.

    The check is done client-side against the list of database names
    returned by the server.

    :param name: Database name.
    :type name: str
    :return: True if database exists, False otherwise.
    :rtype: bool
    :raise arango.exceptions.DatabaseListError: If the database list
        cannot be retrieved.
    """
    request = Request(method="get", endpoint="/_api/database")

    def handle_response(response: Response) -> bool:
        if response.is_success:
            return name in response.body["result"]
        raise DatabaseListError(response, request)

    return self._execute(request, handle_response)
def create_database(
    self,
    name: str,
    users: Optional[Sequence[Json]] = None,
    replication_factor: Union[int, str, None] = None,
    write_concern: Optional[int] = None,
    sharding: Optional[str] = None,
) -> Result[bool]:
    """Create a new database.

    :param name: Database name.
    :type name: str
    :param users: List of users with access to the new database, where each
        user is a dictionary with fields "username", "password", "active"
        and "extra" (see below for example). If not set, the default user
        root will be used to ensure that the new database will be
        accessible after it is created.
    :type users: [dict]
    :param replication_factor: Default replication factor for collections
        created in this database. Special values include "satellite" which
        replicates the collection to every DBServer, and 1 which disables
        replication. Used for clusters only.
    :type replication_factor: int | str
    :param write_concern: Default write concern for collections created in
        this database. Determines how many copies of each shard are
        required to be in sync on different DBServers. If there are less
        than these many copies in the cluster a shard will refuse to write.
        Writes to shards with enough up-to-date copies will succeed at the
        same time, however. Value of this parameter can not be larger than
        the value of **replication_factor**. Used for clusters only.
    :type write_concern: int
    :param sharding: Sharding method used for new collections in this
        database. Allowed values are: "", "flexible" and "single". The
        first two are equivalent. Used for clusters only.
    :type sharding: str
    :return: True if database was created successfully.
    :rtype: bool
    :raise arango.exceptions.DatabaseCreateError: If create fails.

    Here is an example entry for parameter **users**:

    .. code-block:: python

        {
            'username': 'john',
            'password': 'password',
            'active': True,
            'extra': {'Department': 'IT'}
        }
    """
    # Only options the caller actually supplied are sent to the server.
    options: Json = {
        key: value
        for key, value in (
            ("replicationFactor", replication_factor),
            ("writeConcern", write_concern),
            ("sharding", sharding),
        )
        if value is not None
    }

    data: Json = {"name": name}
    if options:
        data["options"] = options

    if users is not None:
        user_entries = []
        for user in users:
            # "username" and "password" are required keys; "active" and
            # "extra" fall back to defaults. The password is sent under
            # the key "passwd".
            user_entries.append(
                {
                    "username": user["username"],
                    "passwd": user["password"],
                    "active": user.get("active", True),
                    "extra": user.get("extra", {}),
                }
            )
        data["users"] = user_entries

    request = Request(method="post", endpoint="/_api/database", data=data)

    def handle_response(response: Response) -> bool:
        if response.is_success:
            return True
        raise DatabaseCreateError(response, request)

    return self._execute(request, handle_response)
def delete_database(self, name: str, ignore_missing: bool = False) -> Result[bool]:
    """Delete the database with the given name.

    :param name: Database name.
    :type name: str
    :param ignore_missing: Do not raise an exception on missing database.
    :type ignore_missing: bool
    :return: True if database was deleted successfully, False if database
        was not found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.DatabaseDeleteError: If delete fails.
    """
    request = Request(method="delete", endpoint=f"/_api/database/{name}")

    def handle_response(response: Response) -> bool:
        # Error code 1228 is what this method treats as "database not
        # found"; it is only tolerated when the caller opted in.
        if ignore_missing and response.error_code == 1228:
            return False
        if response.is_success:
            return True
        raise DatabaseDeleteError(response, request)

    return self._execute(request, handle_response)
#########################
# Collection Management #
#########################
def collection(self, name: str) -> StandardCollection:
    """Build the standard collection API wrapper for the named collection.

    No request is sent; the wrapper is constructed from this object's
    connection and executor.

    :param name: Collection name.
    :type name: str
    :return: Standard collection API wrapper.
    :rtype: arango.collection.StandardCollection
    """
    connection, executor = self._conn, self._executor
    return StandardCollection(connection, executor, name)
def has_collection(self, name: str) -> Result[bool]:
    """Tell whether a collection with the given name exists in the database.

    The check is done client-side against the collection list returned by
    the server.

    :param name: Collection name.
    :type name: str
    :return: True if collection exists, False otherwise.
    :rtype: bool
    :raise arango.exceptions.CollectionListError: If the collection list
        cannot be retrieved.
    """
    request = Request(method="get", endpoint="/_api/collection")

    def handle_response(response: Response) -> bool:
        if not response.is_success:
            raise CollectionListError(response, request)
        for entry in response.body["result"]:
            if entry["name"] == name:
                return True
        return False

    return self._execute(request, handle_response)
def collections(self) -> Result[Jsons]:
    """List the collections in the database.

    Each returned entry has the keys "id", "name", "system", "type" and
    "status". The server's "type" and "status" values are translated
    through ``StandardCollection.types`` and ``StandardCollection.statuses``.

    :return: Collections in the database and their details.
    :rtype: [dict]
    :raise arango.exceptions.CollectionListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/collection")

    def handle_response(response: Response) -> Jsons:
        if not response.is_success:
            raise CollectionListError(response, request)

        summaries: Jsons = []
        for entry in response.body["result"]:
            summaries.append(
                {
                    "id": entry["id"],
                    "name": entry["name"],
                    "system": entry["isSystem"],
                    "type": StandardCollection.types[entry["type"]],
                    "status": StandardCollection.statuses[entry["status"]],
                }
            )
        return summaries

    return self._execute(request, handle_response)
def create_collection(
    self,
    name: str,
    sync: bool = False,
    system: bool = False,
    edge: bool = False,
    user_keys: bool = True,
    key_increment: Optional[int] = None,
    key_offset: Optional[int] = None,
    key_generator: str = "traditional",
    shard_fields: Optional[Sequence[str]] = None,
    shard_count: Optional[int] = None,
    replication_factor: Union[int, str, None] = None,
    shard_like: Optional[str] = None,
    sync_replication: Optional[bool] = None,
    enforce_replication_factor: Optional[bool] = None,
    sharding_strategy: Optional[str] = None,
    smart_join_attribute: Optional[str] = None,
    write_concern: Optional[int] = None,
    schema: Optional[Json] = None,
    computedValues: Optional[Jsons] = None,
) -> Result[StandardCollection]:
    """Create a new collection.

    :param name: Collection name.
    :type name: str
    :param sync: If set to True, document operations via the collection
        will block until synchronized to disk by default.
    :type sync: bool
    :param system: If set to True, a system collection is created. The
        collection name must have leading underscore "_" character.
    :type system: bool
    :param edge: If set to True, an edge collection is created.
    :type edge: bool
    :param key_generator: Used for generating document keys. Allowed values
        are "traditional" or "autoincrement".
    :type key_generator: str
    :param user_keys: If set to True, users are allowed to supply document
        keys. If set to False, the key generator is solely responsible for
        supplying the key values.
    :type user_keys: bool
    :param key_increment: Key increment value. Applies only when value of
        **key_generator** is set to "autoincrement"; ignored otherwise.
    :type key_increment: int
    :param key_offset: Key offset value. Applies only when value of
        **key_generator** is set to "autoincrement"; ignored otherwise.
    :type key_offset: int
    :param shard_fields: Field(s) used to determine the target shard.
    :type shard_fields: [str]
    :param shard_count: Number of shards to create.
    :type shard_count: int
    :param replication_factor: Number of copies of each shard on different
        servers in a cluster. Allowed values are 1 (only one copy is kept
        and no synchronous replication), and n (n-1 replicas are kept and
        any two copies are replicated across servers synchronously, meaning
        every write to the master is copied to all slaves before operation
        is reported successful). A string such as "satellite" is also
        accepted, as in **create_database**, and is forwarded to the
        server unchanged. NOTE(review): per the ArangoDB HTTP API this
        requests a SatelliteCollection - confirm against the server docs.
    :type replication_factor: int | str
    :param shard_like: Name of prototype collection whose sharding
        specifics are imitated. Prototype collections cannot be dropped
        before imitating collections. Applies to enterprise version of
        ArangoDB only.
    :type shard_like: str
    :param sync_replication: If set to True, server reports success only
        when collection is created in all replicas. You can set this to
        False for faster server response, and if full replication is not a
        concern.
    :type sync_replication: bool
    :param enforce_replication_factor: Check if there are enough replicas
        available at creation time, or halt the operation.
    :type enforce_replication_factor: bool
    :param sharding_strategy: Sharding strategy. Only available on ArangoDB
        versions that support sharding strategies. Possible values are
        "community-compat", "enterprise-compat",
        "enterprise-smart-edge-compat", "hash" and
        "enterprise-hash-smart-edge". Refer to ArangoDB documentation for
        more details on each value.
    :type sharding_strategy: str
    :param smart_join_attribute: Attribute of the collection which must
        contain the shard key value of the smart join collection. The shard
        key for the documents must contain the value of this attribute,
        followed by a colon ":" and the primary key of the document.
        Requires parameter **shard_like** to be set to the name of another
        collection, and parameter **shard_fields** to be set to a single
        shard key attribute, with another colon ":" at the end. Available
        only for enterprise version of ArangoDB.
    :type smart_join_attribute: str
    :param write_concern: Write concern for the collection. Determines how
        many copies of each shard are required to be in sync on different
        DBServers. If there are less than these many copies in the cluster
        a shard will refuse to write. Writes to shards with enough
        up-to-date copies will succeed at the same time. The value of this
        parameter cannot be larger than that of **replication_factor**.
        Default value is 1. Used for clusters only.
    :type write_concern: int
    :param schema: Optional dict specifying the collection level schema
        for documents. See ArangoDB documentation for more information on
        document schema validation.
    :type schema: dict
    :param computedValues: Array of computed values for the new collection
        enabling default values to new documents or the maintenance of
        auxiliary attributes for search queries. Available in ArangoDB
        version 3.10 or greater. See ArangoDB documentation for more
        information on computed values.
    :type computedValues: list
    :return: Standard collection API wrapper.
    :rtype: arango.collection.StandardCollection
    :raise arango.exceptions.CollectionCreateError: If create fails.
    """
    key_options: Json = {"type": key_generator, "allowUserKeys": user_keys}
    # Increment/offset are only sent for the "autoincrement" generator.
    if key_generator == "autoincrement":
        if key_increment is not None:
            key_options["increment"] = key_increment
        if key_offset is not None:
            key_options["offset"] = key_offset

    data: Json = {
        "name": name,
        "waitForSync": sync,
        "isSystem": system,
        "keyOptions": key_options,
        # Collection type is sent as 3 for edge collections, 2 otherwise.
        "type": 3 if edge else 2,
    }
    # Optional body attributes are only sent when explicitly provided.
    if shard_count is not None:
        data["numberOfShards"] = shard_count
    if shard_fields is not None:
        data["shardKeys"] = shard_fields
    if replication_factor is not None:
        data["replicationFactor"] = replication_factor
    if shard_like is not None:
        data["distributeShardsLike"] = shard_like
    if sharding_strategy is not None:
        data["shardingStrategy"] = sharding_strategy
    if smart_join_attribute is not None:
        data["smartJoinAttribute"] = smart_join_attribute
    if write_concern is not None:
        data["writeConcern"] = write_concern
    if schema is not None:
        data["schema"] = schema
    if computedValues is not None:
        data["computedValues"] = computedValues

    # These two settings travel as query parameters, not in the body.
    params: Params = {}
    if sync_replication is not None:
        params["waitForSyncReplication"] = sync_replication
    if enforce_replication_factor is not None:
        params["enforceReplicationFactor"] = enforce_replication_factor

    request = Request(
        method="post", endpoint="/_api/collection", params=params, data=data
    )

    def response_handler(resp: Response) -> StandardCollection:
        if resp.is_success:
            return self.collection(name)
        raise CollectionCreateError(resp, request)

    return self._execute(request, response_handler)
def delete_collection(
    self, name: str, ignore_missing: bool = False, system: Optional[bool] = None
) -> Result[bool]:
    """Delete the collection with the given name.

    :param name: Collection name.
    :type name: str
    :param ignore_missing: Do not raise an exception on missing collection.
    :type ignore_missing: bool
    :param system: Whether the collection is a system collection. Only
        sent to the server when not None.
    :type system: bool
    :return: True if collection was deleted successfully, False if
        collection was not found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.CollectionDeleteError: If delete fails.
    """
    params: Params = {} if system is None else {"isSystem": system}
    request = Request(
        method="delete", endpoint=f"/_api/collection/{name}", params=params
    )

    def handle_response(response: Response) -> bool:
        # Error code 1203 is what this method treats as "collection not
        # found"; it is only tolerated when the caller opted in.
        if ignore_missing and response.error_code == 1203:
            return False
        if response.is_success:
            return True
        raise CollectionDeleteError(response, request)

    return self._execute(request, handle_response)
####################
# Graph Management #
####################
def graph(self, name: str) -> Graph:
    """Build the graph API wrapper for the named graph.

    No request is sent; the wrapper is constructed from this object's
    connection and executor.

    :param name: Graph name.
    :type name: str
    :return: Graph API wrapper.
    :rtype: arango.graph.Graph
    """
    connection, executor = self._conn, self._executor
    return Graph(connection, executor, name)
def has_graph(self, name: str) -> Result[bool]:
    """Tell whether a graph with the given name exists in the database.

    The check is done client-side by comparing **name** with the "_key"
    of each graph returned by the server.

    :param name: Graph name.
    :type name: str
    :return: True if graph exists, False otherwise.
    :rtype: bool
    :raise arango.exceptions.GraphListError: If the graph list cannot be
        retrieved.
    """
    request = Request(method="get", endpoint="/_api/gharial")

    def handle_response(response: Response) -> bool:
        if not response.is_success:
            raise GraphListError(response, request)
        for entry in response.body["graphs"]:
            if name == entry["_key"]:
                return True
        return False

    return self._execute(request, handle_response)
def graphs(self) -> Result[Jsons]:
    """List all graphs in the database.

    Each returned entry has the keys "id", "name", "revision",
    "orphan_collections", "edge_definitions", "shard_count" and
    "replication_factor". The last two are None when the server response
    does not include them.

    :return: Graphs in the database.
    :rtype: [dict]
    :raise arango.exceptions.GraphListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/gharial")

    def handle_response(response: Response) -> Jsons:
        if not response.is_success:
            raise GraphListError(response, request)

        summaries: Jsons = []
        for body in response.body["graphs"]:
            summary: Json = {
                "id": body["_id"],
                "name": body["_key"],
                "revision": body["_rev"],
                "orphan_collections": body["orphanCollections"],
            }

            # Rename the server's edge definition keys to snake_case.
            edge_definitions = []
            for definition in body["edgeDefinitions"]:
                edge_definitions.append(
                    {
                        "edge_collection": definition["collection"],
                        "from_vertex_collections": definition["from"],
                        "to_vertex_collections": definition["to"],
                    }
                )
            summary["edge_definitions"] = edge_definitions

            summary["shard_count"] = body.get("numberOfShards")
            summary["replication_factor"] = body.get("replicationFactor")
            summaries.append(summary)
        return summaries

    return self._execute(request, handle_response)
def create_graph(
    self,
    name: str,
    edge_definitions: Optional[Sequence[Json]] = None,
    orphan_collections: Optional[Sequence[str]] = None,
    smart: Optional[bool] = None,
    disjoint: Optional[bool] = None,
    smart_field: Optional[str] = None,
    shard_count: Optional[int] = None,
    replication_factor: Optional[int] = None,
    write_concern: Optional[int] = None,
    satellite_collections: Optional[Sequence[str]] = None,
) -> Result[Graph]:
    """Create a new graph.

    :param name: Graph name.
    :type name: str
    :param edge_definitions: List of edge definitions, where each edge
        definition entry is a dictionary with fields "edge_collection",
        "from_vertex_collections" and "to_vertex_collections" (see below
        for example).
    :type edge_definitions: [dict] | None
    :param orphan_collections: Names of additional vertex collections that
        are not in edge definitions.
    :type orphan_collections: [str] | None
    :param smart: If set to True, sharding is enabled (see parameter
        **smart_field** below). Applies only to enterprise version of
        ArangoDB.
    :type smart: bool | None
    :param disjoint: If set to True, create a disjoint SmartGraph instead
        of a regular SmartGraph. Applies only to enterprise version of
        ArangoDB.
    :type disjoint: bool | None
    :param smart_field: Document field used to shard the vertices of the
        graph. To use this, parameter **smart** must be set to True and
        every vertex in the graph must have the smart field. Applies only
        to enterprise version of ArangoDB. NOTE: If this field is
        None and **smart** is True, an Enterprise Graph will be created.
    :type smart_field: str | None
    :param shard_count: Number of shards used for every collection in the
        graph. To use this, parameter **smart** must be set to True and
        every vertex in the graph must have the smart field. This number
        cannot be modified later once set. Applies only to enterprise
        version of ArangoDB.
    :type shard_count: int | None
    :param replication_factor: Number of copies of each shard on different
        servers in a cluster. Allowed values are 1 (only one copy is kept
        and no synchronous replication), and n (n-1 replicas are kept and
        any two copies are replicated across servers synchronously, meaning
        every write to the master is copied to all slaves before operation
        is reported successful).
    :type replication_factor: int
    :param write_concern: Write concern for the collection. Determines how
        many copies of each shard are required to be in sync on different
        DBServers. If there are less than these many copies in the cluster
        a shard will refuse to write. Writes to shards with enough
        up-to-date copies will succeed at the same time. The value of this
        parameter cannot be larger than that of **replication_factor**.
        Default value is 1. Used for clusters only.
    :type write_concern: int
    :param satellite_collections: An array of collection names that is
        used to create SatelliteCollections for a (Disjoint) SmartGraph
        using SatelliteCollections (Enterprise Edition only). Each array
        element must be a string and a valid collection name. The
        collection type cannot be modified later.
    :type satellite_collections: [str] | None
    :return: Graph API wrapper.
    :rtype: arango.graph.Graph
    :raise arango.exceptions.GraphCreateError: If create fails.

    Here is an example entry for parameter **edge_definitions**:

    .. code-block:: python

        [
            {
                'edge_collection': 'teach',
                'from_vertex_collections': ['teachers'],
                'to_vertex_collections': ['lectures']
            }
        ]
    """
    # "options" is always part of the payload, even when it stays empty.
    options: Json = {}
    data: Json = {"name": name, "options": options}

    if edge_definitions is not None:
        # Translate the snake_case keys to the keys the server expects.
        translated = []
        for definition in edge_definitions:
            translated.append(
                {
                    "collection": definition["edge_collection"],
                    "from": definition["from_vertex_collections"],
                    "to": definition["to_vertex_collections"],
                }
            )
        data["edgeDefinitions"] = translated
    if orphan_collections is not None:
        data["orphanCollections"] = orphan_collections
    if smart is not None:  # pragma: no cover
        data["isSmart"] = smart
    if disjoint is not None:  # pragma: no cover
        data["isDisjoint"] = disjoint

    # The remaining settings are nested under "options".
    if smart_field is not None:  # pragma: no cover
        options["smartGraphAttribute"] = smart_field
    if shard_count is not None:  # pragma: no cover
        options["numberOfShards"] = shard_count
    if replication_factor is not None:  # pragma: no cover
        options["replicationFactor"] = replication_factor
    if write_concern is not None:  # pragma: no cover
        options["writeConcern"] = write_concern
    if satellite_collections is not None:  # pragma: no cover
        options["satellites"] = satellite_collections

    request = Request(method="post", endpoint="/_api/gharial", data=data)

    def handle_response(response: Response) -> Graph:
        if not response.is_success:
            raise GraphCreateError(response, request)
        return Graph(self._conn, self._executor, name)

    return self._execute(request, handle_response)
def delete_graph(
    self,
    name: str,
    ignore_missing: bool = False,
    drop_collections: Optional[bool] = None,
) -> Result[bool]:
    """Drop the graph of the given name from the database.

    :param name: Graph name.
    :type name: str
    :param ignore_missing: Do not raise an exception on missing graph.
    :type ignore_missing: bool
    :param drop_collections: Drop the collections of the graph also. This
        is only if they are not in use by other graphs. Only sent to the
        server when not None.
    :type drop_collections: bool | None
    :return: True if graph was deleted successfully, False if graph was not
        found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.GraphDeleteError: If delete fails.
    """
    params: Params = (
        {} if drop_collections is None else {"dropCollections": drop_collections}
    )
    request = Request(
        method="delete", endpoint=f"/_api/gharial/{name}", params=params
    )

    def handle_response(response: Response) -> bool:
        # Error code 1924 is what this method treats as "graph not found";
        # it is only tolerated when the caller opted in.
        if ignore_missing and response.error_code == 1924:
            return False
        if response.is_success:
            return True
        raise GraphDeleteError(response, request)

    return self._execute(request, handle_response)
#######################
# Document Management #
#######################
def has_document(
    self,
    document: Union[str, Json],
    rev: Optional[str] = None,
    check_rev: bool = True,
) -> Result[bool]:
    """Check if a document exists.

    The target collection is resolved from **document** via
    ``self._get_col_by_doc`` and the check is delegated to that
    collection's ``has`` method.

    :param document: Document ID or body with "_id" field.
    :type document: str | dict
    :param rev: Expected document revision. Overrides value of "_rev" field
        in **document** if present.
    :type rev: str | None
    :param check_rev: If set to True, revision of **document** (if given)
        is compared against the revision of target document.
    :type check_rev: bool
    :return: True if document exists, False otherwise.
    :rtype: bool
    :raise arango.exceptions.DocumentInError: If check fails.
    :raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
    """
    # The annotation accepts a plain ID string as well as a body, matching
    # the documented contract and the signature of delete_document.
    return self._get_col_by_doc(document).has(
        document=document, rev=rev, check_rev=check_rev
    )
def document(
    self,
    document: Union[str, Json],
    rev: Optional[str] = None,
    check_rev: bool = True,
) -> Result[Optional[Json]]:
    """Return a document.

    The target collection is resolved from **document** via
    ``self._get_col_by_doc`` and the lookup is delegated to that
    collection's ``get`` method.

    :param document: Document ID or body with "_id" field.
    :type document: str | dict
    :param rev: Expected document revision. Overrides the value of "_rev"
        field in **document** if present.
    :type rev: str | None
    :param check_rev: If set to True, revision of **document** (if given)
        is compared against the revision of target document.
    :type check_rev: bool
    :return: Document, or None if not found.
    :rtype: dict | None
    :raise arango.exceptions.DocumentGetError: If retrieval fails.
    :raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
    """
    # The annotation accepts a plain ID string as well as a body, matching
    # the documented contract and the signature of delete_document.
    return self._get_col_by_doc(document).get(
        document=document, rev=rev, check_rev=check_rev
    )
def insert_document(
    self,
    collection: str,
    document: Json,
    return_new: bool = False,
    sync: Optional[bool] = None,
    silent: bool = False,
    overwrite: bool = False,
    return_old: bool = False,
    overwrite_mode: Optional[str] = None,
    keep_none: Optional[bool] = None,
    merge: Optional[bool] = None,
) -> Result[Union[bool, Json]]:
    """Insert a new document into the named collection.

    This is a thin wrapper: all arguments are forwarded to the ``insert``
    method of the collection wrapper returned by ``self.collection``.

    :param collection: Collection name.
    :type collection: str
    :param document: Document to insert. If it contains the "_key" or "_id"
        field, the value is used as the key of the new document (otherwise
        it is auto-generated). Any "_rev" field is ignored.
    :type document: dict
    :param return_new: Include body of the new document in the returned
        metadata. Ignored if parameter **silent** is set to True.
    :type return_new: bool
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param silent: If set to True, no document metadata is returned. This
        can be used to save resources.
    :type silent: bool
    :param overwrite: If set to True, operation does not fail on duplicate
        key and the existing document is replaced.
    :type overwrite: bool
    :param return_old: Include body of the old document if replaced.
        Applies only when value of **overwrite** is set to True.
    :type return_old: bool
    :param overwrite_mode: Overwrite behavior used when the document key
        exists already. Allowed values are "replace" (replace-insert) or
        "update" (update-insert). Implicitly sets the value of parameter
        **overwrite**.
    :type overwrite_mode: str | None
    :param keep_none: If set to True, fields with value None are retained
        in the document. Otherwise, they are removed completely. Applies
        only when **overwrite_mode** is set to "update" (update-insert).
    :type keep_none: bool | None
    :param merge: If set to True (default), sub-dictionaries are merged
        instead of the new one overwriting the old one. Applies only when
        **overwrite_mode** is set to "update" (update-insert).
    :type merge: bool | None
    :return: Document metadata (e.g. document key, revision) or True if
        parameter **silent** was set to True.
    :rtype: bool | dict
    :raise arango.exceptions.DocumentInsertError: If insert fails.
    """
    target = self.collection(collection)
    return target.insert(
        document=document,
        return_new=return_new,
        sync=sync,
        silent=silent,
        overwrite=overwrite,
        return_old=return_old,
        overwrite_mode=overwrite_mode,
        keep_none=keep_none,
        merge=merge,
    )
def update_document(
    self,
    document: Json,
    check_rev: bool = True,
    merge: bool = True,
    keep_none: bool = True,
    return_new: bool = False,
    return_old: bool = False,
    sync: Optional[bool] = None,
    silent: bool = False,
) -> Result[Union[bool, Json]]:
    """Update a document.

    The target collection is resolved from **document** via
    ``self._get_col_by_doc``; all arguments are then forwarded to that
    collection's ``update`` method.

    :param document: Partial or full document with the updated values. It
        must contain the "_id" field.
    :type document: dict
    :param check_rev: If set to True, revision of **document** (if given)
        is compared against the revision of target document.
    :type check_rev: bool
    :param merge: If set to True, sub-dictionaries are merged instead of
        the new one overwriting the old one.
    :type merge: bool
    :param keep_none: If set to True, fields with value None are retained
        in the document. Otherwise, they are removed completely.
    :type keep_none: bool
    :param return_new: Include body of the new document in the result.
    :type return_new: bool
    :param return_old: Include body of the old document in the result.
    :type return_old: bool
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param silent: If set to True, no document metadata is returned. This
        can be used to save resources.
    :type silent: bool
    :return: Document metadata (e.g. document key, revision) or True if
        parameter **silent** was set to True.
    :rtype: bool | dict
    :raise arango.exceptions.DocumentUpdateError: If update fails.
    :raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
    """
    target = self._get_col_by_doc(document)
    return target.update(
        document=document,
        check_rev=check_rev,
        merge=merge,
        keep_none=keep_none,
        return_new=return_new,
        return_old=return_old,
        sync=sync,
        silent=silent,
    )
def replace_document(
    self,
    document: Json,
    check_rev: bool = True,
    return_new: bool = False,
    return_old: bool = False,
    sync: Optional[bool] = None,
    silent: bool = False,
) -> Result[Union[bool, Json]]:
    """Replace a document.

    The target collection is resolved from **document** via
    ``self._get_col_by_doc``; all arguments are then forwarded to that
    collection's ``replace`` method.

    :param document: New document to replace the old one with. It must
        contain the "_id" field. Edge document must also have "_from" and
        "_to" fields.
    :type document: dict
    :param check_rev: If set to True, revision of **document** (if given)
        is compared against the revision of target document.
    :type check_rev: bool
    :param return_new: Include body of the new document in the result.
    :type return_new: bool
    :param return_old: Include body of the old document in the result.
    :type return_old: bool
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param silent: If set to True, no document metadata is returned. This
        can be used to save resources.
    :type silent: bool
    :return: Document metadata (e.g. document key, revision) or True if
        parameter **silent** was set to True.
    :rtype: bool | dict
    :raise arango.exceptions.DocumentReplaceError: If replace fails.
    :raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
    """
    target = self._get_col_by_doc(document)
    return target.replace(
        document=document,
        check_rev=check_rev,
        return_new=return_new,
        return_old=return_old,
        sync=sync,
        silent=silent,
    )
def delete_document(
    self,
    document: Union[str, Json],
    rev: Optional[str] = None,
    check_rev: bool = True,
    ignore_missing: bool = False,
    return_old: bool = False,
    sync: Optional[bool] = None,
    silent: bool = False,
) -> Result[Union[bool, Json]]:
    """Delete a document.

    The owning collection is looked up from **document** and the call is
    forwarded to that collection's ``delete`` method.

    :param document: Document ID, key or body. Document body must contain
        the "_id" field.
    :type document: str | dict
    :param rev: Expected document revision. Overrides the value of "_rev"
        field in **document** if present.
    :type rev: str | None
    :param check_rev: If set to True, revision of **document** (if given)
        is compared against the revision of target document.
    :type check_rev: bool
    :param ignore_missing: Do not raise an exception on missing document.
        This parameter has no effect in transactions where an exception is
        always raised on failures.
    :type ignore_missing: bool
    :param return_old: Include body of the old document in the result.
    :type return_old: bool
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param silent: If set to True, no document metadata is returned. This
        can be used to save resources.
    :type silent: bool
    :return: Document metadata (e.g. document key, revision), or True if
        parameter **silent** was set to True, or False if document was not
        found and **ignore_missing** was set to True (does not apply in
        transactions).
    :rtype: bool | dict
    :raise arango.exceptions.DocumentDeleteError: If delete fails.
    :raise arango.exceptions.DocumentRevisionError: If revisions mismatch.
    """
    # Resolve the target collection first; every option is passed through as-is.
    collection = self._get_col_by_doc(document)
    return collection.delete(
        document=document,
        rev=rev,
        check_rev=check_rev,
        ignore_missing=ignore_missing,
        return_old=return_old,
        sync=sync,
        silent=silent,
    )
###################
# Task Management #
###################
def tasks(self) -> Result[Jsons]:
    """List every server task that is currently active.

    :return: Currently active server tasks.
    :rtype: [dict]
    :raise arango.exceptions.TaskListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/tasks")

    def response_handler(resp: Response) -> Jsons:
        if resp.is_success:
            # The response body is returned unchanged.
            active_tasks: Jsons = resp.body
            return active_tasks
        raise TaskListError(resp, request)

    return self._execute(request, response_handler)
def task(self, task_id: str) -> Result[Json]:
    """Fetch the details of a single active server task.

    :param task_id: Server task ID.
    :type task_id: str
    :return: Server task details.
    :rtype: dict
    :raise arango.exceptions.TaskGetError: If retrieval fails.
    """
    request = Request(method="get", endpoint=f"/_api/tasks/{task_id}")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise TaskGetError(resp, request)
        return format_body(resp.body)

    return self._execute(request, response_handler)
def create_task(
    self,
    name: str,
    command: str,
    params: Optional[Json] = None,
    period: Optional[int] = None,
    offset: Optional[int] = None,
    task_id: Optional[str] = None,
) -> Result[Json]:
    """Create a new server task.

    :param name: Name of the server task.
    :type name: str
    :param command: Javascript command to execute.
    :type command: str
    :param params: Optional parameters passed into the Javascript command.
    :type params: dict | None
    :param period: Number of seconds to wait between executions. If set
        to 0, the new task will be "timed", meaning it will execute only
        once and be deleted afterwards.
    :type period: int | None
    :param offset: Initial delay before execution in seconds.
    :type offset: int | None
    :param task_id: Pre-defined ID for the new server task.
    :type task_id: str | None
    :return: Details of the new task.
    :rtype: dict
    :raise arango.exceptions.TaskCreateError: If create fails.
    """
    data: Json = {"name": name, "command": command}

    # Optional fields are only sent when the caller supplied them.
    optional_fields = (
        ("params", params),
        ("id", task_id),
        ("period", period),
        ("offset", offset),
    )
    for field, value in optional_fields:
        if value is not None:
            data[field] = value

    # Without a pre-defined ID the endpoint keeps an empty trailing segment.
    id_segment = "" if task_id is None else task_id
    request = Request(method="post", endpoint=f"/_api/tasks/{id_segment}", data=data)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise TaskCreateError(resp, request)
        return format_body(resp.body)

    return self._execute(request, response_handler)
def delete_task(self, task_id: str, ignore_missing: bool = False) -> Result[bool]:
    """Delete a server task.

    :param task_id: Server task ID.
    :type task_id: str
    :param ignore_missing: Do not raise an exception on missing task.
    :type ignore_missing: bool
    :return: True if task was successfully deleted, False if task was not
        found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.TaskDeleteError: If delete fails.
    """
    request = Request(method="delete", endpoint=f"/_api/tasks/{task_id}")

    def response_handler(resp: Response) -> bool:
        # Error code 1852 is reported as False instead of an exception when
        # the caller asked to ignore missing tasks.
        if ignore_missing and resp.error_code == 1852:
            return False
        if resp.is_success:
            return True
        raise TaskDeleteError(resp, request)

    return self._execute(request, response_handler)
###################
# User Management #
###################
def has_user(self, username: str) -> Result[bool]:
    """Check if user exists.

    All users are fetched from the server and scanned for a matching name.

    :param username: Username.
    :type username: str
    :return: True if user exists, False otherwise.
    :rtype: bool
    :raise arango.exceptions.UserListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/user")

    def response_handler(resp: Response) -> bool:
        if resp.is_success:
            for record in resp.body["result"]:
                if record["user"] == username:
                    return True
            return False
        raise UserListError(resp, request)

    return self._execute(request, response_handler)
def users(self) -> Result[Jsons]:
    """Return all user details.

    :return: List of user details.
    :rtype: [dict]
    :raise arango.exceptions.UserListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/user")

    def response_handler(resp: Response) -> Jsons:
        if resp.is_success:
            # Each record is reduced to three fields; "user" is exposed as
            # "username".
            details: Jsons = []
            for record in resp.body["result"]:
                details.append(
                    {
                        "username": record["user"],
                        "active": record["active"],
                        "extra": record["extra"],
                    }
                )
            return details
        raise UserListError(resp, request)

    return self._execute(request, response_handler)
def user(self, username: str) -> Result[Json]:
    """Return user details.

    :param username: Username.
    :type username: str
    :return: User details.
    :rtype: dict
    :raise arango.exceptions.UserGetError: If retrieval fails.
    """
    request = Request(method="get", endpoint=f"/_api/user/{username}")

    def response_handler(resp: Response) -> Json:
        if resp.is_success:
            body = resp.body
            # The "user" field of the response is exposed as "username".
            return {
                "username": body["user"],
                "active": body["active"],
                "extra": body["extra"],
            }
        raise UserGetError(resp, request)

    return self._execute(request, response_handler)
def create_user(
    self,
    username: str,
    password: Optional[str] = None,
    active: Optional[bool] = None,
    extra: Optional[Json] = None,
) -> Result[Json]:
    """Create a new user.

    :param username: Username.
    :type username: str
    :param password: Password.
    :type password: str | None
    :param active: True if user is active, False otherwise.
    :type active: bool | None
    :param extra: Additional data for the user.
    :type extra: dict | None
    :return: New user details.
    :rtype: dict
    :raise arango.exceptions.UserCreateError: If create fails.
    """
    # "passwd" and "active" are always part of the payload, even when None;
    # only "extra" is omitted when it was not given.
    data: Json = {"user": username, "passwd": password, "active": active}
    if extra is not None:
        data["extra"] = extra

    request = Request(method="post", endpoint="/_api/user", data=data)

    def response_handler(resp: Response) -> Json:
        if resp.is_success:
            body = resp.body
            return {
                "username": body["user"],
                "active": body["active"],
                "extra": body["extra"],
            }
        raise UserCreateError(resp, request)

    return self._execute(request, response_handler)
def update_user(
    self,
    username: str,
    password: Optional[str] = None,
    active: Optional[bool] = None,
    extra: Optional[Json] = None,
) -> Result[Json]:
    """Update a user.

    Only the fields that were explicitly given are sent to the server.

    :param username: Username.
    :type username: str
    :param password: New password.
    :type password: str | None
    :param active: Whether the user is active.
    :type active: bool | None
    :param extra: Additional data for the user.
    :type extra: dict | None
    :return: New user details.
    :rtype: dict
    :raise arango.exceptions.UserUpdateError: If update fails.
    """
    candidates = (("passwd", password), ("active", active), ("extra", extra))
    data: Json = {field: value for field, value in candidates if value is not None}

    request = Request(method="patch", endpoint=f"/_api/user/{username}", data=data)

    def response_handler(resp: Response) -> Json:
        if resp.is_success:
            body = resp.body
            return {
                "username": body["user"],
                "active": body["active"],
                "extra": body["extra"],
            }
        raise UserUpdateError(resp, request)

    return self._execute(request, response_handler)
def replace_user(
    self,
    username: str,
    password: str,
    active: Optional[bool] = None,
    extra: Optional[Json] = None,
) -> Result[Json]:
    """Replace a user.

    :param username: Username.
    :type username: str
    :param password: New password.
    :type password: str
    :param active: Whether the user is active.
    :type active: bool | None
    :param extra: Additional data for the user.
    :type extra: dict | None
    :return: New user details.
    :rtype: dict
    :raise arango.exceptions.UserReplaceError: If replace fails.
    """
    data: Json = {"user": username, "passwd": password}
    # "active" and "extra" are optional and only sent when supplied.
    for field, value in (("active", active), ("extra", extra)):
        if value is not None:
            data[field] = value

    request = Request(method="put", endpoint=f"/_api/user/{username}", data=data)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise UserReplaceError(resp, request)
        body = resp.body
        return {
            "username": body["user"],
            "active": body["active"],
            "extra": body["extra"],
        }

    return self._execute(request, response_handler)
def delete_user(self, username: str, ignore_missing: bool = False) -> Result[bool]:
    """Delete a user.

    :param username: Username.
    :type username: str
    :param ignore_missing: Do not raise an exception on missing user.
    :type ignore_missing: bool
    :return: True if user was deleted successfully, False if user was not
        found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.UserDeleteError: If delete fails.
    """
    request = Request(method="delete", endpoint=f"/_api/user/{username}")

    def response_handler(resp: Response) -> bool:
        if resp.is_success:
            return True
        # A 404 is reported as False only when the caller opted in.
        if ignore_missing and resp.status_code == 404:
            return False
        raise UserDeleteError(resp, request)

    return self._execute(request, response_handler)
#########################
# Permission Management #
#########################
def permissions(self, username: str) -> Result[Json]:
    """Return user permissions for all databases and collections.

    :param username: Username.
    :type username: str
    :return: User permissions for all databases and collections.
    :rtype: dict
    :raise arango.exceptions.PermissionListError: If retrieval fails.
    """
    request = Request(
        method="get",
        endpoint=f"/_api/user/{username}/database",
        params={"full": True},
    )

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise PermissionListError(resp, request)
        granted: Json = resp.body["result"]
        return granted

    return self._execute(request, response_handler)
def permission(
    self, username: str, database: str, collection: Optional[str] = None
) -> Result[str]:
    """Return user permission for a specific database or collection.

    :param username: Username.
    :type username: str
    :param database: Database name.
    :type database: str
    :param collection: Collection name.
    :type collection: str | None
    :return: Permission for given database or collection.
    :rtype: str
    :raise arango.exceptions.PermissionGetError: If retrieval fails.
    """
    # The collection name, when given, becomes one more path segment.
    collection_segment = "" if collection is None else "/" + collection
    endpoint = f"/_api/user/{username}/database/{database}" + collection_segment
    request = Request(method="get", endpoint=endpoint)

    def response_handler(resp: Response) -> str:
        if not resp.is_success:
            raise PermissionGetError(resp, request)
        return str(resp.body["result"])

    return self._execute(request, response_handler)
def update_permission(
    self,
    username: str,
    permission: str,
    database: str,
    collection: Optional[str] = None,
) -> Result[bool]:
    """Update user permission for a specific database or collection.

    :param username: Username.
    :type username: str
    :param permission: Allowed values are "rw" (read and write), "ro"
        (read only) or "none" (no access).
    :type permission: str
    :param database: Database name.
    :type database: str
    :param collection: Collection name.
    :type collection: str | None
    :return: True if access was granted successfully.
    :rtype: bool
    :raise arango.exceptions.PermissionUpdateError: If update fails.
    """
    # The collection name, when given, becomes one more path segment.
    collection_segment = "" if collection is None else "/" + collection
    endpoint = f"/_api/user/{username}/database/{database}" + collection_segment
    request = Request(method="put", endpoint=endpoint, data={"grant": permission})

    def response_handler(resp: Response) -> bool:
        if not resp.is_success:
            raise PermissionUpdateError(resp, request)
        return True

    return self._execute(request, response_handler)
def reset_permission(
    self, username: str, database: str, collection: Optional[str] = None
) -> Result[bool]:
    """Reset user permission for a specific database or collection.

    :param username: Username.
    :type username: str
    :param database: Database name.
    :type database: str
    :param collection: Collection name.
    :type collection: str | None
    :return: True if permission was reset successfully.
    :rtype: bool
    :raise arango.exceptions.PermissionResetError: If reset fails.
    """
    # The collection name, when given, becomes one more path segment.
    collection_segment = "" if collection is None else "/" + collection
    endpoint = f"/_api/user/{username}/database/{database}" + collection_segment
    request = Request(method="delete", endpoint=endpoint)

    def response_handler(resp: Response) -> bool:
        if not resp.is_success:
            raise PermissionResetError(resp, request)
        return True

    return self._execute(request, response_handler)
########################
# Async Job Management #
########################
def async_jobs(self, status: str, count: Optional[int] = None) -> Result[List[str]]:
    """Return IDs of async jobs with given status.

    :param status: Job status (e.g. "pending", "done").
    :type status: str
    :param count: Max number of job IDs to return.
    :type count: int | None
    :return: List of job IDs.
    :rtype: [str]
    :raise arango.exceptions.AsyncJobListError: If retrieval fails.
    """
    # "count" is only sent as a query parameter when the caller set it.
    params: Params = {} if count is None else {"count": count}
    request = Request(method="get", endpoint=f"/_api/job/{status}", params=params)

    def response_handler(resp: Response) -> List[str]:
        if not resp.is_success:
            raise AsyncJobListError(resp, request)
        job_ids: List[str] = resp.body
        return job_ids

    return self._execute(request, response_handler)
def clear_async_jobs(self, threshold: Optional[int] = None) -> Result[bool]:
    """Clear async job results from the server.

    Async jobs that are still queued or running are not stopped.

    :param threshold: If specified, only the job results created prior to
        the threshold (a unix timestamp) are deleted. Otherwise, all job
        results are deleted.
    :type threshold: int | None
    :return: True if job results were cleared successfully.
    :rtype: bool
    :raise arango.exceptions.AsyncJobClearError: If operation fails.
    """
    if threshold is not None:
        # Restrict the deletion to results older than the given timestamp.
        request = Request(
            method="delete",
            endpoint="/_api/job/expired",
            params={"stamp": threshold},
        )
    else:
        request = Request(method="delete", endpoint="/_api/job/all")

    def response_handler(resp: Response) -> bool:
        if not resp.is_success:
            raise AsyncJobClearError(resp, request)
        return True

    return self._execute(request, response_handler)
###################
# View Management #
###################
def views(self) -> Result[Jsons]:
    """Return list of views and their summaries.

    :return: List of views.
    :rtype: [dict]
    :raise arango.exceptions.ViewListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/view")

    def response_handler(resp: Response) -> Jsons:
        if not resp.is_success:
            raise ViewListError(resp, request)
        # Every entry of the result is passed through format_view.
        return [format_view(entry) for entry in resp.body["result"]]

    return self._execute(request, response_handler)
def view(self, name: str) -> Result[Json]:
    """Return the properties of a View.

    :param name: View name.
    :type name: str
    :return: The View properties.
    :rtype: dict
    :raise arango.exceptions.ViewGetError: If retrieval fails.
    """
    request = Request(method="get", endpoint=f"/_api/view/{name}/properties")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewGetError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def view_info(self, name: str) -> Result[Json]:
    """Return the id, name and type of a View.

    :param name: View name.
    :type name: str
    :return: Some View information.
    :rtype: dict
    :raise arango.exceptions.ViewGetError: If retrieval fails.
    """
    request = Request(method="get", endpoint=f"/_api/view/{name}")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewGetError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def create_view(
    self, name: str, view_type: str, properties: Optional[Json] = None
) -> Result[Json]:
    """Create a view.

    :param name: View name.
    :type name: str
    :param view_type: View type (e.g. "arangosearch" or "search-alias").
    :type view_type: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict | None
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewCreateError: If create fails.
    """
    payload: Json = {"name": name, "type": view_type}
    if properties is not None:
        # Properties are merged into the top level of the payload and may
        # therefore override "name" or "type".
        payload.update(properties)

    request = Request(method="post", endpoint="/_api/view", data=payload)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewCreateError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def update_view(self, name: str, properties: Json) -> Result[Json]:
    """Update a view.

    :param name: View name.
    :type name: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewUpdateError: If update fails.
    """
    endpoint = f"/_api/view/{name}/properties"
    request = Request(method="patch", endpoint=endpoint, data=properties)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewUpdateError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def replace_view(self, name: str, properties: Json) -> Result[Json]:
    """Replace a view.

    :param name: View name.
    :type name: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewReplaceError: If replace fails.
    """
    endpoint = f"/_api/view/{name}/properties"
    request = Request(method="put", endpoint=endpoint, data=properties)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewReplaceError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def delete_view(self, name: str, ignore_missing: bool = False) -> Result[bool]:
    """Delete a view.

    :param name: View name.
    :type name: str
    :param ignore_missing: Do not raise an exception on missing view.
    :type ignore_missing: bool
    :return: True if view was deleted successfully, False if view was not
        found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.ViewDeleteError: If delete fails.
    """
    request = Request(method="delete", endpoint=f"/_api/view/{name}")

    def response_handler(resp: Response) -> bool:
        # Error code 1203 is reported as False instead of an exception when
        # the caller asked to ignore missing views.
        if ignore_missing and resp.error_code == 1203:
            return False
        if not resp.is_success:
            raise ViewDeleteError(resp, request)
        return True

    return self._execute(request, response_handler)
def rename_view(self, name: str, new_name: str) -> Result[bool]:
    """Rename a view.

    :param name: View name.
    :type name: str
    :param new_name: New view name.
    :type new_name: str
    :return: True if view was renamed successfully.
    :rtype: bool
    :raise arango.exceptions.ViewRenameError: If rename fails.
    """
    endpoint = f"/_api/view/{name}/rename"
    request = Request(method="put", endpoint=endpoint, data={"name": new_name})

    def response_handler(resp: Response) -> bool:
        if not resp.is_success:
            raise ViewRenameError(resp, request)
        return True

    return self._execute(request, response_handler)
################################
# ArangoSearch View Management #
################################
def create_arangosearch_view(
    self, name: str, properties: Optional[Json] = None
) -> Result[Json]:
    """Create an ArangoSearch view.

    :param name: View name.
    :type name: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict | None
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewCreateError: If create fails.
    """
    payload: Json = {"name": name, "type": "arangosearch"}
    if properties is not None:
        # Properties are merged into the top level of the payload and may
        # therefore override "name" or "type".
        payload.update(properties)

    request = Request(method="post", endpoint="/_api/view#ArangoSearch", data=payload)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewCreateError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def update_arangosearch_view(self, name: str, properties: Json) -> Result[Json]:
    """Update an ArangoSearch view.

    :param name: View name.
    :type name: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewUpdateError: If update fails.
    """
    endpoint = f"/_api/view/{name}/properties#ArangoSearch"
    request = Request(method="patch", endpoint=endpoint, data=properties)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewUpdateError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
def replace_arangosearch_view(self, name: str, properties: Json) -> Result[Json]:
    """Replace an ArangoSearch view.

    :param name: View name.
    :type name: str
    :param properties: View properties. For more information see
        https://www.arangodb.com/docs/stable/http/views-arangosearch.html
    :type properties: dict
    :return: View details.
    :rtype: dict
    :raise arango.exceptions.ViewReplaceError: If replace fails.
    """
    endpoint = f"/_api/view/{name}/properties#ArangoSearch"
    request = Request(method="put", endpoint=endpoint, data=properties)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise ViewReplaceError(resp, request)
        return format_view(resp.body)

    return self._execute(request, response_handler)
#######################
# Analyzer Management #
#######################
def analyzers(self) -> Result[Jsons]:
    """Return list of analyzers.

    :return: List of analyzers.
    :rtype: [dict]
    :raise arango.exceptions.AnalyzerListError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_api/analyzer")

    def response_handler(resp: Response) -> Jsons:
        if not resp.is_success:
            raise AnalyzerListError(resp, request)
        listing: Jsons = resp.body["result"]
        return listing

    return self._execute(request, response_handler)
def analyzer(self, name: str) -> Result[Json]:
    """Return analyzer details.

    :param name: Analyzer name.
    :type name: str
    :return: Analyzer details.
    :rtype: dict
    :raise arango.exceptions.AnalyzerGetError: If retrieval fails.
    """
    request = Request(method="get", endpoint=f"/_api/analyzer/{name}")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise AnalyzerGetError(resp, request)
        return format_body(resp.body)

    return self._execute(request, response_handler)
def create_analyzer(
    self,
    name: str,
    analyzer_type: str,
    properties: Optional[Json] = None,
    features: Optional[Sequence[str]] = None,
) -> Result[Json]:
    """Create an analyzer.

    :param name: Analyzer name.
    :type name: str
    :param analyzer_type: Analyzer type.
    :type analyzer_type: str
    :param properties: Analyzer properties.
    :type properties: dict | None
    :param features: Analyzer features.
    :type features: list | None
    :return: Analyzer details.
    :rtype: dict
    :raise arango.exceptions.AnalyzerCreateError: If create fails.
    """
    data: Json = {"name": name, "type": analyzer_type}
    # Optional settings are only sent when the caller supplied them.
    for field, value in (("properties", properties), ("features", features)):
        if value is not None:
            data[field] = value

    request = Request(method="post", endpoint="/_api/analyzer", data=data)

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise AnalyzerCreateError(resp, request)
        created: Json = resp.body
        return created

    return self._execute(request, response_handler)
def delete_analyzer(
    self, name: str, force: bool = False, ignore_missing: bool = False
) -> Result[bool]:
    """Delete an analyzer.

    :param name: Analyzer name.
    :type name: str
    :param force: Remove the analyzer configuration even if in use.
    :type force: bool
    :param ignore_missing: Do not raise an exception on missing analyzer.
    :type ignore_missing: bool
    :return: True if analyzer was deleted successfully, False if analyzer
        was not found and **ignore_missing** was set to True.
    :rtype: bool
    :raise arango.exceptions.AnalyzerDeleteError: If delete fails.
    """
    request = Request(
        method="delete",
        endpoint=f"/_api/analyzer/{name}",
        params={"force": force},
    )

    def response_handler(resp: Response) -> bool:
        # Error codes 1202 and 404 are reported as False instead of an
        # exception when the caller asked to ignore missing analyzers.
        if ignore_missing and resp.error_code in {1202, 404}:
            return False
        if not resp.is_success:
            raise AnalyzerDeleteError(resp, request)
        return True

    return self._execute(request, response_handler)
###########
# Support #
###########
def support_info(self) -> Result[Json]:
    """Return information about the deployment.

    Retrieves deployment information for support purposes.
    The endpoint returns data about the ArangoDB version used,
    the host (operating system, server ID, CPU and storage capacity,
    current utilization, a few metrics) and the other servers in the
    deployment (in case of Active Failover or cluster deployments).

    NOTE: This method can only be accessed from inside the **_system** database.
    There is a policy control startup option `--server.support-info-api` that
    controls if and to whom the API is made available.

    :return: Deployment information.
    :rtype: dict
    :raise arango.exceptions.DatabaseSupportInfoError: If retrieval fails.
    """
    request = Request(method="get", endpoint="/_admin/support-info")

    def response_handler(resp: Response) -> Json:
        if not resp.is_success:
            raise DatabaseSupportInfoError(resp, request)
        info: Json = resp.body
        return info

    return self._execute(request, response_handler)
[docs]
class StandardDatabase(Database):
    """Standard database API wrapper.

    Requests are executed through a ``DefaultApiExecutor``. The ``begin_*``
    and ``fetch_transaction`` methods return other database wrappers that
    share this object's connection.

    :param connection: HTTP connection.
    """

    def __init__(self, connection: Connection) -> None:
        super().__init__(connection=connection, executor=DefaultApiExecutor(connection))

    def __repr__(self) -> str:
        return f"<StandardDatabase {self.name}>"

    def begin_async_execution(self, return_result: bool = True) -> "AsyncDatabase":
        """Begin async execution.

        :param return_result: If set to True, API executions return instances
            of :class:`arango.job.AsyncJob`, which you can use to retrieve
            results from server once available. If set to False, API executions
            return None and no results are stored on server.
        :type return_result: bool
        :return: Database API wrapper object specifically for async execution.
        :rtype: arango.database.AsyncDatabase
        """
        return AsyncDatabase(self._conn, return_result)

    def begin_batch_execution(
        self,
        return_result: bool = True,
        max_workers: Optional[int] = 1,
    ) -> "BatchDatabase":
        """Begin batch execution.

        .. warning::

            The batch request API is deprecated since ArangoDB 3.8.0.
            This functionality should no longer be used.
            To send multiple documents at once to an ArangoDB instance,
            please use any of :class:`arango.collection.Collection` methods
            that accept a list of documents as input.
            See :func:`~arango.collection.Collection.insert_many`,
            :func:`~arango.collection.Collection.update_many`,
            :func:`~arango.collection.Collection.replace_many`,
            :func:`~arango.collection.Collection.delete_many`.

        :param return_result: If set to True, API executions return instances
            of :class:`arango.job.BatchJob` that are populated with results on
            commit. If set to False, API executions return None and no results
            are tracked client-side.
        :type return_result: bool
        :param max_workers: Maximum number of workers to use for submitting
            requests asynchronously. If None, the default value is the minimum
            between `os.cpu_count()` and the number of requests.
        :type max_workers: Optional[int]
        :return: Database API wrapper object specifically for batch execution.
        :rtype: arango.database.BatchDatabase
        """
        return BatchDatabase(self._conn, return_result, max_workers)

    def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
        """Fetch an existing transaction.

        :param transaction_id: The ID of the existing transaction.
        :type transaction_id: str
        :return: Database API wrapper object specifically for transactions.
        :rtype: arango.database.TransactionDatabase
        """
        return TransactionDatabase(connection=self._conn, transaction_id=transaction_id)

    def begin_transaction(
        self,
        read: Union[str, Sequence[str], None] = None,
        write: Union[str, Sequence[str], None] = None,
        exclusive: Union[str, Sequence[str], None] = None,
        sync: Optional[bool] = None,
        allow_implicit: Optional[bool] = None,
        lock_timeout: Optional[int] = None,
        max_size: Optional[int] = None,
        skip_fast_lock_round: Optional[bool] = None,
    ) -> "TransactionDatabase":
        """Begin a transaction.

        :param read: Name(s) of collections read during transaction. Read-only
            collections are added lazily but should be declared if possible to
            avoid deadlocks.
        :type read: str | [str] | None
        :param write: Name(s) of collections written to during transaction with
            shared access.
        :type write: str | [str] | None
        :param exclusive: Name(s) of collections written to during transaction
            with exclusive access.
        :type exclusive: str | [str] | None
        :param sync: Block until operation is synchronized to disk.
        :type sync: bool | None
        :param allow_implicit: Allow reading from undeclared collections.
        :type allow_implicit: bool | None
        :param lock_timeout: Timeout for waiting on collection locks. If not
            given, a default value is used. Setting it to 0 disables the
            timeout.
        :type lock_timeout: int | None
        :param max_size: Max transaction size in bytes.
        :type max_size: int | None
        :param skip_fast_lock_round: Whether to disable fast locking for write
            operations.
        :type skip_fast_lock_round: bool | None
        :return: Database API wrapper object specifically for transactions.
        :rtype: arango.database.TransactionDatabase
        """
        return TransactionDatabase(
            connection=self._conn,
            read=read,
            write=write,
            exclusive=exclusive,
            sync=sync,
            allow_implicit=allow_implicit,
            lock_timeout=lock_timeout,
            max_size=max_size,
            skip_fast_lock_round=skip_fast_lock_round,
        )

    def begin_controlled_execution(
        self, max_queue_time_seconds: Optional[float] = None
    ) -> "OverloadControlDatabase":
        """Begin a controlled connection, with options to handle server-side overload.

        :param max_queue_time_seconds: Maximum time in seconds a request can be queued
            on the server-side. If set to 0 or None, the server ignores this setting.
        :type max_queue_time_seconds: Optional[float]
        :return: Database API wrapper object specifically for queue bounded execution.
        :rtype: arango.database.OverloadControlDatabase
        """
        return OverloadControlDatabase(self._conn, max_queue_time_seconds)
[docs]
class AsyncDatabase(Database):
    """Database API wrapper tailored specifically for async execution.

    See :func:`arango.database.StandardDatabase.begin_async_execution`.

    :param connection: HTTP connection.
    :param return_result: If set to True, API executions return instances of
        :class:`arango.job.AsyncJob`, which you can use to retrieve results
        from server once available. If set to False, API executions return None
        and no results are stored on server.
    :type return_result: bool
    """

    def __init__(self, connection: Connection, return_result: bool) -> None:
        # Annotation only: declares the executor type; the base initializer
        # receives the actual executor instance below.
        self._executor: AsyncApiExecutor
        async_executor = AsyncApiExecutor(connection, return_result)
        super().__init__(connection=connection, executor=async_executor)

    def __repr__(self) -> str:
        return "<AsyncDatabase {}>".format(self.name)
[docs]
class BatchDatabase(Database):
    """Database API wrapper tailored specifically for batch execution.

    .. note::

        This class is not intended to be instantiated directly.
        See
        :func:`arango.database.StandardDatabase.begin_batch_execution`.

    Instances can be used as context managers: leaving the ``with`` block
    without an exception commits the queued requests.

    :param connection: HTTP connection.
    :param return_result: If set to True, API executions return instances of
        :class:`arango.job.BatchJob` that are populated with results on commit.
        If set to False, API executions return None and no results are tracked
        client-side.
    :type return_result: bool
    :param max_workers: Use a thread pool of at most `max_workers`.
    :type max_workers: Optional[int]
    """

    def __init__(
        self, connection: Connection, return_result: bool, max_workers: Optional[int]
    ) -> None:
        # Annotation only: declares the executor type; the base initializer
        # receives the actual executor instance below.
        self._executor: BatchApiExecutor
        super().__init__(
            connection=connection,
            executor=BatchApiExecutor(connection, return_result, max_workers),
        )
        # Emitted on every instantiation to flag the deprecated batch API.
        warn(
            "The batch request API is deprecated since ArangoDB version 3.8.0.",
            FutureWarning,
            stacklevel=3,
        )

    def __repr__(self) -> str:
        return f"<BatchDatabase {self.name}>"

    def __enter__(self) -> "BatchDatabase":
        return self

    def __exit__(self, exception: Exception, *_: Any) -> None:
        # Commit only on a clean exit; if the block raised, nothing is
        # committed here and the exception propagates (None is returned).
        if exception is None:
            self._executor.commit()

    def queued_jobs(self) -> Optional[Sequence[BatchJob[Any]]]:
        """Return the queued batch jobs.

        :return: Queued batch jobs or None if **return_result** parameter was
            set to False during initialization.
        :rtype: [arango.job.BatchJob] | None
        """
        return self._executor.jobs

    def commit(self) -> Optional[Sequence[BatchJob[Any]]]:
        """Execute the queued requests in a single batch API request.

        If **return_result** parameter was set to True during initialization,
        :class:`arango.job.BatchJob` instances are populated with results.

        :return: Batch jobs, or None if **return_result** parameter was set to
            False during initialization.
        :rtype: [arango.job.BatchJob] | None
        :raise arango.exceptions.BatchStateError: If batch state is invalid
            (e.g. batch was already committed or the response size did not
            match expected).
        :raise arango.exceptions.BatchExecuteError: If commit fails.
        """
        return self._executor.commit()
[docs]
class TransactionDatabase(Database):
    """Database API wrapper whose requests run inside a transaction.

    See :func:`arango.database.StandardDatabase.begin_transaction`.

    :param connection: HTTP connection.
    :type connection: arango.connection.Connection
    :param read: Name(s) of collections read during transaction. Read-only
        collections are added lazily but should be declared if possible to
        avoid deadlocks.
    :type read: str | [str] | None
    :param write: Name(s) of collections written to during transaction with
        shared access.
    :type write: str | [str] | None
    :param exclusive: Name(s) of collections written to during transaction
        with exclusive access.
    :type exclusive: str | [str] | None
    :param sync: Block until operation is synchronized to disk.
    :type sync: bool | None
    :param allow_implicit: Allow reading from undeclared collections.
    :type allow_implicit: bool | None
    :param lock_timeout: Timeout for waiting on collection locks. If not
        given, a default value is used. Setting it to 0 disables the timeout.
    :type lock_timeout: int | None
    :param max_size: Max transaction size in bytes.
    :type max_size: int | None
    :param transaction_id: Initialize using an existing transaction instead
        of creating a new transaction.
    :type transaction_id: str | None
    :param skip_fast_lock_round: Whether to disable fast locking for write
        operations.
    :type skip_fast_lock_round: bool | None
    """

    def __init__(
        self,
        connection: Connection,
        read: Union[str, Sequence[str], None] = None,
        write: Union[str, Sequence[str], None] = None,
        exclusive: Union[str, Sequence[str], None] = None,
        sync: Optional[bool] = None,
        allow_implicit: Optional[bool] = None,
        lock_timeout: Optional[int] = None,
        max_size: Optional[int] = None,
        transaction_id: Optional[str] = None,
        skip_fast_lock_round: Optional[bool] = None,
    ) -> None:
        # Bare annotation only: narrows the executor type for static
        # checkers and has no runtime effect.
        self._executor: TransactionApiExecutor

        # Every transaction option is forwarded unchanged to the executor.
        txn_executor = TransactionApiExecutor(
            connection=connection,
            read=read,
            write=write,
            exclusive=exclusive,
            sync=sync,
            allow_implicit=allow_implicit,
            lock_timeout=lock_timeout,
            max_size=max_size,
            transaction_id=transaction_id,
            skip_fast_lock_round=skip_fast_lock_round,
        )
        super().__init__(connection=connection, executor=txn_executor)

    def __repr__(self) -> str:
        db_name = self.name
        return f"<TransactionDatabase {db_name}>"

    @property
    def transaction_id(self) -> str:
        """Return the ID of the transaction held by the executor.

        :return: Transaction ID.
        :rtype: str
        """
        txn_id = self._executor.id
        return txn_id

    def transaction_status(self) -> str:
        """Return the current status of the transaction.

        :return: Transaction status.
        :rtype: str
        :raise arango.exceptions.TransactionStatusError: If retrieval fails.
        """
        status = self._executor.status()
        return status

    def commit_transaction(self) -> bool:
        """Commit the transaction.

        :return: True if commit was successful.
        :rtype: bool
        :raise arango.exceptions.TransactionCommitError: If commit fails.
        """
        committed = self._executor.commit()
        return committed

    def abort_transaction(self) -> bool:
        """Abort the transaction.

        :return: True if the abort operation was successful.
        :rtype: bool
        :raise arango.exceptions.TransactionAbortError: If abort fails.
        """
        aborted = self._executor.abort()
        return aborted
[docs]
class OverloadControlDatabase(Database):
    """Database API wrapper that handles server overload scenarios gracefully.

    See :func:`arango.database.StandardDatabase.begin_controlled_execution`.

    :param connection: HTTP connection.
    :type connection: arango.connection.Connection
    :param max_queue_time_seconds: Maximum server-side queuing time in
        seconds. If the server-side queuing time exceeds the client's
        specified limit, the request will be rejected.
    :type max_queue_time_seconds: Optional[float]
    """

    def __init__(
        self, connection: Connection, max_queue_time_seconds: Optional[float] = None
    ) -> None:
        # Bare annotation only: narrows the executor type for static
        # checkers and has no runtime effect.
        self._executor: OverloadControlApiExecutor

        overload_executor = OverloadControlApiExecutor(
            connection, max_queue_time_seconds
        )
        super().__init__(connection=connection, executor=overload_executor)

    def __repr__(self) -> str:  # pragma: no cover
        db_name = self.name
        return f"<OverloadControlDatabase {db_name}>"

    @property
    def last_queue_time(self) -> float:
        """Return the most recently recorded server-side queuing time.

        :return: Server-side queuing time in seconds.
        :rtype: float
        """
        recorded = self._executor.queue_time_seconds
        return recorded

    @property
    def max_queue_time(self) -> Optional[float]:
        """Return the maximum server-side queuing time currently configured.

        :return: Maximum server-side queuing time in seconds.
        :rtype: Optional[float]
        """
        limit = self._executor.max_queue_time_seconds
        return limit

    def adjust_max_queue_time(self, max_queue_time_seconds: Optional[float]) -> None:
        """Change the maximum server-side queuing time.

        The new value is stored on the underlying executor.

        :param max_queue_time_seconds: New maximum server-side queuing time
            in seconds. Setting it to None disables the limit.
        :type max_queue_time_seconds: Optional[float]
        """
        executor = self._executor
        executor.max_queue_time_seconds = max_queue_time_seconds