Skip to content

agent_k.toolsets.kaggle

Kaggle API toolset for competition discovery and leaderboard access.

agent_k.toolsets.kaggle

Kaggle toolset for AGENT-K agents.

@notice: | Kaggle toolset for AGENT-K agents.

@dev: | See module for implementation details and extension points.

@graph:
    id: agent_k.toolsets.kaggle
    provides:
        - agent_k.toolsets.kaggle:kaggle_toolset
        - agent_k.toolsets.kaggle:kaggle_search_competitions
        - agent_k.toolsets.kaggle:kaggle_get_competition
        - agent_k.toolsets.kaggle:kaggle_get_leaderboard
        - agent_k.toolsets.kaggle:kaggle_list_datasets
    pattern: toolset

@similar: - id: agent_k.adapters.kaggle when: "Adapter implementation backing these tools."

@agent-guidance: do: - "Use agent_k.toolsets.kaggle as the canonical home for this capability." do_not: - "Create parallel modules without updating @similar or @graph."

@human-review: last-verified: 2026-01-26 owners: - agent-k-core

(c) Mike Casale 2025. Licensed under the MIT License.

KaggleDeps dataclass

Bases: BaseDeps

Dependencies for Kaggle toolsets.

@pattern: name: dependency-container rationale: "Groups Kaggle adapter and toolset settings." violations: "Direct adapter access bypasses shared settings."

@collaborators: required: - agent_k.adapters.kaggle:KaggleAdapter - agent_k.ui.agui:EventEmitter optional: - httpx:AsyncClient injection: constructor lifecycle: "Allocated per agent run."

Source code in agent_k/core/deps.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@dataclass(kw_only=True)
class KaggleDeps(BaseDeps):
    """Dependencies for Kaggle toolsets.

    Bundles the Kaggle adapter with toolset settings. Because the dataclass is
    declared with ``kw_only=True``, every field must be passed by keyword.

    @pattern:
        name: dependency-container
        rationale: "Groups Kaggle adapter and toolset settings."
        violations: "Direct adapter access bypasses shared settings."

    @collaborators:
        required:
            - agent_k.adapters.kaggle:KaggleAdapter
            - agent_k.ui.agui:EventEmitter
        optional:
            - httpx:AsyncClient
        injection: constructor
        lifecycle: "Allocated per agent run."
    """

    # The Kaggle adapter instance; required (no default) and also returned by
    # the ``platform_adapter`` property below.
    kaggle_adapter: KaggleAdapter
    # Cap on the number of results a search collects. kaggle_search_competitions
    # reads this from ``ctx.deps`` and treats a missing or falsy value as 50.
    max_results: int = 50

    @property
    def platform_adapter(self) -> KaggleAdapter:
        """Expose the Kaggle adapter as a platform adapter."""
        # Plain alias for ``kaggle_adapter``; no copying or wrapping happens here.
        return self.kaggle_adapter
platform_adapter property
platform_adapter: KaggleAdapter

Expose the Kaggle adapter as a platform adapter.

P module-attribute

P = ParamSpec('P')

Parameter specification for tool wrapper callables.

ToolResultT module-attribute

ToolResultT = TypeVar('ToolResultT')

Type variable for tool result payloads.

with_tool_telemetry

with_tool_telemetry(
    *,
    task_id: str,
    tool_type: str,
    operation: str,
    error_response: Callable[[str], ToolResultT],
    result_summary: Callable[[ToolResultT], dict[str, Any]],
) -> Callable[
    [Callable[P, Awaitable[ToolResultT]]],
    Callable[P, Awaitable[ToolResultT]],
]

Wrap a tool function with standard telemetry and error handling.

