Coverage for src/configuration/openapi_tool_call.py: 71%
7 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-25 22:39 -0400
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-25 22:39 -0400
1# pylint: skip-file
2from jsonref import JsonRef # pylint: disable = import-error
4from common.models.openapi_path import OpenAPIOperation
5from configuration.openapi import OperationObject, PathItemObject
def set_tool_call_spec(operation: OpenAPIOperation, operationJson: JsonRef):
    """Placeholder hook for attaching a tool-call spec to an operation.

    The current body performs no work: neither argument is read or
    mutated, and the function always returns ``None``.

    Args:
        operation: Accepted for interface compatibility; not used here.
        operationJson: Accepted for interface compatibility; not used here.
    """
    return None
def make_tool_call_spec(
    path: PathItemObject, operation_object: OperationObject, http_verb: str
) -> dict:
    """Placeholder builder for a tool-call spec.

    The current body ignores all three arguments and hands back a freshly
    created empty dictionary on every call.

    Args:
        path: Accepted for interface compatibility; not used here.
        operation_object: Accepted for interface compatibility; not used here.
        http_verb: Accepted for interface compatibility; not used here.

    Returns:
        A new, empty ``dict``.
    """
    spec: dict = {}
    return spec
17 # @model_validator(mode="wrap")
18 # @classmethod
19 # def validate_model(cls, values, handler):
20 # if values["servers"] is None or values["servers"].empty():
21 # values["servers"] = [
22 # ServerObject(
23 # url=values["base_url"],
24 # description=None,
25 # variables=None,
26 # uuid=uuid5(namespace=NAMESPACE_URL, name="/"),
27 # oas_uuid=id.get(),
28 # )
29 # ]
31 # try:
32 # spec_id = values["openapi_spec_id"]
33 # except KeyError:
34 # raise ValueError(
35 # "openapi_spec_id must be passed first as a `uuid.UUID` object"
36 # )
38 # try:
39 # session = values["db_session"]
40 # except KeyError:
41 # raise ValueError(
42 # "db_session must be passed as the second positional keyword"
43 # )
45 # try:
46 # base_url = values["base_url"]
47 # except KeyError:
48 # raise ValueError("base_url must be passed as the third positional keyword")
50 # id_token = openapi_spec_id.set(values["openapi_spec_uuid"])
51 # config_info[spec_id]["session"] = session
52 # # session_token = context_session.set(values.pop("db_session"))
53 # # tag_uuids_token = context_tag_uuids.set({})
54 # try:
55 # return handler(values)
56 # finally:
57 # # context_session.reset(session_token)
58 # openapi_spec_id.reset(id_token)
59 # # tag_uuids_token.reset(tag_uuids_token)
61 # # TODO: Review & test this, high priority
62 # @staticmethod
63 # def _gen_func(
64 # servers: List[ServerObject],
65 # path: str,
66 # operation: OperationObject,
67 # operation_name: str,
68 # ) -> dict:
69 # if operation.servers is not None:
70 # servers = operation.servers
72 # func_name = path + "+" + operation_name
73 # func_description = operation.x_cuecode
74 # if func_description is None:
75 # func_description = operation.description
76 # if func_description is None:
77 # func_description = operation.summary
79 # params: dict = {}
80 # required = []
81 # if operation.parameters is not None:
82 # for param in operation.parameters:
84 # if not isinstance(param, ParameterObjectSchema):
85 # continue
87 # param_description = param.x_cuecode
88 # if param_description is None:
89 # param_description = param.description
90 # param_info: dict = {}
91 # if param.schema_ is not None:
92 # param_info = param.schema_
94 # if param_description is not None:
95 # param_info["description"] = param_description
96 # if param.examples is not None:
97 # param_info["examples"] = param.examples
99 # param_name = param.in_ + "/" + param.name
100 # params[param_name] = param_info
102 # if param.required:
103 # required.append(param_name)
105 # if operation.request_body is not None:
106 # request_body_required = False
107 # if operation.request_body.required:
108 # request_body_required = True
109 # required.append("requestBody")
111 # one_of: dict = {}
113 # for k, v in operation.request_body.content.items():
114 # param_info = {}
115 # if v.schema_ is not None:
116 # param_info = v.schema_
118 # one_of[k] = param_info
120 # request_body: dict = {"requestBody": {"type": "object", "oneOf": one_of}}
122 # out: dict = {}
124 # if len(servers) > 1:
125 # raise ValueError(
126 # "Per CueCode restrictions, each unique path must have one and only one server"
127 # )
129 # for server in servers:
130 # out[(server.uuid.int, server.url + path)] = {
131 # "type": "function",
132 # "function": {
133 # "name": server.url + func_name,
134 # "description": func_description,
135 # "parameters": {
136 # "type": "object",
137 # "properties": {**params, **request_body},
138 # },
139 # "required": required,
140 # },
141 # }
143 # return out
145 # def generate_tools(self) -> dict[int, list]:
146 # """generate function calls for the api"""
148 # out: List[dict] = []
150 # out2: defaultdict = defaultdict(list)
152 # server_stack = [self.servers]
154 # for path, path_item in self.paths.items():
155 # if path_item.servers is not None:
156 # server_stack.append(path_item.servers)
157 # if path_item.get is not None:
158 # result = self._gen_func(server_stack[-1], path, path_item.get, "get")
159 # for k, v in result.items():
160 # out2[k].append(v)
161 # if path_item.post is not None:
162 # result = self._gen_func(server_stack[-1], path, path_item.post, "post")
163 # for k, v in result.items():
164 # out2[k].append(v)
165 # if path_item.head is not None:
166 # result = self._gen_func(server_stack[-1], path, path_item.head, "head")
167 # for k, v in result.items():
168 # out2[k].append(v)
169 # if path_item.put is not None:
170 # result = self._gen_func(server_stack[-1], path, path_item.put, "put")
171 # for k, v in result.items():
172 # out2[k].append(v)
173 # if path_item.patch is not None:
174 # result = self._gen_func(
175 # server_stack[-1], path, path_item.patch, "patch"
176 # )
177 # for k, v in result.items():
178 # out2[k].append(v)
179 # if path_item.trace is not None:
180 # result = self._gen_func(
181 # server_stack[-1], path, path_item.trace, "trace"
182 # )
183 # for k, v in result.items():
184 # out2[k].append(v)
185 # return out2