|
| 1 | +import requests |
| 2 | +from tabulate import tabulate |
| 3 | + |
| 4 | +def check_config(): |
| 5 | + """ |
| 6 | + Check the configuration |
| 7 | + :return: bool |
| 8 | + """ |
| 9 | + results = [] |
| 10 | + try: |
| 11 | + import config.config as config |
| 12 | + if check_exist(config, ["llm_api_impl", "api_config", "gpt_message", |
| 13 | + "gitlab_server_url", "gitlab_private_token", "dingding_bot_webhook", "dingding_secret"]): |
| 14 | + results.append(["Configuration parameter existence", "Passed", "", "✅ Required parameters are available."]) |
| 15 | + else: |
| 16 | + results.append(["Configuration parameter existence", "Failed", "Required parameters are missing", "❌ Required parameters are missing"]) |
| 17 | + return print_results(results) |
| 18 | + |
| 19 | + api_check = check_api_config(config) |
| 20 | + if api_check['passed']: |
| 21 | + results.append(["API interface", "Passed", "", "✅ Model invocation can be used."]) |
| 22 | + else: |
| 23 | + results.append(["API interface", "Failed", "\n".join(api_check['errors']), "❌ Model invocation cannot be used."]) |
| 24 | + |
| 25 | + gitlab_check = check_gitlab_config(config) |
| 26 | + if gitlab_check['passed']: |
| 27 | + results.append(["Gitlab configuration", "Passed", "", "✅ Code review function can be used.\n✅ Comment function can be used."]) |
| 28 | + else: |
| 29 | + results.append(["Gitlab configuration", "Failed", |
| 30 | + "\n".join(gitlab_check['errors']), |
| 31 | + "❌ Code review function cannot be used.\n❌ Comment function cannot be used."]) |
| 32 | + dingding_check = check_dingding_config(config) |
| 33 | + if dingding_check['passed']: |
| 34 | + results.append(["Dingding configuration", "Passed", "", "✅ Notification on Dingtalk function can be used."]) |
| 35 | + else: |
| 36 | + results.append(["Dingding configuration", "Failed", |
| 37 | + "\n".join(dingding_check['errors']), |
| 38 | + "⚠️ Notification on Dingtalk function cannot be used."]) |
| 39 | + except ImportError: |
| 40 | + results.append(["Configuration file", "Failed", "config.py not found", |
| 41 | + "❌ Cannot run any Service, please create a config.py file"]) |
| 42 | + return print_results(results) |
| 43 | + except Exception as e: |
| 44 | + results.append(["Configuration file", "Failed", f"Error loading config.py: {e}", |
| 45 | + "❌ Cannot run any Service, please check config.py file"]) |
| 46 | + return print_results(results) |
| 47 | + |
| 48 | + return print_results(results) |
| 49 | + |
| 50 | +def check_dingding_config(config): |
| 51 | + """ |
| 52 | + Check the dingding configuration |
| 53 | + :return: dict |
| 54 | + """ |
| 55 | + result = {'passed': True, 'errors': []} |
| 56 | + try: |
| 57 | + from utils.dingding import send_dingtalk_message_by_sign |
| 58 | + response = send_dingtalk_message_by_sign("连通性测试:测试消息,请勿回复。") |
| 59 | + if not response: |
| 60 | + error_msg = "Dingding configuration is invalid" |
| 61 | + result['errors'].append(error_msg) |
| 62 | + result['passed'] = False |
| 63 | + |
| 64 | + except Exception as e: |
| 65 | + result['errors'].append(str(e)) |
| 66 | + result['passed'] = False |
| 67 | + |
| 68 | + return result |
| 69 | + |
| 70 | +def check_gitlab_config(config): |
| 71 | + """ |
| 72 | + Check the gitlab configuration |
| 73 | + :return: dict |
| 74 | + """ |
| 75 | + result = {'passed': True, 'errors': []} |
| 76 | + try: |
| 77 | + response = requests.get(config.gitlab_server_url) |
| 78 | + if response.status_code != 200: |
| 79 | + error_msg = f"Gitlab server URL {config.gitlab_server_url} is not available" |
| 80 | + result['errors'].append(error_msg) |
| 81 | + result['passed'] = False |
| 82 | + |
| 83 | + response = requests.get(f"{config.gitlab_server_url}/api/v4/projects", |
| 84 | + headers={"PRIVATE-TOKEN": config.gitlab_private_token}) |
| 85 | + if response.status_code != 200: |
| 86 | + error_msg = "Gitlab private token is invalid" |
| 87 | + result['errors'].append(error_msg) |
| 88 | + result['passed'] = False |
| 89 | + |
| 90 | + except Exception as e: |
| 91 | + result['errors'].append(str(e)) |
| 92 | + result['passed'] = False |
| 93 | + |
| 94 | + return result |
| 95 | + |
| 96 | +def check_api_config(config): |
| 97 | + """ |
| 98 | + Check the API configuration |
| 99 | + :return: dict |
| 100 | + """ |
| 101 | + result = {'passed': True, 'errors': []} |
| 102 | + try: |
| 103 | + from llm_api.load_api import create_llm_api_instance |
| 104 | + api = create_llm_api_instance() |
| 105 | + api.set_config(config.api_config) |
| 106 | + api.generate_text([ |
| 107 | + {"role": "system", |
| 108 | + "content": "你是一个有用的助手" |
| 109 | + }, |
| 110 | + {"role": "user", |
| 111 | + "content": "请输出ok两个小写字母,不要输出其他任何内容", |
| 112 | + } |
| 113 | + ]) |
| 114 | + res_str = api.get_respond_content() |
| 115 | + if not res_str or res_str == "": |
| 116 | + error_msg = "Model interface check failed: Please check if the model call related configuration is correct" |
| 117 | + result['errors'].append(error_msg) |
| 118 | + result['passed'] = False |
| 119 | + elif "ok" not in res_str: |
| 120 | + warning_msg = "Model interface check failed: The model did not return the expected result, but may still be available" |
| 121 | + result['errors'].append(warning_msg) |
| 122 | + result['passed'] = False |
| 123 | + |
| 124 | + except Exception as e: |
| 125 | + result['errors'].append(str(e)) |
| 126 | + result['passed'] = False |
| 127 | + |
| 128 | + return result |
| 129 | + |
| 130 | +def check_exist(config, arg_names): |
| 131 | + """ |
| 132 | + Check if the variable is defined |
| 133 | + :param arg_names: variable name list |
| 134 | + :return: bool |
| 135 | + """ |
| 136 | + res = True |
| 137 | + errors = [] |
| 138 | + for arg_name in arg_names: |
| 139 | + if not hasattr(config, arg_name): |
| 140 | + errors.append(f"{arg_name} not found in config.py") |
| 141 | + res = False |
| 142 | + if errors: |
| 143 | + print("\n".join(errors)) |
| 144 | + return res |
| 145 | + |
| 146 | +def wrap_text(text, width): |
| 147 | + """ |
| 148 | + Wrap text to a specified width. |
| 149 | + :param text: The text to wrap. |
| 150 | + :param width: The maximum width of each line. |
| 151 | + :return: The wrapped text. |
| 152 | + """ |
| 153 | + if not text: |
| 154 | + return "" |
| 155 | + lines = [] |
| 156 | + while len(text) > width: |
| 157 | + # Find the last space within the width limit |
| 158 | + wrap_at = text.rfind(' ', 0, width) |
| 159 | + if wrap_at == -1: |
| 160 | + wrap_at = width |
| 161 | + lines.append(text[:wrap_at]) |
| 162 | + text = text[wrap_at:].lstrip() |
| 163 | + lines.append(text) |
| 164 | + return "\n".join(lines) |
| 165 | + |
| 166 | +def print_results(results): |
| 167 | + """ |
| 168 | + Print the results in a tabulated format |
| 169 | + :param results: list of lists containing the check results |
| 170 | + """ |
| 171 | + wrapped_results = [] |
| 172 | + for result in results: |
| 173 | + wrapped_result = [wrap_text(result[0], 30), wrap_text(result[1], 10), |
| 174 | + wrap_text(result[2], 50), result[3]] |
| 175 | + wrapped_results.append(wrapped_result) |
| 176 | + table = tabulate(wrapped_results, headers=["Check", "Status", "Details", "Influence Service"], tablefmt="grid", stralign="left") |
| 177 | + print(table) |
| 178 | + return all(result[1] == "Passed" for result in results) |
| 179 | + |
| 180 | +# 示例调用 |
| 181 | +if __name__ == "__main__": |
| 182 | + if check_config(): |
| 183 | + print("All configuration checks passed") |
| 184 | + else: |
| 185 | + print("Some configuration checks failed") |
0 commit comments