基本的な API¶
このチュートリアルでは、lambapi を使って実際の API を構築しながら、基本的な機能を学びます。
目標¶
このチュートリアルを完了すると、以下ができるようになります:
- 基本的な CRUD API の作成
- パスパラメータとクエリパラメータの使用
- リクエストボディの処理
- カスタムレスポンスの返却
- エラーハンドリングの実装
1. プロジェクトのセットアップ¶
ディレクトリ構造¶
必要なパッケージ¶
2. データモデルの定義¶
まず、API で使用するデータモデルを定義しましょう。
models.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class User:
id: str
name: str
email: str
age: int
created_at: str = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now().isoformat()
@dataclass
class CreateUserRequest:
name: str
email: str
age: int
@dataclass
class UpdateUserRequest:
name: Optional[str] = None
email: Optional[str] = None
age: Optional[int] = None
# インメモリデータストア(本番環境では DB を使用)
USERS_DB = {}
3. 基本的な API の実装¶
app.py
from lambapi import API, Response, create_lambda_handler
from lambapi.exceptions import NotFoundError, ValidationError
from models import User, CreateUserRequest, UpdateUserRequest, USERS_DB
import uuid
def create_app(event, context):
app = API(event, context)
# ヘルスチェック
@app.get("/health")
def health_check():
"""API のヘルスチェック"""
return {
"status": "healthy",
"timestamp": "2025-01-01T00:00:00Z",
"version": "1.0.0"
}
# ユーザー一覧取得
@app.get("/users")
def get_users(limit: int = 10, offset: int = 0, search: str = ""):
"""ユーザー一覧を取得"""
all_users = list(USERS_DB.values())
# 検索フィルタリング
if search:
all_users = [
user for user in all_users
if search.lower() in user.name.lower()
or search.lower() in user.email.lower()
]
# ページネーション
total = len(all_users)
users = all_users[offset:offset + limit]
return {
"users": [user.__dict__ for user in users],
"pagination": {
"total": total,
"limit": limit,
"offset": offset,
"has_more": offset + limit < total
}
}
# 特定ユーザー取得
@app.get("/users/{user_id}")
def get_user(user_id: str):
"""特定のユーザーを取得"""
if user_id not in USERS_DB:
raise NotFoundError("User", user_id)
user = USERS_DB[user_id]
return {"user": user.__dict__}
# ユーザー作成
@app.post("/users")
def create_user(request):
"""新しいユーザーを作成"""
try:
data = request.json()
# 基本的なバリデーション
if not data.get("name"):
raise ValidationError("Name is required", field="name")
if not data.get("email"):
raise ValidationError("Email is required", field="email")
if not isinstance(data.get("age"), int) or data["age"] < 0:
raise ValidationError("Age must be a positive integer", field="age")
# メール重複チェック
for existing_user in USERS_DB.values():
if existing_user.email == data["email"]:
raise ValidationError("Email already exists", field="email", value=data["email"])
# ユーザー作成
user_id = str(uuid.uuid4())
user = User(
id=user_id,
name=data["name"],
email=data["email"],
age=data["age"]
)
USERS_DB[user_id] = user
return Response(
{
"message": "User created successfully",
"user": user.__dict__
},
status_code=201
)
except Exception as e:
if isinstance(e, ValidationError):
raise
raise ValidationError("Invalid request data")
# ユーザー更新
@app.put("/users/{user_id}")
def update_user(user_id: str, request):
"""既存ユーザーを更新"""
if user_id not in USERS_DB:
raise NotFoundError("User", user_id)
data = request.json()
user = USERS_DB[user_id]
# 更新可能なフィールドのみ処理
if "name" in data and data["name"]:
user.name = data["name"]
if "email" in data and data["email"]:
# メール重複チェック(自分以外)
for uid, existing_user in USERS_DB.items():
if uid != user_id and existing_user.email == data["email"]:
raise ValidationError("Email already exists", field="email")
user.email = data["email"]
if "age" in data and isinstance(data["age"], int) and data["age"] >= 0:
user.age = data["age"]
return {
"message": "User updated successfully",
"user": user.__dict__
}
# ユーザー削除
@app.delete("/users/{user_id}")
def delete_user(user_id: str):
"""ユーザーを削除"""
if user_id not in USERS_DB:
raise NotFoundError("User", user_id)
user = USERS_DB.pop(user_id)
return {
"message": f"User {user.name} deleted successfully",
"deleted_user_id": user_id
}
# 統計情報
@app.get("/stats")
def get_stats():
"""ユーザー統計を取得"""
users = list(USERS_DB.values())
if not users:
return {
"total_users": 0,
"average_age": 0,
"age_distribution": {}
}
total_users = len(users)
average_age = sum(user.age for user in users) / total_users
# 年齢分布
age_ranges = {
"0-18": 0,
"19-30": 0,
"31-50": 0,
"51+": 0
}
for user in users:
if user.age <= 18:
age_ranges["0-18"] += 1
elif user.age <= 30:
age_ranges["19-30"] += 1
elif user.age <= 50:
age_ranges["31-50"] += 1
else:
age_ranges["51+"] += 1
return {
"total_users": total_users,
"average_age": round(average_age, 2),
"age_distribution": age_ranges
}
return app
# Lambda エントリーポイント
lambda_handler = create_lambda_handler(create_app)
# ローカルテスト用
if __name__ == "__main__":
# サンプルデータ
sample_users = [
User("1", "Alice", "alice@example.com", 25),
User("2", "Bob", "bob@example.com", 30),
User("3", "Charlie", "charlie@example.com", 35)
]
for user in sample_users:
USERS_DB[user.id] = user
# テスト実行
test_events = [
{
'httpMethod': 'GET',
'path': '/users',
'queryStringParameters': {'limit': '2'},
'headers': {},
'body': None
},
{
'httpMethod': 'GET',
'path': '/users/1',
'headers': {},
'body': None
},
{
'httpMethod': 'GET',
'path': '/stats',
'headers': {},
'body': None
}
]
context = type('Context', (), {'aws_request_id': 'test-123'})()
for event in test_events:
print(f"\n=== {event['httpMethod']} {event['path']} ===")
result = lambda_handler(event, context)
print(f"Status: {result['statusCode']}")
print(f"Response: {result['body']}")
4. API のテスト¶
基本的なテスト¶
test_api.py
import json
from app import lambda_handler
def create_test_context():
return type('Context', (), {'aws_request_id': 'test-123'})()
def test_health_check():
event = {
'httpMethod': 'GET',
'path': '/health',
'headers': {},
'body': None
}
result = lambda_handler(event, create_test_context())
assert result['statusCode'] == 200
body = json.loads(result['body'])
assert body['status'] == 'healthy'
def test_get_users():
event = {
'httpMethod': 'GET',
'path': '/users',
'queryStringParameters': {'limit': '5'},
'headers': {},
'body': None
}
result = lambda_handler(event, create_test_context())
assert result['statusCode'] == 200
body = json.loads(result['body'])
assert 'users' in body
assert 'pagination' in body
def test_create_user():
event = {
'httpMethod': 'POST',
'path': '/users',
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'name': 'Test User',
'email': 'test@example.com',
'age': 25
})
}
result = lambda_handler(event, create_test_context())
assert result['statusCode'] == 201
body = json.loads(result['body'])
assert body['message'] == 'User created successfully'
assert body['user']['name'] == 'Test User'
if __name__ == "__main__":
test_health_check()
test_get_users()
test_create_user()
print("All tests passed!")
5. エラーハンドリングの追加¶
error_handlers.py
from lambapi.exceptions import ValidationError, NotFoundError
def add_error_handlers(app):
"""カスタムエラーハンドラーを追加"""
@app.error_handler(ValidationError)
def handle_validation_error(error, request, context):
return Response({
"error": "VALIDATION_ERROR",
"message": error.message,
"field": getattr(error, 'field', None),
"request_id": context.aws_request_id
}, status_code=400)
@app.error_handler(NotFoundError)
def handle_not_found_error(error, request, context):
return Response({
"error": "NOT_FOUND",
"message": error.message,
"request_id": context.aws_request_id
}, status_code=404)
@app.default_error_handler
def handle_unknown_error(error, request, context):
return Response({
"error": "INTERNAL_ERROR",
"message": "An unexpected error occurred",
"request_id": context.aws_request_id
}, status_code=500)
6. 実用的な機能の追加¶
ページネーションの改善¶
@app.get("/users")
def get_users(
limit: int = 10,
offset: int = 0,
sort_by: str = "name",
sort_order: str = "asc",
search: str = ""
):
"""高度なフィルタリングとソート機能付きユーザー一覧"""
all_users = list(USERS_DB.values())
# 検索
if search:
all_users = [
user for user in all_users
if search.lower() in user.name.lower()
or search.lower() in user.email.lower()
]
# ソート
reverse = sort_order.lower() == "desc"
if sort_by == "name":
all_users.sort(key=lambda u: u.name, reverse=reverse)
elif sort_by == "age":
all_users.sort(key=lambda u: u.age, reverse=reverse)
elif sort_by == "created_at":
all_users.sort(key=lambda u: u.created_at, reverse=reverse)
# ページネーション
total = len(all_users)
users = all_users[offset:offset + limit]
return {
"users": [user.__dict__ for user in users],
"pagination": {
"total": total,
"limit": limit,
"offset": offset,
"has_more": offset + limit < total
},
"filters": {
"search": search,
"sort_by": sort_by,
"sort_order": sort_order
}
}
バッチ操作¶
@app.post("/users/batch")
def create_users_batch(request):
"""複数ユーザーの一括作成"""
data = request.json()
users_data = data.get("users", [])
if not users_data:
raise ValidationError("No users provided")
created_users = []
errors = []
for i, user_data in enumerate(users_data):
try:
# バリデーション
if not user_data.get("name"):
errors.append(f"User {i}: Name is required")
continue
# ユーザー作成
user_id = str(uuid.uuid4())
user = User(
id=user_id,
name=user_data["name"],
email=user_data["email"],
age=user_data["age"]
)
USERS_DB[user_id] = user
created_users.append(user.__dict__)
except Exception as e:
errors.append(f"User {i}: {str(e)}")
return {
"message": f"Batch operation completed",
"created": len(created_users),
"errors": len(errors),
"users": created_users,
"error_details": errors if errors else None
}
7. 次のステップ¶
このチュートリアルで基本的な CRUD API を作成しました。次は以下のトピックに進みましょう:
- CORS 設定 - フロントエンドとの連携
- API リファレンス - すべてのクラスとメソッドの詳細
- デプロイメント - 本番環境での運用
まとめ¶
このチュートリアルでは以下を学びました:
- ✅ 基本的な CRUD API の実装
- ✅ パスパラメータとクエリパラメータの使用
- ✅ リクエストボディの処理
- ✅ カスタムレスポンスとステータスコード
- ✅ バリデーションとエラーハンドリング
- ✅ ページネーションとフィルタリング
- ✅ バッチ操作の実装
これらの基礎をマスターすることで、実際のプロダクションで使える API を構築できるようになります。