Source code for k8s.watcher

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2017-2019 The FIAAS Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import cachetools

from .base import APIServerError, WatchEvent

# Default ``capacity`` for Watcher: the size given to the LRU cache that
# remembers the last seen resourceVersion per (name, namespace) key.
DEFAULT_CAPACITY = 1000


class Watcher(object):
    """Reconnecting, de-duplicating watch on top of the model's ``watch_list``

    The low-level :py:meth:`~.watch_list` method stops as soon as the API-server
    drops the connection, and a reconnect through that method makes the API-server
    replay a list of :py:const:`~k8s.base.WatchEvent.ADDED` events for every object,
    including the ones that were already delivered.

    The Watcher hides this from the caller: it reconnects whenever the connection
    drops and skips events it has already seen. It also asks for bookmarks, to
    avoid the increased load that reconnecting might otherwise cause.

    :param Model model: The model class to watch
    :param int capacity: How many seen objects to keep track of
    """

    def __init__(self, model, capacity=DEFAULT_CAPACITY):
        self._model = model
        # Loop guard for watch(); the loop keeps reconnecting for as long as this is true.
        self._run_forever = True
        # Maps (name, namespace) -> last seen resourceVersion, bounded by ``capacity``.
        self._seen = cachetools.LRUCache(capacity)

    def watch(self, namespace=None):
        """Watch for events

        :param str namespace: the namespace to watch for events in. The default (None)
            results in watching for events in all namespaces.
        :return: a generator that yields :py:class:`~.WatchEvent` objects not seen before
        """
        # Resource version to resume from when reconnecting. It stays None for the
        # very first call, which therefore does a quorum read.
        resume_from = None
        while self._run_forever:
            try:
                stream = self._model.watch_list(
                    namespace=namespace,
                    resource_version=resume_from,
                    allow_bookmarks=True,
                )
                for event in stream:
                    # Track every event, yielded or not, so the next reconnect
                    # resumes from the latest position.
                    resume_from = event.resource_version
                    if not self._should_yield(event):
                        continue
                    yield event
            except APIServerError as error:
                if error.api_error["code"] != 410:
                    raise
                # A 410 response indicates our resourceVersion is too old,
                # so start over with a new quorum read.
                resume_from = None

    def _should_yield(self, event) -> bool:
        """Decide whether the event is new, recording it as seen when it is"""
        if not event.has_object():
            # Events that carry no object are never passed on.
            return False

        metadata = event.object.metadata
        identity = (metadata.name, metadata.namespace)
        version = metadata.resourceVersion

        already_seen = self._seen.get(identity) == version
        # DELETED events are always passed on, even when the version is unchanged.
        if already_seen and event.type != WatchEvent.DELETED:
            return False

        self._seen[identity] = version
        return True