Source code in agent_k/toolsets/kaggle.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def with_tool_telemetry(
    *,
    task_id: str,
    tool_type: str,
    operation: str,
    error_response: Callable[[str], ToolResultT],
    result_summary: Callable[[ToolResultT], dict[str, Any]],
) -> Callable[[Callable[P, Awaitable[ToolResultT]]], Callable[P, Awaitable[ToolResultT]]]:
    """Wrap a tool function with standard telemetry and error handling.

    The returned decorator wraps an async tool so that each call emits an
    ``emit_tool_start`` event, followed by either ``emit_tool_result`` (with
    a summary and the elapsed milliseconds) or ``emit_tool_error``.

    Args:
        task_id: Identifier attached to every emitted event; also the prefix
            of the generated ``tool_call_id``.
        tool_type: Tool family label forwarded to every event.
        operation: Operation label forwarded to every event.
        error_response: Builds the value returned to the caller when the
            wrapped tool raises; it receives the error message.
        result_summary: Reduces a successful result to the dict attached to
            the ``emit_tool_result`` event.

    Returns:
        A decorator that preserves the wrapped coroutine function's signature.

    Raises:
        RuntimeError: Raised by the wrapper (at call time) when no context is
            supplied either as the first positional argument or as ``ctx=``.

    Note:
        Exceptions raised by the wrapped tool are not propagated: they are
        reported and converted with ``error_response``. Exceptions raised by
        ``result_summary`` or by event emission are not caught here.
    """

    def decorator(func: Callable[P, Awaitable[ToolResultT]]) -> Callable[P, Awaitable[ToolResultT]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> ToolResultT:
            # The context is expected first positionally, or as the ``ctx`` keyword.
            ctx_obj = args[0] if args else kwargs.get("ctx")
            if ctx_obj is None:
                raise RuntimeError("RunContext is required for tool telemetry")
            ctx = cast("RunContext[Any]", ctx_obj)

            # NOTE(review): id() values can be reused once an object is garbage
            # collected, so this id is only as unique as the context's lifetime.
            event_fields: dict[str, Any] = {
                "task_id": task_id,
                "tool_call_id": f"{task_id}_{id(ctx):x}",
                "tool_type": tool_type,
                "operation": operation,
            }

            start_time = time.perf_counter()
            await _emit_tool_event(ctx, "emit_tool_start", **event_fields)

            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                # Deliberate best-effort boundary: tool failures are reported and
                # turned into an error payload instead of propagating.
                # str(exc) is "" for exceptions raised without a message, so fall
                # back to the class name to keep the report informative.
                message = str(exc) or type(exc).__name__
                await _emit_tool_event(ctx, "emit_tool_error", **event_fields, error=message)
                return error_response(message)

            duration_ms = int((time.perf_counter() - start_time) * 1000)
            await _emit_tool_event(
                ctx,
                "emit_tool_result",
                **event_fields,
                result=result_summary(result),
                duration_ms=duration_ms,
            )
            return result

        return wrapper

    return decorator

kaggle_search_competitions async

kaggle_search_competitions(
    ctx: RunContext[Any],
    categories: Annotated[
        list[str] | None,
        Doc("Competition categories to filter."),
    ] = None,
    keywords: Annotated[
        list[str] | None, Doc("Keyword filters for search.")
    ] = None,
    min_prize: Annotated[
        int | None,
        Doc("Minimum prize pool in USD."),
        Range(0, 1000000000),
    ] = None,
    active_only: Annotated[
        bool, Doc("Only return active competitions.")
    ] = True,
) -> list[dict[str, Any]]

Search Kaggle for competitions (active only by default; pass active_only=False to include inactive ones).

@notice: | Returns metadata for competitions matching search criteria.

@effects:
    io:
        - Kaggle API request
    state:
        - in-module cache

Source code in agent_k/toolsets/kaggle.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@kaggle_toolset.tool
@with_tool_telemetry(
    task_id="kaggle_search",
    tool_type="kaggle_mcp",
    operation="competitions.list",
    error_response=_error_list_response,
    result_summary=_search_summary,
)
async def kaggle_search_competitions(
    ctx: RunContext[Any],
    categories: Annotated[list[str] | None, Doc("Competition categories to filter.")] = None,
    keywords: Annotated[list[str] | None, Doc("Keyword filters for search.")] = None,
    min_prize: Annotated[int | None, Doc("Minimum prize pool in USD."), Range(0, 1_000_000_000)] = None,
    active_only: Annotated[bool, Doc("Only return active competitions.")] = True,
) -> list[dict[str, Any]]:
    """Search Kaggle for competitions (active only by default).

    @notice: |
        Returns metadata for competitions matching search criteria, capped at
        the `max_results` setting read from `ctx.deps` (50 when unset or falsy).

    @effects:
        io:
            - Kaggle API request
        state:
            - in-module cache
    """
    # Parameters are documented through their ``Annotated[..., Doc(...)]`` metadata.
    with logfire.span("kaggle_search_competitions", categories=categories, keywords=keywords):
        adapter = _require_adapter(ctx)

        # The cap does not change while iterating, so resolve it once up front
        # instead of on every loop iteration.
        max_results = getattr(ctx.deps, "max_results", 50) or 50

        competitions: list[dict[str, Any]] = []

        async for comp in adapter.search_competitions(
            categories=categories, keywords=keywords, min_prize=min_prize, active_only=active_only
        ):
            # Every yielded competition is handed to _store_competition before
            # it is summarised for the caller.
            _store_competition(ctx, comp)
            competitions.append(
                {
                    "id": comp.id,
                    "title": comp.title,
                    "type": comp.competition_type.value,
                    "metric": comp.metric.value,
                    "days_remaining": comp.days_remaining,
                    "prize_pool": comp.prize_pool,
                    "tags": list(comp.tags) if comp.tags else [],
                    "is_active": comp.is_active,
                }
            )
            # Stop consuming the adapter's stream as soon as the cap is reached.
            if len(competitions) >= max_results:
                break

        return competitions

kaggle_get_competition async

kaggle_get_competition(
    ctx: RunContext[Any],
    competition_id: Annotated[
        str, Doc("Competition identifier (slug).")
    ],
) -> dict[str, Any]

Get detailed information about a specific Kaggle competition.

@notice: | Fetches competition metadata and caches results.

Source code in agent_k/toolsets/kaggle.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@kaggle_toolset.tool
@with_tool_telemetry(
    task_id="kaggle_competition",
    tool_type="kaggle_mcp",
    operation="competitions.get",
    error_response=_error_dict_response,
    result_summary=_competition_summary,
)
async def kaggle_get_competition(
    ctx: RunContext[Any], competition_id: Annotated[str, Doc("Competition identifier (slug).")]
) -> dict[str, Any]:
    """Get detailed information about a specific Kaggle competition.

    @notice: |
        Serves the competition from `_cache` when present; otherwise fetches
        it through the adapter, stores it, and returns the serialized form.
    """
    with logfire.span("kaggle_get_competition", competition_id=competition_id):
        # Resolved before the cache lookup, so a missing adapter fails the call
        # even when the competition is already cached.
        kaggle = _require_adapter(ctx)

        if competition_id not in _cache:
            # Cache miss: fetch, hand the result to _store_competition, return it.
            fetched = await kaggle.get_competition(competition_id)
            _store_competition(ctx, fetched)
            return _serialize_competition(fetched)

        # Cache hit: no adapter request is made.
        return _serialize_competition(_cache[competition_id])

kaggle_get_leaderboard async

kaggle_get_leaderboard(
    ctx: RunContext[Any],
    competition_id: Annotated[
        str, Doc("Competition identifier (slug).")
    ],
    limit: Annotated[
        int,
        Doc("Maximum entries to return."),
        Range(1, 1000),
    ] = 20,
) -> dict[str, Any]

Get the current leaderboard for a competition.

@notice: | Returns leaderboard entries ordered by rank.

Source code in agent_k/toolsets/kaggle.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@kaggle_toolset.tool
@with_tool_telemetry(
    task_id="kaggle_leaderboard",
    tool_type="kaggle_mcp",
    operation="competitions.leaderboard",
    error_response=_error_dict_response,
    result_summary=_leaderboard_summary,
)
async def kaggle_get_leaderboard(
    ctx: RunContext[Any],
    competition_id: Annotated[str, Doc("Competition identifier (slug).")],
    limit: Annotated[int, Doc("Maximum entries to return."), Range(1, 1000)] = 20,
) -> dict[str, Any]:
    """Get the current leaderboard for a competition.

    @notice: |
        Returns leaderboard entries in the order the adapter supplies them
        (expected to be rank order; no sorting is applied here).
    """
    with logfire.span("kaggle_get_leaderboard", competition_id=competition_id):
        kaggle = _require_adapter(ctx)
        board = await kaggle.get_leaderboard(competition_id, limit=limit)

        # Keep only the fields exposed to the caller for each entry.
        rows = [dict(rank=entry.rank, team_name=entry.team_name, score=entry.score) for entry in board]

        # "total_entries" counts what the adapter returned for this request,
        # which the adapter call limits via ``limit``.
        payload: dict[str, Any] = {}
        payload["competition_id"] = competition_id
        payload["total_entries"] = len(board)
        payload["entries"] = rows
        return payload

kaggle_list_datasets async

kaggle_list_datasets(
    ctx: RunContext[Any],
    competition_id: Annotated[
        str, Doc("Competition identifier (slug).")
    ],
) -> dict[str, Any]

List available datasets for a competition.

@notice: | Returns file metadata for competition datasets.

Source code in agent_k/toolsets/kaggle.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@kaggle_toolset.tool
@with_tool_telemetry(
    task_id="kaggle_datasets",
    tool_type="kaggle_mcp",
    operation="competitions.data",
    error_response=_error_dict_response,
    result_summary=_dataset_summary,
)
async def kaggle_list_datasets(
    ctx: RunContext[Any], competition_id: Annotated[str, Doc("Competition identifier (slug).")]
) -> dict[str, Any]:
    """List available datasets for a competition.

    @notice: |
        Returns file metadata (name, size, description) for competition datasets.
    """
    # Local import keeps this block self-contained without touching file imports.
    from urllib.parse import quote

    with logfire.span("kaggle_list_datasets", competition_id=competition_id):
        adapter = _require_adapter(ctx)
        # This tool relies on the adapter's private ``_request`` helper; adapters
        # without it cannot serve the call.
        request = getattr(adapter, "_request", None)
        if request is None:
            raise RuntimeError("Adapter does not support listing datasets")

        # The identifier is caller-supplied. Percent-encode it (including "/")
        # so it stays a single path segment and cannot redirect the request to
        # another endpoint or smuggle in a query string.
        # Identifiers made only of letters, digits, "-", "_", "." and "~" are unchanged.
        safe_id = quote(competition_id, safe="")
        response = await request("GET", f"/competitions/data/list/{safe_id}")
        if response.status_code != 200:
            raise RuntimeError(f"Failed to list datasets: {response.status_code}")

        # assumes the payload is a JSON array of file objects — TODO confirm
        files = response.json()
        return {
            # Echo the identifier exactly as the caller passed it, not the encoded form.
            "competition_id": competition_id,
            "files": [
                {"name": f.get("name"), "size": f.get("totalBytes"), "description": f.get("description")} for f in files
            ],
        }