fastapi-master/docs_src/nosql_databases/tutorial001.py
lqs d20566fe07
Some checks failed
Test / lint (push) Has been cancelled
Test / test (pydantic-v1, 3.10) (push) Has been cancelled
Test / test (pydantic-v1, 3.11) (push) Has been cancelled
Test / test (pydantic-v1, 3.12) (push) Has been cancelled
Test / test (pydantic-v1, 3.8) (push) Has been cancelled
Test / test (pydantic-v1, 3.9) (push) Has been cancelled
Test / test (pydantic-v2, 3.10) (push) Has been cancelled
Test / test (pydantic-v2, 3.11) (push) Has been cancelled
Test / test (pydantic-v2, 3.12) (push) Has been cancelled
Test / test (pydantic-v2, 3.8) (push) Has been cancelled
Test / test (pydantic-v2, 3.9) (push) Has been cancelled
Test / coverage-combine (push) Has been cancelled
Test / check (push) Has been cancelled
Issue Manager / issue-manager (push) Has been cancelled
Label Approved / label-approved (push) Has been cancelled
init
2024-08-24 04:41:47 +00:00

54 lines
1.3 KiB
Python

from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user