Frontend#

Overview#

RTP_LLM currently comprises three core components: Frontend, Backend, and Master.

Frontend Workflow (see the sketch after this list):

  • Accepts incoming requests

  • Converts inputs to token IDs (includes OpenAI request rendering and tokenizer encoding)

  • Queries the Master to obtain Backend IP

  • Submits requests to Backend and awaits responses

  • Processes responses (includes tokenizer decoding and function call rendering)
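
A minimal sketch of this flow, with every collaborator passed in as a parameter; the names (render, tokenizer, master_client, backend_client) are illustrative, not the actual Frontend classes:

def handle_request(raw_request, render, tokenizer, master_client, backend_client):
    # 1. Render the OpenAI-style request into a prompt string.
    prompt = render(raw_request)
    # 2. Encode the prompt into token IDs.
    token_ids = tokenizer.encode(prompt)
    # 3. Ask the Master which Backend should serve this request.
    backend_addr = master_client.schedule(token_ids)
    # 4. Submit the request to the Backend (gRPC) and wait for the result.
    output_ids = backend_client.generate(backend_addr, token_ids)
    # 5. Decode the generated tokens and render any function calls.
    return tokenizer.decode(output_ids)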

Role Initialization#

class RoleType(Enum):
    PDFUSION = 0  # Monolithic mode
    PREFILL = 1
    DECODE = 2
    VIT = 3
    FRONTEND = 4

The active role is determined by the ROLE_TYPE environment variable (default: PDFUSION). Each non-default role launches only its corresponding component.
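
As an illustration, role selection can be pictured as reading ROLE_TYPE and mapping it onto the enum above. This is a sketch that assumes the variable holds the role name; it is not the actual startup code:

import os

def resolve_role() -> RoleType:
    # Default to PDFUSION (monolithic mode) when ROLE_TYPE is unset.
    name = os.environ.get("ROLE_TYPE", "PDFUSION").upper()
    return RoleType[name]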

In frontend-only deployments, engine initialization is skipped, which allows rapid tokenizer/renderer debugging.

Backend servers still host Frontend apps (for health checks/debugging).

The italicized APIs below are usable only when the Frontend is co-located with a Backend server.



Public APIs#

Health Check Endpoints#

Verifies Backend status (returns ok on success, an error otherwise). These Frontend endpoints call the same endpoints on the Backend.

@app.get("/health")
@app.post("/health")
@app.get("/GraphService/cm2_status")
@app.post("/GraphService/cm2_status")
@app.get("/SearchService/cm2_status")
@app.post("/SearchService/cm2_status")
@app.get("/status")
@app.post("/status")
@app.post("/health_check")

@app.get("/")

Debug Endpoints#

Proxied to the same endpoints on the Backend.

@app.get("/cache_status")
@app.post("/cache_status")
@app.get("/rtp_llm/cache_status")
@app.post("/rtp_llm/cache_status")

# input
class WorkerStatusRequest(BaseModel):
    lastest_cache_version: Optional[int] = -1

# output
class CacheStatus(BaseModel):
    available_kv_cache: int = -1
    total_kv_cache: int = -1
    block_size: int = -1
    version: int = -1
    cached_keys: Optional[List[int]] = None
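
A possible call to the cache status endpoint, using the WorkerStatusRequest body shown above (the address is a placeholder):

import requests

body = {"lastest_cache_version": -1}  # field name as defined in WorkerStatusRequest
resp = requests.post("http://localhost:8088/cache_status", json=body, timeout=5)
print(resp.json())  # CacheStatus: available_kv_cache, total_kv_cache, block_size, ...
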
@app.get("/worker_status")
@app.post("/worker_status")
@app.get("/rtp_llm/worker_status")
@app.post("/rtp_llm/worker_status")

# input
class WorkerStatusRequest(BaseModel):
    lastest_cache_version: Optional[int] = -1
    latest_finised_version: Optional[int] = -1

# output
class WorkStatus(BaseModel):
    role: str  # prefill, decode, vit
    server_port: int
    http_proto_port: int
    grpc_proto_port: int
    available_concurrency: int

    running_task_info: List[TaskInfo]
    finished_task_list: List[TaskInfo]

    step_latency_ms: float
    iterate_count: int

    dp_size: int
    tp_size: int
    alive: bool
    version: int
    cache_status: Optional[CacheStatus] = None
    profile_meta: Optional[ProfileMeta] = None
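
Worker status can be polled the same way; the sketch below assumes the WorkStatus model is returned as plain JSON (the address is a placeholder):

import requests

body = {"lastest_cache_version": -1, "latest_finised_version": -1}
status = requests.post("http://localhost:8088/worker_status", json=body, timeout=5).json()
print(status["role"], status["available_concurrency"], status["alive"])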

Dynamic Update Endpoints#

Proxied to the same endpoints on the Backend.

@app.post("/update")
# example: {"peft_info": {"lora_info": {"lora_0": "/lora/llama-lora-test/"}}}

# input
class VersionInfo(BaseModel):
    models_info: Optional[Dict[str, str]] = None
    peft_info: Optional[Dict[str, Any]] = None
    sampler_info: Optional[Dict[str, Any]] = None

# output:
# error info on failure
@app.post("/set_log_level")
# request format: {"log_level": "DEBUG/INFO/TRACE/WARNING"}
@app.post("/update_eplb_config")
# request format: {"mode": "NONE", "update_time": 5000}

# input:
class EplbMode(Enum):
    NONE = auto()
    STATS = auto()  # statistics only
    EPLB = auto()   # load balancing only
    ALL = auto()    # statistics + load balancing

class EplbConfig:
    mode: EplbMode
    update_time: int
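
For example, the log level and the EPLB configuration could be updated as follows; the address is a placeholder and the payloads follow the request formats shown above:

import requests

base = "http://localhost:8088"
requests.post(f"{base}/set_log_level", json={"log_level": "INFO"}, timeout=5)
requests.post(f"{base}/update_eplb_config", json={"mode": "STATS", "update_time": 5000}, timeout=5)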

Embedding APIs#

Proxied to the same endpoints on the Backend.

@app.post("/v1/embeddings")
@app.post("/v1/embeddings/dense")
@app.post("/v1/embeddings/sparse")
@app.post("/v1/embeddings/colbert")
@app.post("/v1/embeddings/similarity")
@app.post("/v1/classifier")
@app.post("/v1/reranker")

Inference APIs#

@app.post("/")
# input
# prompt: str
# urls: optional[List[str]]
# generate_config: GenerateConfig

# output
# inference result
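
A sketch of a raw inference call built from the fields listed above; the address and the GenerateConfig contents (e.g. max_new_tokens) are assumptions:

import requests

payload = {
    "prompt": "Hello, how are you?",
    "generate_config": {"max_new_tokens": 64},  # assumed GenerateConfig field
}
resp = requests.post("http://localhost:8088/", json=payload, timeout=60)
print(resp.json())
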
@app.post("/chat/completions")
@app.post("/v1/chat/completions")

# input
class ChatCompletionRequest(BaseModel):
  model: Optional[str] = None
  messages: List[ChatMessage]
  functions: Optional[List[GPTFunctionDefinition]] = None
  tools: Optional[List[GPTToolDefinition]] = None
  temperature: Optional[float] = 0.7
  top_p: Optional[float] = 1.0
  max_tokens: Optional[int] = None
  stop: Optional[Union[str, List[str]]] = Field(default_factory=list)
  stream: Optional[bool] = False
  user: Optional[str] = None
  seed: Optional[int] = None
  n: Optional[int] = None
  logprobs: Optional[bool] = None
  top_logprobs: Optional[int] = None

  # ---- These fields are not implemented yet.
  # presence_penalty: Optional[float] = 0.0
  # frequency_penalty: Optional[float] = 0.0
  # logit_bias: Optional[Dict[str, float]] = None

  # ---- These params are specific to this framework and are not part of the OpenAI standard.
  extra_configs: Optional[GenerateConfig] = None
  private_request: bool = False
  trace_id: Optional[str] = None
  chat_id: Optional[str] = None
  template_key: Optional[str] = None
  user_template: Optional[str] = None
  debug_info: Optional[bool] = False
  aux_info: Optional[bool] = False
  extend_fields: Optional[Dict[str, Any]] = (
    None  # This field has no effect; it is used only for logging.
  )
  master_info: Optional[Dict[str, Any]] = None
  chat_template_kwargs: Optional[Dict[str, Any]] = None

# output
# inference response
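
Since the endpoint follows the OpenAI chat-completions schema, it can be exercised with a standard payload; the model name, address, and response shape below are assumptions of OpenAI compatibility:

import requests

payload = {
    "model": "my-model",
    "messages": [{"role": "user", "content": "Hello!"}],
    "temperature": 0.7,
    "stream": False,
}
resp = requests.post("http://localhost:8088/v1/chat/completions", json=payload, timeout=60)
print(resp.json()["choices"][0]["message"]["content"])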

Prompt Processing APIs#

@app.post("/chat/render")
@app.post("/v1/chat/render")

# input
class ChatCompletionRequest:
    ...

# output
class DebugInfo(BaseModel):
    input_prompt: str
    input_ids: List[int]
    input_urls: List[str]
    tokenizer_info: str
    max_seq_len: int
    eos_token_id: Optional[int]
    stop_word_ids_list: List[List[int]]
    stop_words_list: List[str]
    renderer_info: RendererInfo
    generate_config: GenerateConfig
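
This endpoint accepts the same ChatCompletionRequest as the chat API but returns the rendered prompt and token IDs instead of running inference, which makes it useful for template debugging. A sketch (placeholder address, assuming DebugInfo is returned as JSON):

import requests

payload = {"model": "my-model", "messages": [{"role": "user", "content": "Hello!"}]}
info = requests.post("http://localhost:8088/v1/chat/render", json=payload, timeout=10).json()
print(info["input_prompt"])
print(info["input_ids"])
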
@app.post("/tokenizer/encode")
# input
# prompt: str
# return_offsets_mapping: bool

# output
class TokenizerEncodeResponse(BaseModel):
    token_ids: List[int] = []
    offset_mapping: Optional[List[Any]] = None
    tokens: List[str] = []
    error: str = ""
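
For example, encoding a prompt and requesting offsets (placeholder address):

import requests

body = {"prompt": "Hello world", "return_offsets_mapping": True}
resp = requests.post("http://localhost:8088/tokenizer/encode", json=body, timeout=10)
print(resp.json()["token_ids"])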

@app.post("/tokenize")
# input
# raw or openai request

# output
# token ids

Internal Communication#

Frontend → Master: HTTP call to obtain Backend IP.

Frontend → Backend: gRPC call for inference (see model_rpc_service.proto).

Master APIs#

class RoleType(Enum):
    PDFUSION = 0
    PREFILL = 1
    DECODE = 2
    VIT = 3

class ServerStatus(BaseModel):
    role: RoleType
    server_ip: str
    http_port: int
    grpc_port: int
    debug_info: Optional[DebugInfo]

class ScheduleMeta(BaseModel):
    server_status: List[ServerStatus]
    cache_local: int          # 0: LOCAL, 1: REMOTE
    inter_request_id: int

@app.post("/rtp_llm/master")
# "real_master_host": "{master_ip}:{port}"

@app.post("/rtp_llm/schedule")
# input
# model: str
# block_cache_keys: list[int]
# seq_len: int
# debug: bool
# generate_timeout: int
# request_priority: int

# output
# ScheduleMeta
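
A sketch of the scheduling call the Frontend issues to the Master, built from the input fields above; the address and the field values are placeholders:

import requests

body = {
    "model": "my-model",
    "block_cache_keys": [],
    "seq_len": 128,
    "debug": False,
    "generate_timeout": 60,
    "request_priority": 0,
}
meta = requests.post("http://127.0.0.1:8089/rtp_llm/schedule", json=body, timeout=5).json()
print(meta["server_status"][0]["server_ip"], meta["inter_request_id"])  # ScheduleMeta fields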

Backend gRPC APIs#

Reference: rtp_llm/cpp/proto/model_rpc/service.proto

GenerateStreamCall
# input
# GenerateInputPB

# output
# GenerateOutputsPB
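
A hedged sketch of this call from Python, assuming stubs generated from service.proto and a server-streaming RPC; the module and stub names are illustrative, and only GenerateStreamCall / GenerateInputPB / GenerateOutputsPB come from the proto referenced above:

import grpc

# The generated modules would come from compiling
# rtp_llm/cpp/proto/model_rpc/service.proto; the names below are hypothetical.
# from rtp_llm.proto import model_rpc_service_pb2, model_rpc_service_pb2_grpc

def generate_stream(backend_addr, request_pb, stub_cls):
    # Open a channel to the Backend and stream GenerateOutputsPB messages
    # for a single GenerateInputPB request (assumes server-streaming).
    with grpc.insecure_channel(backend_addr) as channel:
        stub = stub_cls(channel)
        for output in stub.GenerateStreamCall(request_pb):
            yield output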