__all__ = ["Cluster"]
from typing import List, Optional
from arango.api import ApiGroup
from arango.exceptions import (
ClusterEndpointsError,
ClusterHealthError,
ClusterMaintenanceModeError,
ClusterRebalanceError,
ClusterServerCountError,
ClusterServerEngineError,
ClusterServerIDError,
ClusterServerRoleError,
ClusterServerStatisticsError,
ClusterServerVersionError,
)
from arango.formatter import format_body
from arango.request import Request
from arango.response import Response
from arango.result import Result
from arango.typings import Json
[docs]class Cluster(ApiGroup): # pragma: no cover
[docs] def server_id(self) -> Result[str]:
"""Return the server ID.
:return: Server ID.
:rtype: str
:raise arango.exceptions.ClusterServerIDError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/server/id")
def response_handler(resp: Response) -> str:
if resp.is_success:
return str(resp.body["id"])
raise ClusterServerIDError(resp, request)
return self._execute(request, response_handler)
[docs] def server_role(self) -> Result[str]:
"""Return the server role.
:return: Server role. Possible values are "SINGLE" (server which is
not in a cluster), "COORDINATOR" (cluster coordinator), "PRIMARY",
"SECONDARY", "AGENT" (Agency server in a cluster) or "UNDEFINED".
:rtype: str
:raise arango.exceptions.ClusterServerRoleError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/server/role")
def response_handler(resp: Response) -> str:
if resp.is_success:
return str(resp.body["role"])
raise ClusterServerRoleError(resp, request)
return self._execute(request, response_handler)
[docs] def server_version(self, server_id: str) -> Result[Json]:
"""Return the version of the given server.
:param server_id: Server ID.
:type server_id: str
:return: Version of the given server.
:rtype: dict
:raise arango.exceptions.ClusterServerVersionError: If retrieval fails.
"""
request = Request(
method="get",
endpoint="/_admin/cluster/nodeVersion",
params={"ServerID": server_id},
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_body(resp.body)
raise ClusterServerVersionError(resp, request)
return self._execute(request, response_handler)
[docs] def server_engine(self, server_id: str) -> Result[Json]:
"""Return the engine details for the given server.
:param server_id: Server ID.
:type server_id: str
:return: Engine details of the given server.
:rtype: dict
:raise arango.exceptions.ClusterServerEngineError: If retrieval fails.
"""
request = Request(
method="get",
endpoint="/_admin/cluster/nodeEngine",
params={"ServerID": server_id},
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_body(resp.body)
raise ClusterServerEngineError(resp, request)
return self._execute(request, response_handler)
[docs] def server_count(self) -> Result[int]:
"""Return the number of servers in the cluster.
:return: Number of servers in the cluster.
:rtype: int
:raise arango.exceptions.ClusterServerCountError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/cluster/numberOfServers")
def response_handler(resp: Response) -> int:
if resp.is_success:
result: int = resp.body
return result
raise ClusterServerCountError(resp, request)
return self._execute(request, response_handler)
[docs] def server_statistics(self, server_id: str) -> Result[Json]:
"""Return the statistics for the given server.
:param server_id: Server ID.
:type server_id: str
:return: Statistics for the given server.
:rtype: dict
:raise arango.exceptions.ClusterServerStatisticsError: If retrieval fails.
"""
request = Request(
method="get",
endpoint="/_admin/cluster/nodeStatistics",
params={"ServerID": server_id},
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_body(resp.body)
raise ClusterServerStatisticsError(resp, request)
return self._execute(request, response_handler)
[docs] def health(self) -> Result[Json]:
"""Return the cluster health.
:return: Cluster health.
:rtype: dict
:raise arango.exceptions.ClusterHealthError: If retrieval fails.
"""
request = Request(
method="get",
endpoint="/_admin/cluster/health",
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_body(resp.body)
raise ClusterHealthError(resp, request)
return self._execute(request, response_handler)
[docs] def toggle_maintenance_mode(self, mode: str) -> Result[Json]:
"""Enable or disable the cluster supervision (agency) maintenance mode.
:param mode: Maintenance mode. Allowed values are "on" and "off".
:type mode: str
:return: Result of the operation.
:rtype: dict
:raise arango.exceptions.ClusterMaintenanceModeError: If toggle fails.
"""
request = Request(
method="put",
endpoint="/_admin/cluster/maintenance",
data=f'"{mode}"',
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_body(resp.body)
raise ClusterMaintenanceModeError(resp, request)
return self._execute(request, response_handler)
[docs] def endpoints(self) -> Result[List[str]]:
"""Return coordinate endpoints. This method is for clusters only.
:return: List of endpoints.
:rtype: [str]
:raise arango.exceptions.ServerEndpointsError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_api/cluster/endpoints")
def response_handler(resp: Response) -> List[str]:
if not resp.is_success:
raise ClusterEndpointsError(resp, request)
return [item["endpoint"] for item in resp.body["endpoints"]]
return self._execute(request, response_handler)
[docs] def calculate_imbalance(self) -> Result[Json]:
"""Compute the current cluster imbalance, including
the amount of ongoing and pending move shard operations.
:return: Cluster imbalance information.
:rtype: dict
:raise: arango.exceptions.ClusterRebalanceError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/cluster/rebalance")
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise ClusterRebalanceError(resp, request)
result: Json = resp.body["result"]
return result
return self._execute(request, response_handler)
[docs] def rebalance(
self,
version: int = 1,
max_moves: Optional[int] = None,
leader_changes: Optional[bool] = None,
move_leaders: Optional[bool] = None,
move_followers: Optional[bool] = None,
pi_factor: Optional[float] = None,
exclude_system_collections: Optional[bool] = None,
databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
"""Compute and execute a cluster rebalance plan.
:param version: Must be set to 1.
:type version: int
:param max_moves: Maximum number of moves to be computed.
:type max_moves: int | None
:param leader_changes: Allow leader changes without moving data.
:type leader_changes: bool | None
:param move_leaders: Allow moving shard leaders.
:type move_leaders: bool | None
:param move_followers: Allow moving shard followers.
:type move_followers: bool | None
:param pi_factor: A weighting factor that should remain untouched.
:type pi_factor: float | None
:param exclude_system_collections: Ignore system collections in the
rebalance plan.
:type exclude_system_collections: bool | None
:param databases_excluded: List of database names to be excluded
from the analysis.
:type databases_excluded: [str] | None
:return: Cluster rebalance plan that has been executed.
:rtype: dict
:raise: arango.exceptions.ClusterRebalanceError: If retrieval fails.
"""
data: Json = dict(version=version)
if max_moves is not None:
data["maximumNumberOfMoves"] = max_moves
if leader_changes is not None:
data["leaderChanges"] = leader_changes
if move_leaders is not None:
data["moveLeaders"] = move_leaders
if move_followers is not None:
data["moveFollowers"] = move_followers
if pi_factor is not None:
data["piFactor"] = pi_factor
if exclude_system_collections is not None:
data["excludeSystemCollections"] = exclude_system_collections
if databases_excluded is not None:
data["databasesExcluded"] = databases_excluded
request = Request(method="put", endpoint="/_admin/cluster/rebalance", data=data)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise ClusterRebalanceError(resp, request)
result: Json = resp.body["result"]
return result
return self._execute(request, response_handler)
[docs] def calculate_rebalance_plan(
self,
version: int = 1,
max_moves: Optional[int] = None,
leader_changes: Optional[bool] = None,
move_leaders: Optional[bool] = None,
move_followers: Optional[bool] = None,
pi_factor: Optional[float] = None,
exclude_system_collections: Optional[bool] = None,
databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
"""Compute the cluster rebalance plan.
:param version: Must be set to 1.
:type version: int
:param max_moves: Maximum number of moves to be computed.
:type max_moves: int | None
:param leader_changes: Allow leader changes without moving data.
:type leader_changes: bool | None
:param move_leaders: Allow moving shard leaders.
:type move_leaders: bool | None
:param move_followers: Allow moving shard followers.
:type move_followers: bool | None
:param pi_factor: A weighting factor that should remain untouched.
:type pi_factor: float | None
:param exclude_system_collections: Ignore system collections in the
rebalance plan.
:type exclude_system_collections: bool | None
:param databases_excluded: List of database names to be excluded
from the analysis.
:type databases_excluded: [str] | None
:return: Cluster rebalance plan.
:rtype: dict
:raise: arango.exceptions.ClusterRebalanceError: If retrieval fails.
"""
data: Json = dict(version=version)
if max_moves is not None:
data["maximumNumberOfMoves"] = max_moves
if leader_changes is not None:
data["leaderChanges"] = leader_changes
if move_leaders is not None:
data["moveLeaders"] = move_leaders
if move_followers is not None:
data["moveFollowers"] = move_followers
if pi_factor is not None:
data["piFactor"] = pi_factor
if exclude_system_collections is not None:
data["excludeSystemCollections"] = exclude_system_collections
if databases_excluded is not None:
data["databasesExcluded"] = databases_excluded
request = Request(
method="post", endpoint="/_admin/cluster/rebalance", data=data
)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise ClusterRebalanceError(resp, request)
result: Json = resp.body["result"]
return result
return self._execute(request, response_handler)
[docs] def execute_rebalance_plan(
self, moves: List[Json], version: int = 1
) -> Result[bool]:
"""Execute the given set of move shard operations.
You can use :meth:`Cluster.calculate_rebalance_plan` to calculate
these operations to improve the balance of shards, leader shards,
and follower shards.
:param moves: List of move shard operations.
:type moves: [dict]
:param version: Must be set to 1.
:type version: int
:return: True if the methods have been accepted and scheduled
for execution.
:rtype: bool
:raise: arango.exceptions.ClusterRebalanceError: If request fails.
"""
data: Json = dict(version=version, moves=moves)
request = Request(
method="post", endpoint="/_admin/cluster/rebalance/execute", data=data
)
def response_handler(resp: Response) -> bool:
if not resp.is_success:
raise ClusterRebalanceError(resp, request)
result: bool = resp.body["code"] == 202
return result
return self._execute(request, response_handler)