Important Notice: this service will be discontinued by the end of 2024 because for multiple years now, Plume is no longer under active/continuous development. Sadly each time there was hope, active development came to a stop again. Please consider using our Writefreely instance instead.

Migration.py - Cleanup Methode und Schließen von ungenutzten Indexen

Das Migration Script hinterlässt ältere Versionen in Indexen die wir später auch für ein Undo nutzen wollen. Allerdings ist ein Clean-Up auch nützlich, um alte Indexe zu verwerfen.

Einleitung

Es gibt ein paar Verbesserungen für das Migration.py Script und ich werde ein paar kleine Blog-Artikel zu den Verbesserungen schreiben.

Index close

Eine winzige Änderung ist das Schließen eines Index. Es ist zu empfehlen Indexe, die man nicht mehr zur Verfügung stellen will, zu schließen (siehe Close Index API). Das Gegenteil bietet die Index Open API.

Ich habe mal zwei Methoden im Migration-Script dafür angelegt:

def _close_index (client: OpenSearch, index_name: str):
    client.indices.close(index=index_name, ignore_unavailable=True)


def _open_index (client: OpenSearch, index_name: str):
    client.indices.open(index=index_name)

Die switch_version hat einen neuen optionalen Parameter erhalten, mit dem Indexnamen, den man schließen will. Das rufen wir von _run_create_index auf.

Ein Versuch auf einen geschlossenen Index zuzugreifen verursacht ein Bad Request (Exception in Python) und diese Response:

{
    "error": {
        "root_cause": [
            {
                "type": "index_closed_exception",
                "reason": "closed",
                "index": "toots_v1_002",
                "index_uuid": "o7OXHpaZQoebIqxhRdkz4g"
            }
        ],
        "type": "index_closed_exception",
        "reason": "closed",
        "index": "toots_v1_002",
        "index_uuid": "o7OXHpaZQoebIqxhRdkz4g"
    },
    "status": 400
}

Das ist die der Link zur: Commit-Version von Migration.py.

Clean-Up

Jeder Migrations-Schritt kann zu einem neuen Index führen, weil wir grundsätzlich bei “create index” eine neue Version anlegen und die Dokumente kopieren.

Wenn man die alten Daten nicht mehr nutzen will, wäre eine Hilfemethode sinnvoll, diese zu löschen.

Die Hilfsmethode muss etwas suchen, da wir keine wirkliche strukturierte Information haben, welche Indexe die Vorversionen eines Index sein soll. Wir haben nur ein Namensschema, das aussagt, dass ein Index mit _v#_# endet. Wir müssen uns also auf diese Namenskonvention verlassen. Um sich mit sowas sicherer zu fühlen, werden, per Default, nur geschlossene Index-Versionen gelöscht. Das kann man aber über einen Parameter abschalten.

Meine Anforderungen sehen als Signatur so aus:

def clean_up (client: OpenSearch, index_alias: str, keep_version: float = None, only_closed=True):
    """
    Removes older indexes.
    :param client: OpenSearch client
    :param index_alias: the Alias name
    :param keep_version: the version to keep (and all higher). None, if we keep the latest
    :param only_closed: delete/hide only the closed indexes
    :return:
    """
    pass

Es müssen erstmal ein paar Dinge in Erfahrung gebracht werden.

  1. Gibt es den Alias und ist dahinter eine Version?
  2. Welche Indexe gibt es mit Versionen, die zum Index-Alias-Prefix passen?
  3. Wenn eine keep_version mitgegeben wurde, gibt es diese? Wenn nicht, nehmen wir die aus Punkt 1

Sowas wie die aktuelle Version zu ermitteln, hatten wir schon in _run_create_index. Ich extrahiere das mal in eine Hilfsmethode:

def _get_current_version_index (client: OpenSearch, index_alias: str) -> str:
    alias_response = client.indices.get_alias(name=index_alias)

    # We hope, we have not a spanning alias, this means only a single result:
    if len(alias_response) != 1:
        raise RuntimeError(f"The alias {index_alias} is a spanning alias over more than one index")

    return next(iter(alias_response.keys()))

Um aus dem Namen die Version zu extrahieren, wieder eine Hilfsmethode:

def _parse_version (index_alias: str, index_name: str):
    match = re.match(index_alias + "_v([0-9_]+)", index_name)
    if match:
        return match.group(1).replace('_', '.')
    return None

Also Punkt 1 ist dann das hier:

    if not client.indices.exists_alias(name=index_alias):
        raise RuntimeError (f"Alias {index_alias} doesn't exists, cannot proceed with clean_up")

    index_current = _get_current_version_index(client, index_alias)
    version_str = _parse_version(index_alias, index_current)
    if not version_str:
        raise RuntimeError (f"Alias {index_alias} is not assigned to a version index {index_current}")

Für den zweiten Punkt suchen wir einfach im Cluster nach allen Indexen, die mit dem Namen des Alias anfangen. Die Methoden dazu sind vielfältig, aber eine ist ganz nett: http://localhost:9200/_cluster/state/metadata/toots_v*?expand_wildcards=all&filter_path=metadata.indices.*.state, was in Python so aussieht:

client.cluster.state(metric='metadata', index=index_alias + '_v*', expand_wildcards='all',
                         filter_path='metadata.indices.*.state')

Das liefert alle Indexe, ungefiltert nach unserem Alias-Prefix.

Zum Vergleich der Versionen nutze ich package.Version. Zusammengebaut sieht das dann so aus:

    found_indexes={}
    meta = client.cluster.state(metric='metadata', index=index_alias + '_v*', expand_wildcards='all',
                         filter_path='metadata.indices.*.state')
    for index_name in meta['metadata']['indices'].keys():
        v = _parse_version(index_alias, index_name)
        if v and (Version(v) < keep):
            if only_closed and meta['metadata']['indices'][index_name]['state'] == 'open':
                continue
            found_indexes[Version(v)]=index_name

Bei einer Migration bis Version 1.004 bekommt man entweder nix (weil toots_v1_002 nicht geschlossen ist) oder <Version('1.2')>: 'toots_v1_002' in found_indexes.

Das ist schon mal sehr schön, denn Punkt 3 haben wir damit nebenbei auch erledigt.

Jetzt können wir zur Tat schreiten. Alle Informationen, was wir zu tun haben, liegen vor.

    for version, index_name in found_indexes.items():
        client.indices.delete(index=index_name)

Man sieht, dass die eigentliche Funktion beeindruckend trivial ist und der Hauptfokus immer auf den Randbedingungen und Parametern liegt.

Der Aufruf der Methode Migration.clean_up (...) ist meistens manuell, am besten über eine Konsole. Man könnte auch ein CLI Tools dazu basteln. Die clean_up Funktion baue ich aber später in migration_history_prepare ein. D.h. für diesen Index unserer selbstverwalteten History räumen wir automatisch die Reste auf. Das ist aber ein extra Artikel, da wir das migration_history_prepare umbauen müssen, um ebenfalls eine vollständige Migration zu unterstützen.

Die aktuelle Version des Migration.py und der Commit

Update: Leider habe ich vergessen zu prüfen, ob die keep_version (wenn sie als Parameter übergeben wird), existiert. Das ist mit dem Commit 879c606663963fdabeb10aaf5977f01c057344af gefixed.

Mal wieder Danke, dass ihr so weit gefolgt seid.