# encoding: utf-8
"""Prometheus exporter for the Deployments of a single Kubernetes namespace.

A tornado HTTP server listens on port 80. Every GET on the metrics route
queries the Kubernetes AppsV1 API for the Deployments in ``NAMESPACE`` and
exposes one ``deployment`` gauge sample per Deployment.

Configuration is read from the environment:

* ``API_ADDR``  - base URL of the Kubernetes API server.
* ``API_TOKEN`` - bearer token sent in the ``authorization`` header.
* ``NAMESPACE`` - namespace whose Deployments are listed.
"""
from datetime import datetime, timedelta
import os
from typing import Optional, Awaitable

from kubernetes import client
from prometheus_client import Gauge
from prometheus_client.core import CollectorRegistry
from prometheus_client.exposition import choose_encoder
import tornado.gen
import tornado.ioloop
import tornado.web
import urllib3

# TLS verification is switched off for the API connection below, so silence
# the warning urllib3 would otherwise emit on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

api_token = os.getenv('API_TOKEN')
api_addr = os.getenv('API_ADDR')
namespace = os.getenv('NAMESPACE')


def date_time_to_date(date):
    """Shift a timestamp by +8 hours and format it as a string.

    Intended for the timestamps returned by the Kubernetes API.

    :param date: datetime object, e.g. ``2022-06-29 04:13:33+00:00``
    :return: the shifted time formatted as ``'%Y-%m-%d %H:%M'``,
        e.g. ``'2022-06-29 12:13'``
    """
    shifted = date + timedelta(hours=8)
    return shifted.strftime('%Y-%m-%d %H:%M')


def list_namespaced_deployment(api_addr, api_token, namespace):
    """List the Deployments of ``namespace`` as plain dictionaries.

    :param api_addr: base URL of the Kubernetes API server
    :param api_token: bearer token used for authentication
    :param namespace: namespace to query
    :return: list of dicts with the keys ``name``, ``image`` (image of the
        first container), ``namespace``, ``labels`` (pod template labels),
        ``create_time``, ``run`` (``status.replicas`` or 0) and ``status``
        (status of the first condition, ``"Unknown"`` when there is none)
    :raises ValueError: if any of the three arguments is empty or ``None``
    """
    missing = [
        label
        for label, value in (
            ("api_addr", api_addr),
            ("api_token", api_token),
            ("namespace", namespace),
        )
        if not value
    ]
    if missing:
        # Fail with a readable message instead of the TypeError that
        # "Bearer " + None would raise further down.
        raise ValueError("missing required value(s): " + ", ".join(missing))

    configuration = client.Configuration()
    configuration.host = api_addr
    configuration.verify_ssl = False
    configuration.debug = False
    configuration.api_key = {"authorization": "Bearer " + api_token}
    # Still register the configuration as the library-wide default, as the
    # original code did.
    client.Configuration.set_default(configuration)

    # set_default() returns None, so its result must not be used as the API
    # client; build the client explicitly from the configuration instead.
    apps_api = client.AppsV1Api(api_client=client.ApiClient(configuration))
    deployments = apps_api.list_namespaced_deployment(namespace).items

    deployment_list = []
    for item in deployments:
        # conditions may be None/empty (e.g. right after creation).
        conditions = item.status.conditions or []
        deployment_list.append({
            "name": item.metadata.name,
            "image": item.spec.template.spec.containers[0].image,
            "namespace": item.metadata.namespace,
            "labels": item.spec.template.metadata.labels,
            "create_time": date_time_to_date(item.metadata.creation_timestamp),
            "run": item.status.replicas or 0,
            "status": conditions[0].status if conditions else "Unknown",
        })
    return deployment_list


class ExporterApi:
    """Owns the Prometheus registry and the ``deployment`` gauge."""

    def __init__(self):
        # Dedicated registry so only this exporter's metrics are exposed.
        self.collector_registry = CollectorRegistry(auto_describe=False)
        # One sample per Deployment, identified by its labels.
        # NOTE(review): the help text looks copied from a request-timing
        # metric; it is left unchanged so the exposed HELP line stays the same.
        self.deployment_list = Gauge(
            name="deployment",
            documentation="Num of request time summary",
            labelnames=("name", "image", "create_time", "run"),
            registry=self.collector_registry,
        )

    def get_prometheus_metrics_info(self, handler):
        """Write the current registry contents to ``handler``'s response.

        The encoder and Content-Type are chosen from the request's
        ``accept`` header.

        :param handler: the tornado request handler serving the request
        """
        encoder, content_type = choose_encoder(
            handler.request.headers.get('accept'))
        handler.set_header("Content-Type", content_type)
        handler.write(encoder(self.collector_registry))

    def set_prometheus_request_summary(self):
        """Refresh the gauge from the current Deployments of the namespace.

        The Deployments are fetched first; only after a successful fetch are
        the previous samples dropped, so a failing API call leaves the last
        known state in place.
        """
        data = list_namespaced_deployment(
            api_addr=api_addr, api_token=api_token, namespace=namespace)
        # Without this, label sets of deleted Deployments - and of every
        # earlier value of the "run" label - would be exported forever.
        self.deployment_list.clear()
        for item in data:
            print(item)
            self.deployment_list.labels(
                item.get("name"),
                item.get("image"),
                item.get("create_time"),
                item.get("run"),
            ).set(1)


# Shared exporter instance used by MetricsHandler; created in the
# __main__ block below.
g_monitor = None  # type: Optional[ExporterApi]


class MetricsHandler(tornado.web.RequestHandler):
    """HTTP handler that refreshes and returns the metrics."""

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """No-op; this handler does not process streamed request data."""
        pass

    def get(self):
        """Refresh the gauge, then write the metrics as the response."""
        print('INFO', datetime.now(), "/metrics Get.")
        g_monitor.set_prometheus_request_summary()
        # Return the collected metrics through the metrics endpoint.
        g_monitor.get_prometheus_metrics_info(self)


if __name__ == "__main__":
    try:
        g_monitor = ExporterApi()
        # NOTE(review): as a regex, "/metrics?" matches both "/metric" and
        # "/metrics"; kept as-is so existing scrape paths keep working.
        app = tornado.web.Application([(r"/metrics?", MetricsHandler), ])
        app.listen(80)
        print("listen:80")
        tornado.ioloop.IOLoop.current().start()
    except Exception as e:
        print(e)
        # Report the failure through the exit status instead of exiting 0.
        raise SystemExit(1)