#!/usr/bin/python3 import datetime import requests from resultsdb_api import ResultsDBapi, ResultsDBapiException, ResultsDBAuth class ResultsdbSender(object): def __init__(self): authmethod = ResultsDBAuth.basic_auth("resultsdbuser", "the real password") self.rdb_connection = ResultsDBapi("https://resultsdb.fedoraproject.org/api/v2.0/", request_auth=authmethod) def _check_testcase_exists(self, testcase): try: self.rdb_connection.get_testcase(testcase) except ResultsDBapiException: raise ValueError("Testcase was not found") def _create_testcase(self, testcase): self.rdb_connection.create_testcase(testcase) def _translate_outcome(self, message): testcase = message["topic"] if testcase.endswith(".error"): return "FAILED" if testcase.endswith(".queued"): return "QUEUED" if testcase.endswith(".running"): return "RUNNING" outcome_mapping = { "SUCCESS": "PASSED", "FAILURE": "FAILED", "UNSTABLE": "FAILED", # outcome values defined on message version 0.2.1 # https://pagure.io/fedora-ci/messages/tree/6715da742c6669fa725f615872291c58f82d3718 "PASSED": "PASSED", "FAILED": "FAILED", "NEEDS_INSPECTION": "FAILED", } if "status" not in message: return "FAILED" outcome = message["status"].upper() if outcome in outcome_mapping: return outcome_mapping[outcome] else: return "FAILED" # topic is the testcase # add all data as kv pairs def send(self, result): # Old version of CI messages didn't define namespace, type or category # topic was used for test case name if ( "namespace" in result and "test_type" in result and "category" in result ): testcase_name = ".".join( [result["namespace"], result["test_type"], result["category"]] ) else: testcase_name = result["topic"] # make sure that the testcase exists, if not, create it try: self._check_testcase_exists(testcase_name) except ValueError: self._create_testcase(testcase_name) # translate the outcome from CI to resultsdb-speak rdb_outcome = self._translate_outcome(result) # construct the data that is to be sent ref_url = result["build_url"] # send the result to resultsdb try: self.rdb_connection.create_result( outcome=rdb_outcome, testcase=testcase_name, group=None, note=None, ref_url=ref_url, **result ) except ResultsDBapiException: _log.exception("An error occured while uploading to resultsdb") def m2p(message): # ignore scratch builds if message["msg"]["artifact"].get("scratch"): _log.info("Ignoring scratch build - msg_id : %s", message["msg_id"]) return None # ignore invalid messages: for key in ["component", "nvr", "scratch", "issuer", "id", "type"]: if key not in message["msg"]["artifact"]: _log.info( "Ignoring message format, missing %s in 'artifact' " "- msg_id : %s", key, message["msg_id"], ) return None for key in ["url"]: if key not in message["msg"]["run"]: _log.info( "Ignoring message format, missing %s in 'run' " "- msg_id : %s", key, message["msg_id"], ) return None for key in ["namespace", "type", "category"]: if key not in message["msg"]["test"]: _log.info( "Ignoring message format, missing %s in 'test' " "- msg_id : %s", key, message["msg_id"], ) return None data = { "build_url": message["msg"]["run"]["url"], "repo": message["msg"]["artifact"]["component"], "nvr": message["msg"]["artifact"]["nvr"], "item": message["msg"]["artifact"]["nvr"], "scratch": message["msg"]["artifact"]["scratch"], "username": message["msg"]["artifact"]["issuer"], "koji_task_id": message["msg"]["artifact"]["id"], "namespace": message["msg"]["test"]["namespace"], "type": message["msg"]["artifact"]["type"].replace("-", "_"), "test_type": message["msg"]["test"]["type"], "category": message["msg"]["test"]["category"], } # pipeline.build field is optional in 0.2.X messages if "build" in message["msg"]["pipeline"]: data["build_id"] = message["msg"]["pipeline"]["build"] if "result" in message["msg"]["test"]: data["status"] = message["msg"]["test"]["result"] if "scenario" in message["msg"]["test"]: data["scenario"] = message["msg"]["test"]["scenario"] data["topic"] = message["topic"] data["msg_id"] = message["msg_id"] return data # get datagrepper data. it's paginated. of course. topics = ("org.centos.prod.ci.koji-build.test.complete", "org.centos.prod.ci.koji-build.test.error") toreport = [] for topic in topics: url = f"https://apps.fedoraproject.org/datagrepper/raw?topic={topic}&start=1708779383.0&end=1709157383.0" res = requests.get(url) toreport.extend(res.json()["raw_messages"]) print("Got datagrepper page 1...") if res.json()["pages"] > 1: for i in range(2, res.json()["pages"] + 1): newurl = f"{url}&page={i}" res = requests.get(newurl) toreport.extend(res.json()["raw_messages"]) print(f"Got datagrepper page {i} of {res.json()['pages']}") toreport = [msg for msg in toreport if msg["msg"]["version"] != "0.2.1"] sender = ResultsdbSender() for msg in toreport: payload = m2p(msg) if payload: url = f"https://resultsdb.fedoraproject.org/api/v2.0/results?item={payload['item']}&namespace={payload['namespace']}&type={payload['type']}&test_type={payload['test_type']}&topic={payload['topic']}" if "scenario" in payload: url += f"&scenario={payload['test_type']}" if requests.get(url).json()["data"]: print(f"Result already reported for {payload['item']} {payload['namespace']} {payload['type']} {payload['test_type']} {payload['topic']} {payload.get('scenario', '')}") else: print(f"Reporting {payload['msg_id']}...") sender.send(payload)