__all__ = ["WAL"]
from typing import Optional
from arango.api import ApiGroup
from arango.exceptions import (
WALConfigureError,
WALFlushError,
WALLastTickError,
WALPropertiesError,
WALTailError,
WALTickRangesError,
WALTransactionListError,
)
from arango.formatter import (
format_replication_header,
format_tick_values,
format_wal_properties,
format_wal_transactions,
)
from arango.request import Request
from arango.response import Response
from arango.result import Result
from arango.typings import Json, Params
[docs]
class WAL(ApiGroup): # pragma: no cover
"""WAL (Write-Ahead Log) API wrapper."""
[docs]
def properties(self) -> Result[Json]:
"""Return WAL properties.
:return: WAL properties.
:rtype: dict
:raise arango.exceptions.WALPropertiesError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/wal/properties")
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_wal_properties(resp.body)
raise WALPropertiesError(resp, request)
return self._execute(request, response_handler)
[docs]
def transactions(self) -> Result[Json]:
"""Return details on currently running WAL transactions.
Fields in the returned details are as follows:
.. code-block:: none
"last_collected" : ID of the last collected log file (at the
start of each running transaction) or None
if no transactions are running.
"last_sealed" : ID of the last sealed log file (at the start
of each running transaction) or None if no
transactions are running.
"count" : Number of currently running transactions.
:return: Details on currently running WAL transactions.
:rtype: dict
:raise arango.exceptions.WALTransactionListError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_admin/wal/transactions")
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_wal_transactions(resp.body)
raise WALTransactionListError(resp, request)
return self._execute(request, response_handler)
[docs]
def flush(self, sync: bool = True, garbage_collect: bool = True) -> Result[bool]:
"""Synchronize WAL to disk.
:param sync: Block until the synchronization is complete.
:type sync: bool | None
:param garbage_collect: Block until flushed data is garbage collected.
:type garbage_collect: bool
:return: True if WAL was flushed successfully.
:rtype: bool
:raise arango.exceptions.WALFlushError: If flush operation fails.
"""
request = Request(
method="put",
endpoint="/_admin/wal/flush",
params={"waitForSync": sync, "waitForCollector": garbage_collect},
)
def response_handler(resp: Response) -> bool:
if resp.is_success:
return True
raise WALFlushError(resp, request)
return self._execute(request, response_handler)
[docs]
def tick_ranges(self) -> Result[Json]:
"""Return the available ranges of tick values for all WAL files.
:return: Ranges of tick values.
:rtype: dict
:raise arango.exceptions.WALTickRangesError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_api/wal/range")
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_tick_values(resp.body)
raise WALTickRangesError(resp, request)
return self._execute(request, response_handler)
[docs]
def last_tick(self) -> Result[Json]:
"""Return the last available tick value (last successful operation).
:return: Last tick value in the WAL.
:rtype: dict
:raise arango.exceptions.WALLastTickError: If retrieval fails.
"""
request = Request(method="get", endpoint="/_api/wal/lastTick")
def response_handler(resp: Response) -> Json:
if resp.is_success:
return format_tick_values(resp.body)
raise WALLastTickError(resp, request)
return self._execute(request, response_handler)
[docs]
def tail(
self,
lower: Optional[str] = None,
upper: Optional[str] = None,
last_scanned: Optional[str] = None,
all_databases: Optional[bool] = None,
chunk_size: Optional[int] = None,
syncer_id: Optional[int] = None,
server_id: Optional[int] = None,
client_info: Optional[str] = None,
barrier_id: Optional[int] = None,
deserialize: bool = False,
) -> Result[Json]:
"""Fetch recent WAL operations.
:param lower: Exclusive lower bound tick value. On successive calls to
this method you should set this to the value of "last_included"
from previous call (unless the value was 0).
:type lower: str | None
:param upper: Inclusive upper bound tick value for results.
:type upper: str | None
:param last_scanned: On successive calls to this method you should
set this to the value of "last_scanned" from previous call (or 0
on first try). This allows the rocksdb engine to break up large
transactions over multiple responses.
:type last_scanned: str | None
:param all_databases: Whether operations for all databases should be
included. When set to False only the operations for the current
database are included. The value True is only valid on "_system"
database. The default is False.
:type all_databases: bool | None
:param chunk_size: Approximate maximum size of the returned result.
:type chunk_size: int | None
:param syncer_id: ID of the client used to tail results. The server
will use this to keep operations until the client has fetched them.
Must be a positive integer. Note this or **server_id** is required
to have a chance at fetching reading all operations with the
rocksdb storage engine.
:type syncer_id: int | None
:param server_id: ID of the client machine. If unset, the server will
use this to keep operations until the client has fetched them. Must
be a positive integer. Note this or **syncer_id** is required to
have a chance at fetching reading all operations with the rocksdb
storage engine.
:type server_id: int | None
:param client_info: Short description of the client, used for
informative purposes only.
:type client_info: str | None
:param barrier_id: ID of barrier used to keep WAL entries around.
:type barrier_id: int | None
:param deserialize: Deserialize the response content. Default is False.
:type deserialize: bool
:return: If **deserialize** is set to False, content is returned raw as
a string. If **deserialize** is set to True, it is deserialized and
returned as a list of dictionaries.
:rtype: dict
:raise arango.exceptions.WALTailError: If tail operation fails.
"""
params: Params = {}
if lower is not None:
params["from"] = lower
if upper is not None:
params["to"] = upper
if last_scanned is not None:
params["lastScanned"] = last_scanned
if all_databases is not None:
params["global"] = all_databases
if chunk_size is not None:
params["chunkSize"] = chunk_size
if syncer_id is not None:
params["syncerId"] = syncer_id
if server_id is not None:
params["serverId"] = server_id
if client_info is not None:
params["clientInfo"] = client_info
if barrier_id is not None:
params["barrierId"] = barrier_id
request = Request(
method="get", endpoint="/_api/wal/tail", params=params, deserialize=False
)
def response_handler(resp: Response) -> Json:
if resp.is_success:
result = format_replication_header(resp.headers)
result["content"] = (
[
self._conn.deserialize(line)
for line in resp.body.split("\n")
if line
]
if deserialize
else resp.body
)
return result
raise WALTailError(resp, request)
return self._execute(request, response_handler)