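Building a Real-Time Terminal Chat System, Step by Step

This article follows the actual commit order of the chat-tui repository. The project does not lay out every module first and backfill tests later. It advances along one working chat path at a time: protocol first, then authentication, then groups, then messages and direct messages, and finally the Textual terminal UI, launch commands, recovery tests, concurrency tests, logging, metrics, and performance acceptance.

This is also a dependable order for this kind of network project. Real-time chat looks like a UI project, but the first step is not drawing the interface. It is cutting the TCP byte stream into application-level messages. If the protocol is unstable, the server, the client, and the TUI all end up guessing where data boundaries are, and tests become hard to write.

We will build a simplified but complete chat-tui from scratch: one asyncio TCP server, multiple Textual terminal clients, SQLite state on both the server and each client, and a wire protocol made of a 4-byte length prefix followed by UTF-8 JSON. Features include registration and login, group chat, direct messages, presence, unread counts, local history, and Base64 image messages. Every section covers where the code goes, the core implementation, and the matching tests, not just architecture diagrams.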


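Follow the commit history as the development order

Reading the commit history first saves a lot of detours. The repository's mainline commits read almost like an implementation plan: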

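Step 1      Project scaffold, package layout, CLI entry points
Step 2-4    Protocol framing, message schema, protocol contract samples
Step 5-8    Server-side user persistence, authenticated connection flow, client network layer, auth end-to-end tests
Step 9-12   Group persistence, group protocol handling, client group state, group end-to-end tests
Step 13-19  Message persistence, group message delivery, client message state, direct messages and presence
Step 20-23  Textual login screen, main workspace, group/direct-message widgets, Base64 image mode
Step 24-27  Launch commands, docs, recovery acceptance, 50-idle-client concurrency acceptance
Later       Fix duplicate sender echoes, stabilize TUI snapshots, add logging, metrics, and active load tests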

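This path suits newcomers better than "write the whole server, then the whole client, then bolt on the UI". Every stage yields a runnable slice with its own tests: the protocol stage tests frames, the auth stage tests real TCP logins, the group stage tests end to end, the message stage tests real-time delivery and unread counts, the TUI stage uses headless tests, and the final stages use recovery, concurrency, and performance tests to confirm the system's limits.

Pin down the scope first

Fix the scope before writing any code. The MVP is real-time chat in a terminal, not an internet-scale IM system.

In scope:

  • User registration and login, rejecting a second concurrent login for the same user.
  • Creating, joining, leaving, and listing groups.
  • Real-time group messages and real-time one-to-one direct messages.
  • Online/offline status for direct-message contacts.
  • A local conversation list, chat history, and unread counts on the client.
  • Text messages and Base64 image messages.
  • Users, groups, and messages survive a server restart; local history survives a client restart.

Explicitly out of scope:

  • No TLS and no end-to-end encryption.
  • No arbitrary file transfer; images are handled purely as Base64 strings.
  • No offline message replay.
  • No ACK/retry.
  • No multi-server clustering.
  • No protocol version negotiation and no batched sends.

With the scope settled, the project layout follows naturally: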

src/chat_tui/
  protocol/      # byte framing and protocol messages
  server/        # TCP server, routing, server-side SQLite
  client/        # TCP client, local SQLite, client state
  tui/           # Textual UI
  logging.py     # runtime logging configuration
tests/
  test_protocol_*.py
  test_server_*.py
  test_client_*.py
  test_*_e2e.py
  test_tui_*.py
  test_*_acceptance.py

pyproject.toml only needs a few groups of dependencies:

[project]
requires-python = ">=3.12"
dependencies = [
    "aiosqlite>=0.21.0",
    "textual>=1.0.0",
]

[project.scripts]
chat-tui = "chat_tui.cli:main"
chat-tui-client = "chat_tui.client.cli:main"
chat-tui-server = "chat_tui.server.cli:main"

[dependency-groups]
dev = [
    "pytest>=8.3",
    "pytest-asyncio>=0.25",
    "pytest-textual-snapshot>=1.1",
]

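The first test does not need to be elaborate: confirm that the package imports and that the three command entry points exist. Each later stage adds its own tests.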

uv run pytest tests/test_scaffold.py

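Protocol step one: a TCP byte stream has no message boundaries

Sending a chat message is not just writer.write(json.dumps(message).encode()). TCP gives the application a reliable, ordered byte stream and does not preserve the boundaries of application-level writes. One write may arrive as two reads; two writes may arrive as a single read.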
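
To make the boundary problem concrete, here is a small sketch that never touches a socket. It assumes a naive receiver that hands whatever bytes arrived straight to json.loads; the helper name try_parse is made up for this illustration and is not part of chat-tui.

```python
import json


def try_parse(received: bytes) -> str:
    """Return 'ok' if the bytes parse as one JSON document, else the error class name."""
    try:
        json.loads(received)
    except json.JSONDecodeError as error:
        return type(error).__name__
    return "ok"


first = json.dumps({"type": "login"}).encode("utf-8")
second = json.dumps({"type": "group.list"}).encode("utf-8")

# Two writes delivered in a single read: both objects arrive glued together.
merged_read = first + second
# One write delivered across two reads: the first read holds only part of the object.
partial_read = first[:5]

print(try_parse(first))         # a complete message parses
print(try_parse(merged_read))   # glued messages do not
print(try_parse(partial_read))  # a half-delivered message does not either
```

Neither failure is a network error. The bytes are all correct; the receiver just has no way to know where one message ends.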

So the first step is application-level framing. chat-tui uses a 4-byte big-endian length prefix:

4-byte big-endian payload length
UTF-8 JSON payload of exactly that many bytes
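
As a worked example of that layout, the sketch below builds one frame by hand for a tiny payload. The payload {"a": 1} is an arbitrary illustration, not a real chat-tui message.

```python
import json

# Compact separators keep the body at exactly 7 bytes: {"a":1}
payload = json.dumps({"a": 1}, separators=(",", ":")).encode("utf-8")

# 4-byte big-endian length, followed by exactly that many payload bytes.
frame = len(payload).to_bytes(4, "big") + payload

print(len(payload))
print(frame[:4])
print(frame.hex(" "))

# The receiver reverses the process: read 4 bytes, then read that many more.
declared_length = int.from_bytes(frame[:4], "big")
body = frame[4 : 4 + declared_length]
```

A 4-byte prefix can describe payloads up to 2**32 - 1 bytes, which is far more than a chat message needs, so a separate size cap is still worth enforcing.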

The implementation lives in src/chat_tui/protocol/framing.py. Start with the error types and constants:

import asyncio
import json
from collections.abc import Mapping
from typing import Any

LENGTH_PREFIX_SIZE = 4
DEFAULT_MAX_FRAME_SIZE = 1_048_576


class FrameError(ValueError):
    pass


class InvalidFrameLength(FrameError):
    pass


class FrameTooLarge(FrameError):
    pass


class TruncatedFrame(FrameError):
    pass


class InvalidFrameEncoding(FrameError):
    pass


class InvalidFrameJSON(FrameError):
    pass


class InvalidFramePayload(FrameError):
    pass

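The errors are split this finely so the server can later tell which ones allow reading the next frame and which ones should close the connection. Invalid JSON, for example, can be answered with an error while reading continues; an oversized or truncated frame usually means closing the connection outright.

The encoder does four things: confirm the input is an object, serialize it to JSON, check the size, and prepend the length prefix.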

def encode_frame(message: Mapping[str, Any], *, max_size: int = DEFAULT_MAX_FRAME_SIZE) -> bytes:
    if not isinstance(message, Mapping):
        raise InvalidFramePayload("frame payload must be a JSON object")

    try:
        payload = json.dumps(
            dict(message),
            ensure_ascii=False,
            separators=(",", ":"),
            sort_keys=True,
        ).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise InvalidFramePayload("frame payload must be JSON serializable") from exc

    _validate_declared_length(len(payload), max_size=max_size)
    return len(payload).to_bytes(LENGTH_PREFIX_SIZE, "big") + payload

The decoder cannot simply call json.loads(frame[4:]); it must verify that the declared length matches the actual length:

def decode_frame(frame: bytes | bytearray | memoryview, *, max_size: int = DEFAULT_MAX_FRAME_SIZE) -> dict[str, Any]:
    frame_bytes = bytes(frame)
    if len(frame_bytes) < LENGTH_PREFIX_SIZE:
        raise InvalidFrameLength("frame is missing the 4-byte length prefix")

    declared_length = int.from_bytes(frame_bytes[:LENGTH_PREFIX_SIZE], "big")
    _validate_declared_length(declared_length, max_size=max_size)

    payload = frame_bytes[LENGTH_PREFIX_SIZE:]
    if len(payload) < declared_length:
        raise TruncatedFrame("frame payload is truncated")
    if len(payload) > declared_length:
        raise InvalidFrameLength("frame contains extra payload bytes")

    return _decode_json_payload(payload)
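
The two helpers used above, _validate_declared_length and _decode_json_payload, are not shown in the article. The following is a minimal sketch of what they could look like, inferred only from the error class names; the exact checks and messages are assumptions, and describe is a made-up helper for trying the code out.

```python
import json
from typing import Any


class FrameError(ValueError): ...
class InvalidFrameLength(FrameError): ...
class FrameTooLarge(FrameError): ...
class InvalidFrameEncoding(FrameError): ...
class InvalidFrameJSON(FrameError): ...
class InvalidFramePayload(FrameError): ...


def _validate_declared_length(declared_length: int, *, max_size: int) -> None:
    # Assumption: only negative and oversized lengths are rejected at this point.
    if declared_length < 0:
        raise InvalidFrameLength("frame length must not be negative")
    if declared_length > max_size:
        raise FrameTooLarge(f"frame of {declared_length} bytes exceeds limit of {max_size}")


def _decode_json_payload(payload: bytes) -> dict[str, Any]:
    # Each stage maps to its own error so callers can react differently.
    try:
        text = payload.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise InvalidFrameEncoding("frame payload is not valid UTF-8") from exc
    try:
        value = json.loads(text)
    except json.JSONDecodeError as exc:
        raise InvalidFrameJSON("frame payload is not valid JSON") from exc
    if not isinstance(value, dict):
        raise InvalidFramePayload("frame payload must be a JSON object")
    return value


def describe(payload: bytes, *, max_size: int = 1_048_576) -> str:
    """Hypothetical helper: name the error a payload triggers, or 'ok'."""
    try:
        _validate_declared_length(len(payload), max_size=max_size)
        _decode_json_payload(payload)
    except FrameError as error:
        return type(error).__name__
    return "ok"


print(describe(b'{"type":"login"}'), describe(b"\xff"), describe(b"[1]"))
```

Keeping the length check separate from the decode step lets the stream reader reject an oversized frame before it reads a single payload byte.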

The version that actually runs on the network uses asyncio.StreamReader.readexactly:

async def read_frame(reader: asyncio.StreamReader, *, max_size: int = DEFAULT_MAX_FRAME_SIZE) -> dict[str, Any]:
    try:
        prefix = await reader.readexactly(LENGTH_PREFIX_SIZE)
    except asyncio.IncompleteReadError as exc:
        raise TruncatedFrame("stream ended before length prefix") from exc

    declared_length = int.from_bytes(prefix, "big")
    _validate_declared_length(declared_length, max_size=max_size)

    try:
        payload = await reader.readexactly(declared_length)
    except asyncio.IncompleteReadError as exc:
        raise TruncatedFrame("stream ended before payload was complete") from exc

    return _decode_json_payload(payload)


async def write_frame(writer: asyncio.StreamWriter, message: Mapping[str, Any], *, max_size: int = DEFAULT_MAX_FRAME_SIZE) -> None:
    writer.write(encode_frame(message, max_size=max_size))
    await writer.drain()

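Tests at this layer do not touch chat logic yet. Focus on TCP fragmentation, invalid lengths, oversized frames, invalid UTF-8, invalid JSON, and JSON whose top level is not an object.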

async def test_read_frame_handles_fragmented_tcp_reads() -> None:
    frame = encode_frame({"type": "fragmented", "payload": {"parts": 3}})
    reader = asyncio.StreamReader()

    async def feed_fragments() -> None:
        for chunk in (frame[:2], frame[2:7], frame[7:]):
            reader.feed_data(chunk)
            await asyncio.sleep(0)
        reader.feed_eof()

    feeder = asyncio.create_task(feed_fragments())
    try:
        assert await read_frame(reader) == {
            "type": "fragmented",
            "payload": {"parts": 3},
        }
    finally:
        await feeder

The corresponding command:

uv run pytest tests/test_protocol_framing.py

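Protocol step two: JSON still needs a message schema

The length prefix only settles byte boundaries, and JSON only settles data representation. The application protocol still has to define which message types are valid, which fields each type requires, and what an error response looks like.

src/chat_tui/protocol/messages.py uses one envelope everywhere: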

{
  "type": "group.send",
  "payload": {
    "group": "general",
    "content_type": "text",
    "content": "hello"
  }
}

Start by listing the protocol types:

from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

CONTENT_TYPES = frozenset({"text", "image"})

AUTH_MESSAGE_TYPES = frozenset({"register", "login"})
GROUP_MESSAGE_TYPES = frozenset({
    "group.create",
    "group.join",
    "group.leave",
    "group.list",
    "group.send",
    "group.message",
})
DIRECT_MESSAGE_TYPES = frozenset({"direct.open", "direct.send", "direct.message"})
PRESENCE_MESSAGE_TYPES = frozenset({"presence.update"})
RESPONSE_MESSAGE_TYPES = frozenset({"success", "error"})
SUPPORTED_MESSAGE_TYPES = frozenset(
    AUTH_MESSAGE_TYPES
    | GROUP_MESSAGE_TYPES
    | DIRECT_MESSAGE_TYPES
    | PRESENCE_MESSAGE_TYPES
    | RESPONSE_MESSAGE_TYPES
)

The parse function only ever returns a validated ProtocolMessage:

@dataclass(frozen=True, slots=True)
class ProtocolMessage:
    type: str
    payload: dict[str, Any]


def parse_message(raw: Mapping[str, Any]) -> ProtocolMessage:
    if "type" not in raw:
        raise MessageValidationError("protocol message is missing required field: type")
    message_type = raw["type"]
    if not isinstance(message_type, str):
        raise MessageValidationError("protocol message field 'type' must be a string")
    if message_type not in SUPPORTED_MESSAGE_TYPES:
        raise MessageValidationError(
            f"unsupported protocol message type: {message_type}",
            code="unsupported_message_type",
        )

    payload = raw.get("payload")
    if not isinstance(payload, Mapping):
        raise MessageValidationError("protocol message field 'payload' must be an object")

    payload_copy = dict(payload)
    _VALIDATORS[message_type](payload_copy)
    return ProtocolMessage(type=message_type, payload=payload_copy)

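Each message type has its own validator. For example, group.send requires a group name, a content type, and content; presence.update requires a username and a boolean; group.list must have an empty payload.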

def _validate_group_send_payload(payload: dict[str, Any]) -> None:
    _require_non_empty_string(payload, "group")
    _validate_content_payload(payload)


def _validate_content_payload(payload: dict[str, Any]) -> None:
    _require_non_empty_string(payload, "content")
    _require_non_empty_string(payload, "content_type")
    if payload["content_type"] not in CONTENT_TYPES:
        raise MessageValidationError(
            "payload field 'content_type' must be either 'text' or 'image'"
        )
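
The _VALIDATORS table and the _require_non_empty_string helper are referenced but not shown. Here is a minimal sketch of how such a dispatch table could be wired for two of the message types described above. The default error code "invalid_message", the error wording, and the is_valid helper are assumptions made for this example.

```python
from collections.abc import Callable
from typing import Any


class MessageValidationError(ValueError):
    # Assumption: the real class also carries a machine-readable code.
    def __init__(self, message: str, *, code: str = "invalid_message") -> None:
        super().__init__(message)
        self.code = code


def _require_non_empty_string(payload: dict[str, Any], field: str) -> None:
    if field not in payload:
        raise MessageValidationError(f"payload is missing required field: {field}")
    value = payload[field]
    if not isinstance(value, str) or not value:
        raise MessageValidationError(f"payload field '{field}' must be a non-empty string")


def _validate_group_list_payload(payload: dict[str, Any]) -> None:
    if payload:
        raise MessageValidationError("group.list payload must be empty")


def _validate_presence_update_payload(payload: dict[str, Any]) -> None:
    _require_non_empty_string(payload, "username")
    # Only a real bool passes; 0, 1, or "true" are rejected.
    if not isinstance(payload.get("online"), bool):
        raise MessageValidationError("payload field 'online' must be a boolean")


# One validator per message type; parse_message looks the type up here.
_VALIDATORS: dict[str, Callable[[dict[str, Any]], None]] = {
    "group.list": _validate_group_list_payload,
    "presence.update": _validate_presence_update_payload,
}


def is_valid(message_type: str, payload: dict[str, Any]) -> bool:
    """Hypothetical helper for experimenting with the table."""
    try:
        _VALIDATORS[message_type](payload)
    except MessageValidationError:
        return False
    return True


print(is_valid("presence.update", {"username": "alice", "online": True}))
print(is_valid("presence.update", {"username": "alice", "online": 1}))
```

A dictionary keyed by message type keeps parse_message free of long if/elif chains, and adding a new message type becomes a single new entry.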

Responses use the same envelope:

def make_success_response(request_type: str, *, data: Mapping[str, Any] | None = None) -> ProtocolMessage:
    payload: dict[str, Any] = {"request": _safe_text(request_type)}
    if data is not None:
        payload["data"] = dict(data)
    return parse_message({"type": "success", "payload": payload})


def make_error_response(error: object, *, code: str, request_type: str | None = None) -> ProtocolMessage:
    payload = {"code": _safe_text(code), "message": _safe_text(error)}
    if request_type is not None:
        payload["request"] = _safe_text(request_type)
    return parse_message({"type": "error", "payload": payload})

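One small detail: make_error_response does not trust the exception's __str__ directly and wraps it in _safe_text instead. Stringifying an exception object can itself fail, and the protocol layer must not raise a second exception while building an error response.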

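Tests at this layer should cover every valid message and also deliberately omit fields, pass wrong types, and send unknown message types.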

@pytest.mark.parametrize("raw", VALID_MESSAGES)
def test_parse_valid_protocol_messages(raw: Mapping[str, Any]) -> None:
    message = parse_message(raw)

    assert message == ProtocolMessage(type=raw["type"], payload=dict(raw["payload"]))
    assert message_to_json(message) == raw

The corresponding command:

uv run pytest tests/test_protocol_messages.py tests/test_protocol_contract.py

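Server persistence: store the facts first

The server database stores service facts: whether a user exists, what the password verification data is, which groups exist, who belongs to them, and which messages the server has received.

The implementation file is src/chat_tui/server/persistence.py. The class is currently called UserStore, although it already covers users, groups, memberships, and messages. A real project could rename it to ServerStore; this repository keeps the name from the early auth stage.

The SQLite schema starts with tables that mirror those facts: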

CREATE TABLE IF NOT EXISTS users (
    username TEXT PRIMARY KEY,
    password_hash TEXT NOT NULL,
    password_salt TEXT NOT NULL,
    password_iterations INTEGER NOT NULL CHECK (password_iterations > 0),
    created_at TEXT NOT NULL,
    CHECK (length(username) > 0)
);

CREATE TABLE IF NOT EXISTS groups (
    name TEXT PRIMARY KEY,
    owner_username TEXT NOT NULL,
    created_at TEXT NOT NULL,
    FOREIGN KEY (owner_username)
        REFERENCES users (username)
        ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS group_members (
    group_name TEXT NOT NULL,
    username TEXT NOT NULL,
    joined_at TEXT NOT NULL,
    PRIMARY KEY (group_name, username),
    FOREIGN KEY (group_name)
        REFERENCES groups (name)
        ON DELETE CASCADE,
    FOREIGN KEY (username)
        REFERENCES users (username)
        ON DELETE CASCADE
);

The messages table holds both group and direct messages, with a CHECK constraint that keeps the two shapes from mixing:

CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    conversation_type TEXT NOT NULL CHECK (conversation_type IN ('group', 'direct')),
    group_name TEXT,
    sender_username TEXT NOT NULL,
    recipient_username TEXT,
    content_type TEXT NOT NULL CHECK (content_type IN ('text', 'image')),
    content TEXT NOT NULL,
    created_at TEXT NOT NULL,
    CHECK (
        (
            conversation_type = 'group'
            AND group_name IS NOT NULL
            AND recipient_username IS NULL
        )
        OR
        (
            conversation_type = 'direct'
            AND group_name IS NULL
            AND recipient_username IS NOT NULL
        )
    )
);

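The application layer validates, and the database layer acts as a backstop. If a later refactor bypasses application-level validation, the SQLite constraints still block some of the bad data.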

Never store passwords in plaintext. The repository uses PBKDF2-HMAC-SHA256 with a random salt and 600,000 iterations by default:

def _hash_password(*, password: str, salt: bytes, iterations: int) -> str:
    return hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt,
        iterations,
    ).hex()


async def _hash_password_async(*, password: str, salt: bytes, iterations: int) -> str:
    return await asyncio.to_thread(
        _hash_password,
        password=password,
        salt=salt,
        iterations=iterations,
    )

asyncio.to_thread matters here. PBKDF2 is CPU-bound; run directly on the event loop, a login would stall every other connection on the same loop.

Registration writes the hash and the salt:

async def create_user(self, username: str, password: str) -> UserRecord:
    salt = secrets.token_bytes(SALT_BYTES)
    password_hash = await _hash_password_async(
        password=password,
        salt=salt,
        iterations=DEFAULT_PASSWORD_ITERATIONS,
    )
    created_at = _utc_timestamp()

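    # `connection` is the store's already-open aiosqlite connection; how it is obtained is not shown in this excerpt.
    try: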
        await connection.execute(
            """
            INSERT INTO users (
                username,
                password_hash,
                password_salt,
                password_iterations,
                created_at
            )
            VALUES (?, ?, ?, ?, ?)
            """,
            (username, password_hash, salt.hex(), DEFAULT_PASSWORD_ITERATIONS, created_at),
        )
    except sqlite3.IntegrityError as error:
        await connection.rollback()
        raise DuplicateUserError(f"username already exists: {username}") from error

    await connection.commit()
    return UserRecord(username=username, created_at=created_at)

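Verify passwords with hmac.compare_digest rather than an ordinary string comparison; it compares in constant time, so the timing does not reveal how much of the hash matched: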

async def verify_password(self, username: str, password: str) -> bool:
    row = await load_password_row(username)
    if row is None:
        return False

    candidate_hash = await _hash_password_async(
        password=password,
        salt=bytes.fromhex(row["password_salt"]),
        iterations=row["password_iterations"],
    )
    return hmac.compare_digest(candidate_hash, row["password_hash"])
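
The hash and verify steps can be tried together without a database. The sketch below keeps the stored fields in a plain dict that mirrors the users table columns. The 16-byte salt length and the helper names make_record and check_password are assumptions; the demo uses a deliberately low iteration count so it runs quickly, which you should not copy into real code.

```python
import hashlib
import hmac
import secrets


def hash_password(password: str, salt: bytes, iterations: int) -> str:
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations).hex()


def make_record(password: str, *, iterations: int = 600_000) -> dict[str, object]:
    """Build the fields a users row would hold (salt length is an assumption)."""
    salt = secrets.token_bytes(16)
    return {
        "password_salt": salt.hex(),
        "password_iterations": iterations,
        "password_hash": hash_password(password, salt, iterations),
    }


def check_password(record: dict[str, object], password: str) -> bool:
    # Re-hash with the salt and iteration count stored on the row itself.
    candidate = hash_password(
        password,
        bytes.fromhex(str(record["password_salt"])),
        int(record["password_iterations"]),
    )
    return hmac.compare_digest(candidate, str(record["password_hash"]))


# Low iteration count only to keep the demo fast.
record = make_record("secret", iterations=1_000)
print(check_password(record, "secret"), check_password(record, "Secret"))
```

Storing the iteration count on each row is what allows the default to be raised later: existing users still verify against the count their hash was created with.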

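Tests at this layer start from SQLite behavior and do not need a running TCP server:

  • A created user can verify their password.
  • A duplicate username raises DuplicateUserError.
  • Creating a group automatically makes the owner a member.
  • Joining a group is idempotent.
  • Leaving a membership that does not exist returns false.
  • Group and direct messages come back in insertion order.
  • Both text and image content types can be stored.

The corresponding command: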

uv run pytest tests/test_server_persistence.py tests/test_server_message_persistence.py

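Server connections: every TCP connection is a small state machine

The server connection code is in src/chat_tui/server/connection.py. A single TCP connection has a simple set of states:

  • Unauthenticated: only register or login is accepted.
  • Authenticated: further auth requests are rejected; group, message, and direct-message requests are accepted.
  • Disconnected: the online user is released and, where needed, an offline status is broadcast.

Write the online-user registry first. It needs an asyncio.Lock because several connections may try to log in as the same username at once: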

class OnlineUserRegistry:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._connections: dict[str, ClientConnection] = {}
        self._direct_relations: dict[str, set[str]] = {}

    async def claim(self, username: str, owner: ClientConnection) -> bool:
        async with self._lock:
            if username in self._connections:
                return False
            self._connections[username] = owner
            return True

    async def release(self, username: str, owner: ClientConnection) -> None:
        async with self._lock:
            if self._connections.get(username) is owner:
                del self._connections[username]
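
The ownership rule is easiest to see by exercising it. The sketch below repeats a trimmed copy of the registry (without the direct-relation map) and uses plain object() instances as stand-ins for ClientConnection, which is an assumption made so the example runs on its own.

```python
import asyncio


class OnlineUserRegistry:
    """Trimmed copy of the registry above; connections are plain objects here."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._connections: dict[str, object] = {}

    async def claim(self, username: str, owner: object) -> bool:
        async with self._lock:
            if username in self._connections:
                return False
            self._connections[username] = owner
            return True

    async def release(self, username: str, owner: object) -> None:
        async with self._lock:
            # Only the connection that holds the name may release it.
            if self._connections.get(username) is owner:
                del self._connections[username]


async def demo() -> list[bool]:
    registry = OnlineUserRegistry()
    first, second = object(), object()
    # Two connections race to log in as the same user.
    results = list(
        await asyncio.gather(
            registry.claim("alice", first),
            registry.claim("alice", second),
        )
    )
    # A release from the losing connection must not free the name.
    await registry.release("alice", second)
    results.append(await registry.claim("alice", second))
    # Once the real owner releases, the name can be claimed again.
    await registry.release("alice", first)
    results.append(await registry.claim("alice", second))
    return results


print(asyncio.run(demo()))
```

The identity check in release matters during cleanup: a connection that failed to log in as alice still runs its cleanup path, and it must not evict the connection that succeeded.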

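The connection's main loop does only a few things: read a frame, parse the message, dispatch by state, and on failure either return an error or close the connection.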

async def run(self) -> None:
    try:
        while True:
            try:
                raw_message = await read_frame(self.reader, max_size=self.max_frame_size)
            except TruncatedFrame:
                break
            except FrameTooLarge as error:
                await self._send_error(error, code="frame_too_large")
                break
            except FrameError as error:
                await self._send_error(error, code="invalid_frame")
                continue

            message = await self._parse_raw_message(raw_message)
            if message is None:
                continue

            keep_open = await self._handle_message(message)
            if not keep_open:
                break
    finally:
        await self._cleanup()

_handle_message is the core of the state machine. The excerpt below covers the authentication gate and group management:

async def _handle_message(self, message: ProtocolMessage) -> bool:
    if self.username is None:
        if message.type == "register":
            return await self._handle_register(message)
        if message.type == "login":
            return await self._handle_login(message)
        await self._send_error(
            "authenticate before sending this request",
            code="unauthenticated",
            request_type=message.type,
        )
        return True

    if message.type in AUTH_MESSAGE_TYPES:
        await self._send_error(
            "connection is already authenticated",
            code="already_authenticated",
            request_type=message.type,
        )
        return True

    if message.type in GROUP_MANAGEMENT_MESSAGE_TYPES:
        response = await handle_group_request(
            message,
            username=self.username,
            user_store=self.user_store,
        )
        await self._send_message(response)
        return True

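After a successful registration or login, claim the online username immediately. Otherwise two connections could both authenticate as the same user, and the server would not know which connection should receive real-time messages.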

async def _handle_login(self, message: ProtocolMessage) -> bool:
    username = message.payload["username"]
    password = message.payload["password"]

    if not await self.user_store.verify_password(username, password):
        await self._send_error(
            "invalid username or password",
            code="invalid_credentials",
            request_type=message.type,
        )
        return True

    if not await self.online_users.claim(username, self):
        await self._send_error(
            "user is already online",
            code="already_online",
            request_type=message.type,
        )
        return True

    self.username = username
    await self._send_success(message.type, username=username, status="logged_in")
    await self._broadcast_presence_update(username, online=True)
    return True

ChatServer is a thin wrapper: it opens SQLite, calls asyncio.start_server, and hands every new connection to a ClientConnection:

class ChatServer:
    async def start(self) -> None:
        if self._server is not None:
            return
        await self.user_store.connect()
        self._server = await asyncio.start_server(
            self._handle_client,
            self.host,
            self.port,
        )

    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        connection = ClientConnection(
            reader=reader,
            writer=writer,
            user_store=self.user_store,
            online_users=self.online_users,
            metrics=self.metrics,
            max_frame_size=self.max_frame_size,
        )
        await connection.run()

Tests at this layer should use real TCP rather than calling private methods. Start a ChatServer in the test and send protocol frames through asyncio.open_connection:

async def request(reader, writer, message_type: str, payload: dict[str, object]) -> dict[str, object]:
    await write_frame(writer, {"type": message_type, "payload": payload})
    return await asyncio.wait_for(read_frame(reader), timeout=1)

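Behaviors to cover:

  • After registration, the password verifies against the database.
  • After login, server.is_online("alice") is true.
  • A wrong password returns invalid_credentials.
  • A second concurrent login for the same user returns already_online.
  • Sending group.list before authenticating returns unauthenticated.
  • Missing fields, invalid JSON, and unknown message types do not take down the connection; registration still works afterwards.
  • Disconnecting clears the online state, so the user can log in again.

The corresponding command: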

uv run pytest tests/test_server_connection.py

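Client network layer: keep request/response separate from background events

The client network layer is in src/chat_tui/client/network.py. It only deals with TCP and the protocol; it knows nothing about the UI and never writes to local SQLite directly.

The easiest thing to get wrong here is response matching. The current protocol has no request IDs, so the client serializes requests: only one request may be pending at a time. When the background receive loop reads a success or error whose payload.request equals the pending request's type, it completes that request; every other message goes to the event queue.

The object state looks roughly like this: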

class ChatClient:
    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._receive_task: asyncio.Task[None] | None = None
        self._events: asyncio.Queue[ClientEvent] = asyncio.Queue()
        self._request_lock = asyncio.Lock()
        self._pending_request_type: str | None = None
        self._pending_response: asyncio.Future[ProtocolMessage] | None = None

Once connected, start the background receive task:

async def connect(self) -> None:
    if self.is_connected:
        return

    try:
        self._reader, self._writer = await asyncio.open_connection(self.host, self.port)
    except OSError as error:
        raise ClientConnectionError(f"could not connect to {self.host}:{self.port}") from error

    self._closing = False
    self._receive_task = asyncio.create_task(self._receive_loop())

Sending a request takes the lock, writes the frame, and waits for the matching response:

async def send_request(
    self,
    message_type: str,
    payload: Mapping[str, Any],
    *,
    timeout: float | None = None,
) -> ProtocolMessage:
    request = parse_message({"type": message_type, "payload": dict(payload)})

    async with self._request_lock:
        writer = self._writer
        if writer is None or writer.is_closing():
            raise ClientNotConnectedError("client is not connected")

        loop = asyncio.get_running_loop()
        response_future = loop.create_future()
        self._pending_request_type = request.type
        self._pending_response = response_future

        try:
            # Write inside the try block so the pending slot is cleared even if the write fails.
            await write_frame(writer, message_to_json(request), max_size=self.max_frame_size)
            if timeout is None:
                return await response_future
            return await asyncio.wait_for(response_future, timeout=timeout)
        finally:
            self._clear_pending(response_future)

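The background receive loop handles three kinds of input separately:

  • success and error: try to complete the pending request.
  • group.message, direct.message, and presence.update: put them on the event queue.
  • Bad frames and bad messages: convert them to ClientProtocolError and put them on the event queue, then keep reading if the stream is recoverable.

async def _dispatch_message(self, message: ProtocolMessage) -> None: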
    if message.type == "success":
        if self._complete_matching_request(message):
            return
        await self._events.put(message)
        return

    if message.type == "error":
        error = ServerResponseError.from_message(message)
        if self._complete_matching_request(message, error=error):
            return
        await self._events.put(error)
        return

    await self._events.put(message)

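The client network tests do not need the real server logic. A minimal TCP protocol server is enough: it reads one frame, inspects what the client sent, and replies with a prepared frame.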

async def test_register_sends_request_and_returns_auth_result() -> None:
    seen_request = None

    async def handler(reader, writer) -> None:
        nonlocal seen_request
        seen_request = await read_frame(reader)
        await write_success(writer, request_type="register", username="alice", status="registered")
        await reader.read()

    async with LocalProtocolServer(handler) as server:
        async with ChatClient(server.host, server.port) as client:
            result = await client.register("alice", "secret", timeout=1)

    assert seen_request == {
        "type": "register",
        "payload": {"username": "alice", "password": "secret"},
    }
    assert result.username == "alice"

The corresponding command:

uv run pytest tests/test_client_network.py

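Group management: the server owns the facts, the client owns the workspace

Group functionality is split into three layers:

  • Server SQLite: stores groups and memberships.
  • Server handler: translates protocol requests into store operations and protocol responses.
  • Client state: saves server results into local SQLite and maintains the conversation list.

The server handler lives in src/chat_tui/server/groups.py. The entry function only routes: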

async def handle_group_request(
    message: ProtocolMessage,
    *,
    username: str,
    user_store: UserStore,
) -> ProtocolMessage:
    if message.type == "group.create":
        return await _handle_create(message, username=username, user_store=user_store)
    if message.type == "group.join":
        return await _handle_join(message, username=username, user_store=user_store)
    if message.type == "group.leave":
        return await _handle_leave(message, username=username, user_store=user_store)
    if message.type == "group.list":
        return await _handle_list(message, username=username, user_store=user_store)

On group creation, the server persists the group and adds the creator to the members table:

async def _handle_create(message: ProtocolMessage, *, username: str, user_store: UserStore) -> ProtocolMessage:
    group_name = message.payload["group"]
    try:
        group = await user_store.create_group(group_name, username)
    except DuplicateGroupError as error:
        return make_error_response(error, code="group_exists", request_type=message.type)

    return make_success_response(
        message.type,
        data={"status": "created", "group": _group_to_data(group)},
    )

The client's local SQLite holds workspace state, not a full copy of the server's facts. src/chat_tui/client/persistence.py defines group_sessions:

CREATE TABLE IF NOT EXISTS group_sessions (
    name TEXT PRIMARY KEY,
    owner_username TEXT NOT NULL,
    created_at TEXT NOT NULL,
    membership_state TEXT NOT NULL CHECK (membership_state IN ('joined', 'joinable')),
    unread_count INTEGER NOT NULL DEFAULT 0 CHECK (unread_count >= 0),
    updated_at TEXT NOT NULL
);

ClientGroupSessionState calls the network layer and then writes the result into the local store:

async def create_group(self, group: str, *, timeout: float | None = None) -> GroupOperationResult:
    result = await self.network_client.create_group(group, timeout=timeout)
    await self._save_group(result.group, GROUP_MEMBERSHIP_JOINED)
    await self._refresh_from_store()
    return result


async def list_groups(self, *, timeout: float | None = None) -> GroupListResult:
    result = await self.network_client.list_groups(timeout=timeout)
    for group in result.joined:
        await self._save_group(group, GROUP_MEMBERSHIP_JOINED)
    for group in result.joinable:
        await self._save_group(group, GROUP_MEMBERSHIP_JOINABLE)
    await self._refresh_from_store()
    return result

Test this section layer by layer:

uv run pytest tests/test_server_group_connection.py
uv run pytest tests/test_client_persistence.py tests/test_client_state.py
uv run pytest tests/test_group_e2e.py

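The end-to-end test should verify the real flow: Alice registers and creates a group, Bob registers and sees it as joinable in list, both sides have the correct state after Bob joins, and the state is still recoverable after a server or client restart.

Group messages: persist first, then broadcast

The server side of group messages is in src/chat_tui/server/messages.py. The core order is:

  1. Check that the group exists.
  2. Check that the sender is a member of the group.
  3. Write the message to server SQLite.
  4. Query the group members.
  5. Return success to the sender and broadcast group.message to the online members.

Do not reverse this order. If you broadcast before persisting and the database write fails, recipients will have seen a message the server has no record of.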

async def handle_group_send_request(
    message: ProtocolMessage,
    *,
    username: str,
    user_store: UserStore,
) -> GroupSendResult:
    group_name = message.payload["group"]
    content_type = cast(MessageContentType, message.payload["content_type"])
    content = message.payload["content"]

    group = await user_store.get_group(group_name)
    if group is None:
        return GroupSendResult(
            response=make_error_response(
                f"unknown group: {group_name}",
                code="unknown_group",
                request_type=message.type,
            )
        )

    membership = await user_store.get_group_member(group_name, username)
    if membership is None:
        return GroupSendResult(
            response=make_error_response(
                f"user is not a member of group: {group_name}",
                code="not_group_member",
                request_type=message.type,
            )
        )

    stored_message = await user_store.create_group_message(
        group_name=group_name,
        sender_username=username,
        content_type=content_type,
        content=content,
    )
    members = await user_store.list_group_members(group_name)

The return value carries both the response and the delivery event:

delivery_payload = {
    "group": stored_message.group_name,
    "sender": stored_message.sender_username,
    "content_type": stored_message.content_type,
    "content": stored_message.content,
    "timestamp": stored_message.created_at,
}
delivery = parse_message({"type": "group.message", "payload": delivery_payload})
response = make_success_response(
    message.type,
    data={"status": "sent", "message": delivery_payload},
)
return GroupSendResult(
    response=response,
    delivery=delivery,
    recipient_usernames=tuple(member.username for member in members),
)

ClientConnection sends the response once it has the result, then broadcasts to the online members:

result = await handle_group_send_request(
    message,
    username=self.username,
    user_store=self.user_store,
)
await self._send_message(result.response)
if result.delivery is not None:
    await self._broadcast_to_online_users(
        result.delivery,
        result.recipient_usernames,
    )

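Client-side group message state is in src/chat_tui/client/messages.py. The local messages table stores a direction so the UI can tell your own messages from everyone else's: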

CREATE TABLE IF NOT EXISTS group_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    group_name TEXT NOT NULL,
    sender_username TEXT NOT NULL,
    content_type TEXT NOT NULL CHECK (content_type IN ('text', 'image')),
    content TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    direction TEXT NOT NULL CHECK (direction IN ('incoming', 'outgoing')),
    stored_at TEXT NOT NULL
);

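When sending, the client first records a pending-outgoing signature, then waits for the server's success response and stores the message locally as outgoing, using the server timestamp from that response: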

async def send_group_message(
    self,
    group_name: str,
    *,
    content_type: MessageContentType,
    content: str,
    timeout: float | None = None,
) -> GroupMessageRecord:
    signature = _group_outgoing_signature(group_name, content_type, content)
    self._add_pending_outgoing(signature)
    try:
        response = await self.network_client.send_request(
            "group.send",
            {
                "group": group_name,
                "content_type": content_type,
                "content": content,
            },
            timeout=timeout,
        )
        payload = _message_payload_from_send_response(response)
        return await self._save_message_payload(
            payload,
            direction=MESSAGE_DIRECTION_OUTGOING,
        )
    finally:
        self._remove_pending_outgoing(signature)

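Why _pending_outgoing? The server broadcasts group.message to every online member of the group, and the sender is a member too. The sender may receive the success response first or its own broadcast first. Left unhandled, the sender's local history ends up with duplicate messages.

Background events fall into three cases:

  • Not a group.message: ignore it.
  • Sent by someone else: save it as incoming, and increment the unread count if its group is not the current conversation.
  • Sent by the local user: prefer matching an existing outgoing record; if the request has not returned yet, ignore the self-broadcast and let the success response store it locally.

async def apply_event(self, event: ProtocolMessage | ClientNetworkError) -> GroupMessageRecord | None: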
    if isinstance(event, ClientNetworkError):
        return None
    if event.type != "group.message":
        return None

    direction = MESSAGE_DIRECTION_INCOMING
    sender = _required_string(event.payload, "sender")
    if self.local_username is not None and sender == self.local_username:
        existing = self._find_matching_outgoing_echo(event.payload)
        if existing is not None:
            return existing
        if self._has_pending_outgoing(_group_outgoing_signature_from_payload(event.payload)):
            return None
        direction = MESSAGE_DIRECTION_OUTGOING

    record = await self._save_message_payload(event.payload, direction=direction)
    if record.direction == MESSAGE_DIRECTION_INCOMING and record.group_name != self._current_group:
        await self.store.increment_group_unread_count(record.group_name)
        await self._refresh_sessions_from_store()
    return record
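
The pending-outgoing bookkeeping itself is not shown. One way to sketch it, assuming the signature is simply the (group, content type, content) triple, is a counter keyed by that triple. The PendingOutgoing class and its method names are hypothetical; the article only names _add_pending_outgoing, _remove_pending_outgoing, and _has_pending_outgoing.

```python
from collections import Counter

OutgoingSignature = tuple[str, str, str]


def group_outgoing_signature(group_name: str, content_type: str, content: str) -> OutgoingSignature:
    # Assumption: a send is identified by where it goes and what it contains.
    return (group_name, content_type, content)


class PendingOutgoing:
    """Hypothetical tracker for sends whose success response has not arrived yet."""

    def __init__(self) -> None:
        self._counts: Counter[OutgoingSignature] = Counter()

    def add(self, signature: OutgoingSignature) -> None:
        self._counts[signature] += 1

    def remove(self, signature: OutgoingSignature) -> None:
        # Drop the key at zero so the counter does not grow without bound.
        if self._counts[signature] <= 1:
            self._counts.pop(signature, None)
        else:
            self._counts[signature] -= 1

    def has(self, signature: OutgoingSignature) -> bool:
        return self._counts[signature] > 0


pending = PendingOutgoing()
signature = group_outgoing_signature("general", "text", "hello")
pending.add(signature)
pending.add(signature)     # the same text queued twice
pending.remove(signature)  # first send finished
print(pending.has(signature))
```

A counter rather than a set keeps the bookkeeping correct when the same text is queued twice: a set would forget the signature as soon as the first send finished, letting the second send's self-broadcast through as a duplicate.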

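This section's tests need dedicated race-condition cases. The repository history includes a fix titled Fix duplicate outgoing message echoes, which shows how easy this is to miss.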

async def test_pending_group_send_ignores_self_broadcast_until_response_echo_persists(tmp_path) -> None:
    send_task = asyncio.create_task(
        state.send_group_message(
            "general",
            content_type="text",
            content="sent while pending",
            timeout=1,
        )
    )
    await network.called.wait()

    broadcast_record = await state.apply_event(delivery)
    network.release()
    record = await send_task
    persisted_messages = await store.list_group_messages("general")

    assert broadcast_record is None
    assert persisted_messages == (record,)

The corresponding commands:

uv run pytest tests/test_server_message_persistence.py tests/test_server_group_connection.py
uv run pytest tests/test_client_messages.py
uv run pytest tests/test_group_message_e2e.py

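Direct messages and presence: notify only the people involved

The server side of direct messages is in src/chat_tui/server/direct.py. The protocol has two requests:

  • direct.open: opens a direct conversation and tells the client whether the other user is currently online.
  • direct.send: stores a direct message and pushes it to whichever of the sender and recipient have an online connection.

Open the relation first: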

async def _handle_open(
    message: ProtocolMessage,
    *,
    username: str,
    user_store: UserStore,
    recipient_online: bool,
) -> DirectRequestResult:
    recipient_username = message.payload["username"]
    recipient = await user_store.get_user(recipient_username)
    if recipient is None:
        return DirectRequestResult(
            response=make_error_response(
                f"unknown user: {recipient_username}",
                code="unknown_user",
                request_type=message.type,
            )
        )

    return DirectRequestResult(
        response=make_success_response(
            message.type,
            data={
                "status": "opened",
                "username": recipient_username,
                "online": recipient_online,
            },
        ),
        related_username=recipient_username,
    )

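ClientConnection records related_username in the direct relations of OnlineUserRegistry. When a user later comes online or goes offline, only users who have already opened a direct chat with them are notified; there is no site-wide presence broadcast.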

async def _broadcast_presence_update(self, username: str, *, online: bool) -> None:
    related_usernames = await self.online_users.related_usernames(username)
    if not related_usernames:
        return
    presence = parse_message(
        {
            "type": "presence.update",
            "payload": {"username": username, "online": online},
        }
    )
    await self._broadcast_to_online_users(presence, related_usernames)
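The relation lookup behind related_usernames can be sketched as a small in-memory map. This is a sketch under stated assumptions, not the repository's OnlineUserRegistry: the class name DirectRelationSketch, the method add_direct_relation, and the choice to store relations symmetrically are all hypothetical. Symmetric storage is one way to make Bob's login reach Alice after only Alice has opened the chat.

```python
import asyncio
from collections import defaultdict


class DirectRelationSketch:
    """Hypothetical stand-in for the direct-relation part of OnlineUserRegistry."""

    def __init__(self) -> None:
        self._relations: dict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def add_direct_relation(self, username: str, related_username: str) -> None:
        # Assumption: relations are stored in both directions, so a presence
        # change on either side reaches the other.
        async with self._lock:
            self._relations[username].add(related_username)
            self._relations[related_username].add(username)

    async def related_usernames(self, username: str) -> tuple[str, ...]:
        async with self._lock:
            # .get avoids creating an empty entry for unknown users.
            return tuple(sorted(self._relations.get(username, ())))


async def demo() -> tuple[tuple[str, ...], tuple[str, ...]]:
    registry = DirectRelationSketch()
    await registry.add_direct_relation("alice", "bob")
    return (
        await registry.related_usernames("bob"),
        await registry.related_usernames("carol"),
    )
```

A user with no relations gets an empty tuple back, which is what lets _broadcast_presence_update return early.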

Sending a direct message works like sending a group message: persist first, then build the direct.message delivery:

stored_message = await user_store.create_direct_message(
    sender_username=username,
    recipient_username=recipient_username,
    content_type=content_type,
    content=content,
)
delivery_payload = {
    "sender": stored_message.sender_username,
    "recipient": stored_message.recipient_username,
    "content_type": stored_message.content_type,
    "content": stored_message.content,
    "timestamp": stored_message.created_at,
}
return DirectRequestResult(
    response=make_success_response(
        message.type,
        data={"status": "sent", "message": delivery_payload},
    ),
    related_username=recipient_username,
    delivery=parse_message({"type": "direct.message", "payload": delivery_payload}),
    recipient_usernames=_unique_usernames((username, recipient_username)),
)
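The _unique_usernames helper is not shown in the article. A plausible sketch, under the assumption that it only needs to drop duplicates while keeping order (for example if sender and recipient turn out to be the same name), could look like this; the function name below is hypothetical.

```python
from collections.abc import Iterable


def unique_usernames_sketch(usernames: Iterable[str]) -> tuple[str, ...]:
    # dict keys keep insertion order, so duplicates are dropped
    # and the first occurrence of each name is preserved.
    return tuple(dict.fromkeys(usernames))
```

Keeping the order stable makes delivery order deterministic, which helps when tests compare the recipients list directly.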

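The client-side direct chat state lives in src/chat_tui/client/direct.py. It differs from group messages in two ways:

  • A direct message's partner has to be derived from the local username.
  • presence.update only updates direct sessions that already exist; it never creates a session for an unknown contact.

def _partner_username(payload: Mapping[str, Any], local_username: str) -> str: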
    sender_username = _required_string(payload, "sender")
    recipient_username = _required_string(payload, "recipient")
    if sender_username == local_username:
        return recipient_username
    if recipient_username == local_username:
        return sender_username
    raise ClientProtocolError("direct message does not involve the local user")

Opening a direct chat saves the online flag and clears the unread count:

async def open_direct_chat(self, username: str, *, timeout: float | None = None) -> DirectSessionRecord:
    response = await self.network_client.send_request(
        "direct.open",
        {"username": username},
        timeout=timeout,
    )
    opened_username, online = _direct_open_result_from_response(response)
    await self.store.save_direct_session(username=opened_username, online=online)
    self._current_username = opened_username
    await self.store.clear_direct_unread_count(opened_username)
    await self._refresh_from_store()
    return self._sessions_by_username[opened_username]

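The tests for this section should cover the full direct-chat lifecycle:

  • Alice opens a chat with Bob while Bob is offline, and Alice sees offline.
  • After Bob logs in, Alice receives presence.update with online=True.
  • Alice sends a message; both Alice and Bob receive direct.message, and Carol does not.
  • After Bob disconnects, Alice receives offline.
  • Messages Alice sends while Bob is offline are stored only on the server and in Alice's local store; they are not replayed when Bob reconnects.
  • After Bob reconnects, a newly sent live message reaches Bob as a new real-time event.

Corresponding commands: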

uv run pytest tests/test_server_direct_connection.py
uv run pytest tests/test_client_direct.py
uv run pytest tests/test_direct_e2e.py

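TUI: the interface is just a projection of network state

The TUI lives in src/chat_tui/tui/. The UI should not assemble protocol frames itself, and it should know nothing about the server database. It only calls the client state objects and renders the state they hold.

AuthenticationView is thin and only owns the widgets: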

class AuthenticationView(Container):
    def compose(self) -> ComposeResult:
        with Container(id="auth-panel"):
            with Horizontal(id="brand-row"):
                yield Static("CHAT TUI", id="brand-mark")
                yield Static(f"DISCONNECTED {self.host}:{self.port}", id="connection-status")
            yield Static("Sign in to continue", id="auth-title")
            yield Input(placeholder="Username", id="username")
            yield Input(placeholder="Password", password=True, id="password")
            with Horizontal(id="auth-actions"):
                yield Button("Log in", id="login-button", variant="primary")
                yield Button("Register", id="register-button")
            yield Static("", id="auth-error")

ChatTuiApp handles connecting, logging in, registering, and switching to the workspace:

async def _authenticate(self, action: str) -> None:
    view = self.query_one(AuthenticationView)
    username, password = view.credentials
    if not username or not password:
        view.set_error("Username and password are required.")
        return

    if not await self._ensure_connected():
        return

    try:
        if action == "register":
            result = await self._client.register(username, password, timeout=self.auth_timeout)
        else:
            result = await self._client.login(username, password, timeout=self.auth_timeout)
    except ClientNetworkError as error:
        view.set_error(_format_client_error(error))
        return

    self._authenticated_username = result.username
    await self._show_workspace(result.username)

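WorkspaceView is the main screen: the session list and the group/direct-chat actions are on the left, the timeline and the composer input on the right. It combines three client state objects: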

self.group_messages = ClientGroupMessageState(
    network_client,
    store,
    local_username=username,
)
self.group_sessions = ClientGroupSessionState(network_client, store)
self.direct_chats = ClientDirectChatState(
    network_client,
    store,
    local_username=username,
)

On mount, it loads local history first and then starts the background event loop:

async def on_mount(self) -> None:
    await self.store.connect()
    await self.group_sessions.load()
    await self.group_messages.load()
    await self.direct_chats.load()
    self._sessions = self._collect_sessions()
    if self._sessions:
        await self.select_session(self._sessions[0].key)
    else:
        self._render_all()
    self._event_task = asyncio.create_task(self._event_loop())
    self.query_one("#composer", Input).focus()

When sending, it calls the group-message or direct-chat state depending on the current session's kind:

async def send_current_message(self) -> None:
    content = self.query_one("#composer", Input).value.strip()
    if self._current_session is None:
        self._set_status("Select a session before sending.")
        return

    validation_error = _validate_composer_content(self._composer_mode, content)
    if validation_error is not None:
        self._set_status(validation_error)
        return

    if self._current_session.kind == "group":
        await self.group_messages.send_group_message(
            self._current_session.name,
            content_type=self._composer_mode,
            content=content,
            timeout=self.send_timeout,
        )
    else:
        await self.direct_chats.send_direct_message(
            self._current_session.name,
            content_type=self._composer_mode,
            content=content,
            timeout=self.send_timeout,
        )

Image mode only validates Base64; it does not handle file uploads:

def _validate_composer_content(mode: ComposerMode, content: str) -> str | None:
    if mode == "text":
        if not content:
            return "Message text is required."
        return None

    if not content:
        return "Image payload is required."
    try:
        base64.b64decode(content, validate=True)
    except (binascii.Error, ValueError):
        return "Image payload must be valid Base64."
    return None
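The validate=True argument in the validator above matters more than it looks. A short demonstration using only the standard library shows the difference; the tampered string is a made-up example.

```python
import base64
import binascii

tampered = "aGVs!bG8="  # "aGVsbG8=" (the bytes b"hello") with a stray "!" inserted

# Default mode silently discards characters outside the Base64 alphabet.
lenient = base64.b64decode(tampered)

# Strict mode rejects the same input.
try:
    base64.b64decode(tampered, validate=True)
    strict_rejected = False
except binascii.Error:
    strict_rejected = True
```

Without validate=True the composer would accept input containing stray characters. Note that passing this check only means the string is well-formed Base64; it does not prove the decoded bytes are an image.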

The Textual tests use App.run_test() with a fake network client, so no real terminal needs to be started:

async def test_send_text_message_from_composer(tmp_path: Path) -> None:
    database_path = tmp_path / "client.sqlite3"
    await seed_workspace_history(database_path)
    client = FakeWorkspaceNetworkClient(
        responses=[
            group_send_success(
                group="general",
                sender="alice",
                content="message from tui",
            )
        ]
    )
    app = workspace_app(database_path, client)

    async with app.run_test() as pilot:
        await pilot.pause()
        app.query_one("#composer", Input).value = "message from tui"

        await pilot.press("enter")
        await pilot.pause()

        assert client.calls == [
            (
                "group.send",
                {
                    "group": "general",
                    "content_type": "text",
                    "content": "message from tui",
                },
                5.0,
            )
        ]

Corresponding commands:

uv run pytest tests/test_tui_auth.py tests/test_tui_workspace.py

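Startup commands, logging, and metrics come last

Once the business path works end to end, add the startup commands. The server CLI lives in src/chat_tui/server/cli.py: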

uv run chat-tui-server --host 127.0.0.1 --port 8765 --db data/server.sqlite3

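It needs to support these options:

  • --host, --port: the TCP listen address.
  • --db: path to the server SQLite database.
  • --log-level, --log-file: logging configuration.
  • --metrics-interval: periodically log runtime metrics.

The client CLI lives in src/chat_tui/client/cli.py: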

uv run chat-tui-client --host 127.0.0.1 --port 8765 --history-db data/alice.sqlite3

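Logging must respect one boundary: record operational metadata, never sensitive content. It is fine to log host, port, request type, error code, username, group name, and direct-chat peer; do not log passwords, message bodies, or Base64 image payloads.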
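One way to enforce that boundary is to build log fields through an allowlist rather than logging payloads directly. The sketch below is an assumption, not code from the repository: safe_log_fields, LOGGABLE_KEYS, and the exact payload key names are hypothetical and would need to match the real schema.

```python
from collections.abc import Mapping
from typing import Any

# Assumed payload keys that are safe to log; everything else is dropped.
LOGGABLE_KEYS = frozenset({"username", "group", "content_type"})


def safe_log_fields(request_type: str, payload: Mapping[str, Any]) -> dict[str, Any]:
    fields: dict[str, Any] = {"request_type": request_type}
    for key in sorted(payload):
        if key in LOGGABLE_KEYS:
            fields[key] = payload[key]
    # Record only the size of the body, never the body itself.
    content = payload.get("content")
    if isinstance(content, str):
        fields["content_length"] = len(content)
    return fields
```

An allowlist fails closed: a new sensitive field added to the protocol later stays out of the logs until someone deliberately adds it.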

The shared logging configuration can be written as an idempotent function:

def configure_logging(
    log_level: str | int = DEFAULT_LOG_LEVEL,
    *,
    log_file: str | PathLike[str] | None = None,
    stream: TextIO | None = None,
) -> logging.Logger:
    logger = logging.getLogger("chat_tui")
    logger.setLevel(parse_log_level(log_level))
    logger.propagate = False

    _remove_managed_handlers(logger)

    formatter = logging.Formatter(_LOG_FORMAT, datefmt=_DATE_FORMAT)
    logger.addHandler(_managed_handler(logging.StreamHandler(stream), formatter))

    if log_file is not None:
        path = Path(log_file)
        path.parent.mkdir(parents=True, exist_ok=True)
        logger.addHandler(
            _managed_handler(logging.FileHandler(path, encoding="utf-8"), formatter)
        )

    return logger
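The idempotence comes from removing previously installed handlers before adding new ones. The two helpers are not shown in the article, so the following is a guess at how they might work: handlers are tagged with a marker attribute (the attribute name and the _sketch function names are assumptions) and only tagged handlers are removed.

```python
import io
import logging

_MANAGED_ATTR = "_chat_tui_managed"  # assumed marker attribute name


def managed_handler_sketch(
    handler: logging.Handler, formatter: logging.Formatter
) -> logging.Handler:
    handler.setFormatter(formatter)
    setattr(handler, _MANAGED_ATTR, True)  # tag so it can be found again later
    return handler


def remove_managed_handlers_sketch(logger: logging.Logger) -> None:
    # Iterate over a copy because removeHandler mutates logger.handlers.
    for handler in list(logger.handlers):
        if getattr(handler, _MANAGED_ATTR, False):
            logger.removeHandler(handler)
            handler.close()


def configure_sketch(stream: io.StringIO) -> logging.Logger:
    logger = logging.getLogger("chat_tui_sketch")
    remove_managed_handlers_sketch(logger)
    formatter = logging.Formatter("%(levelname)s %(message)s")
    logger.addHandler(managed_handler_sketch(logging.StreamHandler(stream), formatter))
    return logger
```

Calling configure_sketch twice leaves exactly one handler on the logger, so each record is written once. Handlers that tests or other libraries attached themselves are untouched because they lack the marker.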

Metrics do not need a dashboard from day one either. The repository only has in-memory counters on the server plus periodic log output:

@dataclass(frozen=True, slots=True)
class ServerMetricsSnapshot:
    uptime_seconds: float
    active_connections: int
    total_connections: int
    online_users: int
    auth_successes: int
    auth_rejections: int
    protocol_faults: int
    handled_requests: int
    process_cpu_seconds: float
    process_cpu_percent: float
    max_rss_kib: int | None

The periodic output is started by the server CLI:

async def log_server_metrics_periodically(
    server: ChatServer,
    *,
    interval: float,
    stop_event: asyncio.Event,
) -> None:
    while True:
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except TimeoutError:
            snapshot = await server.metrics_snapshot()
            LOGGER.info("server metrics %s", snapshot.as_log_fields())
            continue
        return
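The owning side of that loop, creating the task and shutting it down, is not shown. The sketch below reproduces the same wait-with-timeout pattern with a plain callback so it can run on its own; run_periodically and demo are hypothetical names. It catches asyncio.TimeoutError, which is the same class as the built-in TimeoutError on Python 3.11 and later.

```python
import asyncio
from collections.abc import Callable


async def run_periodically(
    callback: Callable[[], None],
    *,
    interval: float,
    stop_event: asyncio.Event,
) -> None:
    while True:
        try:
            # Wakes up early when stop_event is set, otherwise times out.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            callback()
            continue
        return


async def demo() -> int:
    ticks = 0

    def on_tick() -> None:
        nonlocal ticks
        ticks += 1

    stop_event = asyncio.Event()
    task = asyncio.create_task(
        run_periodically(on_tick, interval=0.01, stop_event=stop_event)
    )
    await asyncio.sleep(0.1)
    stop_event.set()  # request shutdown; the loop exits without cancellation
    await asyncio.wait_for(task, timeout=1)
    return ticks
```

Waiting on the stop event instead of calling asyncio.sleep(interval) means shutdown does not have to wait out a full interval and does not need task cancellation.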

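The tests for these features mainly check four things:

  • CLI arguments parse, and the command starts up and reaches the listening state.
  • Logs contain the metadata needed for troubleshooting.
  • Logs contain no passwords, message bodies, or image payloads.
  • Metric counters change in response to real TCP activity.

Corresponding commands: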

uv run pytest tests/test_startup_cli.py
uv run pytest tests/test_logging_config.py tests/test_server_connection.py tests/test_client_logging.py tests/test_tui_logging.py
uv run pytest tests/test_server_metrics.py
uv run pytest tests/test_documentation.py

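Acceptance tests: test system boundaries, not just functions

The earlier tests establish that each module is correct in isolation; acceptance tests are still needed at the end. In chat-tui's commit history, Step 26 and Step 27 add the recovery and concurrency acceptance tests, and an active-load performance test was added later.

Recovery tests need to cover two recovery targets, the server and the client:

  • After a server restart, users, groups, memberships, and server-side messages are still there.
  • After a client restart, local sessions, history, unread counts, and online flags are still there.

The test is structured roughly like this: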

async with RunningRecoveryStack(server_database_path) as first_stack:
    # register users, create a group, send group messages, send direct messages
    ...

async with RunningRecoveryStack(server_database_path) as restarted_stack:
    # restart the server with the same server.sqlite3
    # reload client-local state from the same alice.sqlite3 / bob.sqlite3
    ...

assert restored_members == ["alice", "bob"]
assert restored_alice_group_messages.messages_for_group("general")
assert restored_bob_direct.messages_for_user("alice")

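Protocol fault tests need to simulate a misbehaving client:

  • Send a zero-length frame.
  • Send invalid JSON.
  • Send an unknown message type.
  • Send a request with missing fields.
  • After all of that, registration still succeeds.
  • Meanwhile, a healthy client can still create a group.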
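The bad inputs listed above can be prepared as raw bytes and written straight to a socket. This sketch assumes the 4-byte length prefix is an unsigned big-endian integer, which the article does not state explicitly; frame, json_frame, and BAD_FRAMES are hypothetical names, and the unknown type string is made up.

```python
import json
import struct


def frame(body: bytes) -> bytes:
    # Assumption: 4-byte unsigned big-endian length prefix, then the body.
    return struct.pack(">I", len(body)) + body


def json_frame(message: dict) -> bytes:
    return frame(json.dumps(message).encode("utf-8"))


BAD_FRAMES = {
    # Length prefix of zero with no body at all.
    "zero_length": frame(b""),
    # Correctly framed, but the body is not valid JSON.
    "invalid_json": frame(b"{not json"),
    # Valid JSON with a type the server does not know.
    "unknown_type": json_frame({"type": "no.such.type", "payload": {}}),
    # Known type, but the required "username" field is missing.
    "missing_field": json_frame({"type": "direct.open", "payload": {}}),
}
```

Building the frames by hand, rather than through the project's own encoder, keeps the test independent of the code it is meant to stress.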

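Concurrency acceptance does not need to chase big numbers at the start. The repository's requirement is that, with 50 authenticated idle clients online, delivery of a representative group message and a representative direct message stays under 500 ms: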

IDLE_CLIENT_COUNT = 50
DELIVERY_TARGET_SECONDS = 0.5
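A latency check against that target can be sketched as a helper that times one send/receive pair. The measure_delivery function and the queue-based demo are illustrative assumptions; in the real acceptance test the two callables would wrap actual client connections.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable

DELIVERY_TARGET_SECONDS = 0.5


async def measure_delivery(
    send: Callable[[], Awaitable[object]],
    receive: Callable[[], Awaitable[object]],
) -> float:
    started = time.perf_counter()
    await send()
    # Fail fast with a timeout instead of hanging when nothing arrives.
    await asyncio.wait_for(receive(), timeout=DELIVERY_TARGET_SECONDS)
    return time.perf_counter() - started


async def demo() -> float:
    inbox: asyncio.Queue[str] = asyncio.Queue()  # stand-in for a real connection

    async def send() -> None:
        await inbox.put("hello")

    async def receive() -> str:
        return await inbox.get()

    return await measure_delivery(send, receive)
```

time.perf_counter is used because it is monotonic, so the measurement is not affected by wall-clock adjustments during the run.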

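The active-load test goes a step further: 12 online clients join the same group, 24 messages are sent back to back, every client receives the complete broadcast, and the whole run finishes in under 2 seconds: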

ACTIVE_CLIENT_COUNT = 12
MESSAGE_BURST_COUNT = 24
BURST_DELIVERY_TARGET_SECONDS = 2.0

Corresponding commands:

uv run pytest tests/test_recovery_acceptance.py
uv run pytest tests/test_concurrency_acceptance.py
uv run pytest tests/test_performance_acceptance.py

Finally, run the full suite:

uv run pytest

Running it locally

Server:

uv run chat-tui-server --host 127.0.0.1 --port 8765 --db data/server.sqlite3

First client:

uv run chat-tui-client --host 127.0.0.1 --port 8765 --history-db data/alice.sqlite3

Second client:

uv run chat-tui-client --host 127.0.0.1 --port 8765 --history-db data/bob.sqlite3

Server with logging and metrics:

uv run chat-tui-server \
  --host 127.0.0.1 \
  --port 8765 \
  --db data/server.sqlite3 \
  --log-level debug \
  --log-file data/logs/server.log \
  --metrics-interval 5

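This implementation order carries over directly to other small networked systems: protocol first, storage follows the business facts, the server handles connections with a state machine, the client keeps request/response separate from background events, and the UI only projects client state. Add the matching tests as each layer is finished, then confirm the system boundaries with end-to-end, recovery, concurrency, and performance acceptance tests.

For future extensions, piling more branches onto the existing handlers is not recommended. Offline message replay needs changes to server-side message cursors and the client sync protocol; ACK/retry needs request IDs or message IDs; file transfer needs to move from Base64 strings to chunking and metadata; multi-device sync needs the boundary between client-local state and server-side facts to be redrawn. None of these is a small UI change; each should be designed starting from the protocol and the persistence model.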


That's all.