# Task ID: 14

**Title:** Implement Audit Logging System

**Status:** pending

**Dependencies:** 3, 6

**Priority:** medium

**Description:** Create comprehensive audit logging for all MCP operations, including who ran what, when, and what data was accessed.

**Details:**

Create the audit logging system:

```typescript
// services/audit-logger.ts
import { PrismaClient } from '@prisma/client';

export class AuditLogger {
  constructor(private prisma: PrismaClient) {}

  async logMcpCall(params: {
    userId: string;
    sessionId: string;
    serverId: string;
    tool: string;
    arguments: any;
    responseSize: number;
    filteredSize: number;
    duration: number;
    success: boolean;
    error?: string;
  }) {
    await this.prisma.auditLog.create({
      data: {
        userId: params.userId,
        action: 'mcp_call',
        resource: `${params.serverId}:${params.tool}`,
        details: {
          sessionId: params.sessionId,
          arguments: params.arguments,
          responseSize: params.responseSize,
          filteredSize: params.filteredSize,
          // Guard against division by zero when the response is empty
          reductionPercent: params.responseSize > 0
            ? Math.round((1 - params.filteredSize / params.responseSize) * 100)
            : 0,
          duration: params.duration,
          success: params.success,
          error: params.error
        }
      }
    });
  }

  async logServerAction(params: {
    userId: string;
    action: 'start' | 'stop' | 'configure';
    serverId: string;
    details?: any;
  }) {
    await this.prisma.auditLog.create({
      data: {
        userId: params.userId,
        action: `server_${params.action}`,
        resource: params.serverId,
        details: params.details
      }
    });
  }

  async getAuditTrail(filters: {
    userId?: string;
    serverId?: string;
    action?: string;
    from?: Date;
    to?: Date;
    limit?: number;
  }) {
    return this.prisma.auditLog.findMany({
      where: {
        userId: filters.userId,
        resource: filters.serverId ? { contains: filters.serverId } : undefined,
        action: filters.action,
        timestamp: {
          gte: filters.from,
          lte: filters.to
        }
      },
      orderBy: { timestamp: 'desc' },
      take: filters.limit || 100,
      include: { user: true }
    });
  }
}

// CLI command for audit (`program` is assumed to be the commander instance
// and `client` the daemon API client, both provided by the CLI entry point)
program
  .command('audit')
  .description('View audit logs')
  .option('--user <userId>', 'Filter by user')
  .option('--server <serverId>', 'Filter by MCP server')
  .option('--since <date>', 'Show logs since date')
  .option('--limit <n>', 'Limit results', '50')
  .action(async (options) => {
    const logs = await client.get('/api/audit', options);
    console.table(logs.map((l) => ({
      TIME: l.timestamp,
      USER: l.user?.email,
      ACTION: l.action,
      RESOURCE: l.resource
    })));
  });
```

**Test Strategy:**

Test audit log creation for all operation types. Test that query filtering works correctly. Test log retention/cleanup. Verify that sensitive data is not logged.

## Subtasks

### 14.1. Design audit log schema and write TDD tests for AuditLogger methods

**Status:** pending

**Dependencies:** None

Define the AuditLog Prisma schema with a SIEM-compatible structure, correlation IDs, and date-partitioning support. Write comprehensive Vitest tests for all AuditLogger methods BEFORE implementation.

**Details:**

Create src/mcpd/tests/unit/services/audit-logger.test.ts with TDD tests covering:

1. logMcpCall() creates an audit record with the correct fields, including correlationId, sessionId, serverId, tool, sanitized arguments, responseSize, filteredSize, duration, and success/error status.
2. logServerAction() logs start/stop/configure actions with serverId and details.
3. getAuditTrail() supports filtering by userId, serverId, action, date range, and limit.
4. Sensitive-data scrubbing: verify that arguments containing password, token, secret, apiKey, or credentials patterns are redacted.
5. Structured JSON format compatible with Splunk/ELK: an ISO8601 timestamp plus log level, correlation_id, user_id, action, and resource fields.

Create the src/db/prisma/audit-log-schema.prisma addition with: correlationId (uuid), action (indexed), resource, details (Json), timestamp (indexed, for partitioning), userId (optional FK), responseTimeMs, success (boolean). Add @@index([timestamp, action]) and @@index([userId, timestamp]) for query performance per SRE requirements.

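The SIEM-compatible record shape described above can be sketched as a TypeScript type plus a small factory. This is a hypothetical illustration of the field conventions (ISO8601 timestamp, snake_case correlation_id/user_id), not the final Prisma schema; the `makeAuditRecord` helper name is an assumption.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical sketch of the Splunk/ELK-friendly record shape from 14.1.
interface SiemAuditRecord {
  timestamp: string;        // ISO8601, e.g. "2024-01-01T00:00:00.000Z"
  level: "info" | "warn" | "error";
  correlation_id: string;   // uuid tying related entries together
  user_id?: string;
  action: string;           // e.g. "mcp_call", "server_start"
  resource: string;         // e.g. "<serverId>:<tool>"
  details?: Record<string, unknown>;
}

function makeAuditRecord(
  action: string,
  resource: string,
  overrides: Partial<SiemAuditRecord> = {}
): SiemAuditRecord {
  return {
    timestamp: new Date().toISOString(), // ISO8601 for SIEM ingestion
    level: "info",
    correlation_id: randomUUID(),
    action,
    resource,
    ...overrides,
  };
}
```

Keeping the record flat at the top level (with free-form data under `details`) lets Splunk/ELK index the common fields without custom extraction rules.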
### 14.2. Implement AuditLogger service with async buffered writes and sensitive data scrubbing

**Status:** pending

**Dependencies:** 14.1

Implement the AuditLogger class with a high-throughput async write buffer, batched inserts to prevent performance impact, and comprehensive sensitive-data scrubbing to prevent credential leakage.

**Details:**

Create src/mcpd/src/services/audit-logger.ts implementing:

- A constructor accepting PrismaClient with configurable buffer settings (bufferSize: 100 default, flushIntervalMs: 1000 default).
- logMcpCall(), which adds an entry to an in-memory buffer and triggers a flush when the buffer is full; logServerAction() behaves similarly.
- A private flushBuffer() that uses prisma.auditLog.createMany() for batch inserts.
- scrubSensitiveData(obj: unknown): unknown, which recursively traverses objects and redacts values for keys matching /password/i, /token/i, /secret/i, /apiKey/i, /credentials/i, or /authorization/i, replacing them with '[REDACTED]'.
- Correlation ID generation using crypto.randomUUID().
- getAuditTrail() with a Prisma query supporting all filter parameters from the task spec.
- Graceful shutdown: flush the remaining buffer before process exit.

Performance consideration: use setImmediate/process.nextTick for non-blocking buffer operations. Add JSDoc documenting the async behavior and security guarantees.

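The recursive scrubber described above might look like the following sketch. The key patterns mirror the ones listed in this subtask; the exact redaction placeholder and traversal strategy are otherwise open to the implementer.

```typescript
// Key patterns from the subtask spec; case-insensitive so apiKey, ApiKey,
// AUTHORIZATION, etc. all match.
const SENSITIVE_KEY = /password|token|secret|apikey|credentials|authorization/i;

function scrubSensitiveData(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(scrubSensitiveData);
  }
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, v] of Object.entries(value as Record<string, unknown>)) {
      // Redact the whole value when the key looks sensitive,
      // otherwise recurse into nested objects/arrays.
      out[key] = SENSITIVE_KEY.test(key) ? "[REDACTED]" : scrubSensitiveData(v);
    }
    return out;
  }
  return value; // primitives pass through unchanged
}
```

Scrubbing must happen before entries enter the write buffer, so that credentials never exist in memory longer than necessary and never reach the database.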
### 14.3. Implement audit query API with aggregation support for data analysts

**Status:** pending

**Dependencies:** 14.2

Create REST API endpoints for querying audit logs with aggregation capabilities (requests per user/server/time window) and export functionality (CSV/JSON) for data analyst dashboards and usage reports.

**Details:**

Create src/mcpd/src/routes/audit.ts with endpoints:

- GET /api/audit: paginated audit log query with filters (userId, serverId, action, from, to, limit, offset).
- GET /api/audit/aggregations: aggregation queries returning counts grouped by user, server, action, or time window (hourly/daily/weekly).
- GET /api/audit/export: export audit data as a CSV or JSON file download with the same filter support.

Implement AuditQueryService in src/mcpd/src/services/audit-query.service.ts with methods:

- queryAuditLogs(filters: AuditFilters): Promise<PaginatedResult<AuditLog>>
- getAggregations(groupBy: 'user' | 'server' | 'action' | 'hour' | 'day', filters: AuditFilters): Promise<AggregationResult[]>
- exportToCsv(filters: AuditFilters): Promise<ReadableStream>
- exportToJson(filters: AuditFilters): Promise<ReadableStream>

Use Prisma groupBy for aggregations. For CSV export, use streaming to handle large datasets without memory issues. Add rate limiting on the export endpoint to prevent DoS. Write Zod schemas for all query parameters.

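The streaming CSV export above hinges on two details: correct field escaping and producing rows lazily. A minimal sketch, assuming RFC 4180-style escaping (helper names are hypothetical):

```typescript
// Quote a field only when it contains a comma, quote, or newline;
// embedded quotes are doubled per RFC 4180.
function toCsvRow(fields: ReadonlyArray<string | number | null>): string {
  return fields
    .map((f) => {
      const s = f === null ? "" : String(f);
      return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
    })
    .join(",");
}

// Rows are produced lazily, so a large export is never fully materialized
// in memory; a ReadableStream can be built over this generator.
function* csvLines(
  header: string[],
  rows: Iterable<Array<string | number | null>>
): Generator<string> {
  yield toCsvRow(header);
  for (const row of rows) yield toCsvRow(row);
}
```

In the real service, `rows` would be fed by a paginated Prisma cursor query so the database is also read in chunks.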
### 14.4. Implement CLI audit command with SRE-friendly output formats

**Status:** pending

**Dependencies:** 14.3

Create the mcpctl audit CLI command with filters, multiple output formats (table/json/yaml for SIEM integration), and tail-like streaming for real-time log monitoring.

**Details:**

Create src/cli/src/commands/audit.ts implementing a CommandModule with:

- mcpctl audit: list recent audit logs.
- mcpctl audit --user <userId>: filter by user.
- mcpctl audit --server <serverId>: filter by MCP server.
- mcpctl audit --since <date>: logs since a date (supports ISO8601 and relative values like '1h', '24h', '7d').
- mcpctl audit --action <action>: filter by action type.
- mcpctl audit --limit <n>: limit results (default 50).
- mcpctl audit --output json: JSON output for jq piping.
- mcpctl audit --output yaml: YAML output.
- mcpctl audit --follow: stream new logs in real time (WebSocket or polling).
- mcpctl audit export --format csv --since 7d > audit.csv: export to a file.

Table output format: TIME | USER | ACTION | RESOURCE (aligned columns). JSON output must be a valid JSON array parseable by jq. Add a --no-color flag for CI environments. Use chalk for colored output (green for success, red for error actions).

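Parsing the --since flag, which accepts both ISO8601 dates and relative durations like '1h' or '7d', could be sketched as follows (the `parseSince` helper name and the supported units are assumptions beyond the examples given above):

```typescript
// Accepts either a relative duration ("30s", "5m", "1h", "7d")
// or anything Date can parse (ISO8601 recommended).
function parseSince(input: string, now: Date = new Date()): Date {
  const rel = /^(\d+)([smhd])$/.exec(input.trim());
  if (rel) {
    const unitMs: Record<string, number> = {
      s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000,
    };
    return new Date(now.getTime() - Number(rel[1]) * unitMs[rel[2]]);
  }
  const parsed = new Date(input);
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`Invalid --since value: ${input}`);
  }
  return parsed;
}
```

Taking `now` as a parameter keeps the helper deterministic and easy to unit-test.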
### 14.5. Implement log streaming to external SIEM systems and retention policy

**Status:** pending

**Dependencies:** 14.2

Add support for streaming audit logs to external systems (Splunk HEC, ELK/Elasticsearch, generic webhook) and implement a configurable log retention policy with automatic cleanup for compliance and storage management.

**Details:**

Create src/mcpd/src/services/audit-streaming.ts with an AuditStreamingService supporting multiple destinations: SplunkHecDestination (HTTP Event Collector with HEC token auth), ElasticsearchDestination (bulk API with index templates), and WebhookDestination (generic HTTP POST with configurable auth). Each destination implements the AuditDestination interface: send(logs: AuditLog[]): Promise<void> and healthCheck(): Promise<boolean>. Implement retry logic with exponential backoff for failed sends.

Create src/mcpd/src/services/audit-retention.ts with an AuditRetentionService: configure(policy: RetentionPolicy): void, where the policy includes retentionDays (default 90), archiveEnabled (boolean), and archiveDestination (S3 path or local path). Implement a cleanup job using node-cron that deletes audit rows where timestamp < NOW() - retentionDays, using batched deletes to avoid long locks. Add an archiveBeforeDelete option that exports to the configured destination before deletion.

Add configuration in .env: AUDIT_RETENTION_DAYS, AUDIT_SPLUNK_HEC_URL, AUDIT_SPLUNK_HEC_TOKEN, AUDIT_ELASTICSEARCH_URL. Write unit tests mocking the external services.

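The exponential-backoff retry for failed destination sends could follow this shape. A minimal sketch: the base delay, cap, and attempt count are assumptions, not values mandated by the task, and `sendWithRetry` is a hypothetical helper name.

```typescript
// Delay grows as base * 2^attempt, capped so a flapping destination
// never pushes retries out indefinitely.
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 30_000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

// Retries `send` until it succeeds or attempts are exhausted; the sleep
// function is injectable so tests can run without real delays.
async function sendWithRetry<T>(
  send: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await send();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts - 1) await sleep(backoffDelayMs(attempt));
    }
  }
  throw lastError;
}
```

Production implementations often add random jitter to the delay to avoid synchronized retry storms across multiple daemon instances.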