# Frontend

## Overview
RTP_LLM currently comprises three core components: Frontend, Backend, and Master.
Frontend workflow (a sketch follows this list):

- Accepts incoming requests
- Converts inputs to token IDs (includes tokenizer encoding and OpenAI request rendering)
- Queries the Master to obtain a Backend IP
- Submits the request to the Backend and awaits the response
- Processes responses (includes tokenizer decoding and function call rendering)
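As a rough illustration of this path, here is a minimal, self-contained sketch; the helper functions and data shapes are hypothetical stand-ins, not RTP_LLM's actual internals.

```python
from typing import List


def render_to_token_ids(request: dict) -> List[int]:
    # Stand-in for tokenizer encoding + OpenAI request rendering.
    return [1, 2, 3]


def query_master(token_ids: List[int]) -> str:
    # Stand-in for the HTTP call to the Master that returns a Backend IP.
    return "10.0.0.1:8090"


def call_backend(backend_ip: str, token_ids: List[int]) -> List[int]:
    # Stand-in for the gRPC inference call to the Backend.
    return list(reversed(token_ids))


def render_response(output_ids: List[int]) -> str:
    # Stand-in for tokenizer decoding + function call rendering.
    return f"decoded {len(output_ids)} tokens"


def handle_request(request: dict) -> str:
    token_ids = render_to_token_ids(request)
    backend_ip = query_master(token_ids)
    output_ids = call_backend(backend_ip, token_ids)
    return render_response(output_ids)


if __name__ == "__main__":
    print(handle_request({"prompt": "hello"}))
```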
## Role Initialization
```python
class RoleType(Enum):
    PDFUSION = 0  # Monolithic mode
    PREFILL = 1
    DECODE = 2
    VIT = 3
    FRONTEND = 4
```
The active role is determined by the `ROLE_TYPE` environment variable (default: `PDFUSION`); roles other than `PDFUSION` launch only their corresponding component.

- In frontend-only deployments, engine initialization is skipped, allowing rapid tokenizer/renderer debugging.
- Backend servers still host a Frontend app (for health checks/debugging).
- Italicized APIs below are only usable when locally paired with a Backend server.
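A minimal sketch of this role selection, assuming only what is documented above (the `RoleType` enum, the `ROLE_TYPE` variable, and its `PDFUSION` default); the launch logic itself is a hypothetical stand-in.

```python
import os
from enum import Enum


class RoleType(Enum):
    PDFUSION = 0  # Monolithic mode
    PREFILL = 1
    DECODE = 2
    VIT = 3
    FRONTEND = 4


# ROLE_TYPE and its PDFUSION default are documented above; the branch below
# only illustrates "frontend-only deployments skip engine initialization".
role = RoleType[os.environ.get("ROLE_TYPE", "PDFUSION")]
if role is RoleType.FRONTEND:
    print("frontend-only: skipping engine initialization")
else:
    print(f"launching component for role {role.name}")
```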
## Public APIs

### Health Check Endpoints

These endpoints report Backend status (returning ok or an error) by calling the same endpoints on the Backend.
@app.get("/health")
@app.post("/health")
@app.get("/GraphService/cm2_status")
@app.post("/GraphService/cm2_status")
@app.get("/SearchService/cm2_status")
@app.post("/SearchService/cm2_status")
@app.get("/status")
@app.post("/status")
@app.post("/health_check")
@app.get("/")
### Debug Endpoints

Proxied to the same endpoints on the Backend.
@app.get("/cache_status")
@app.post("/cache_status")
@app.get("/rtp_llm/cache_status")
@app.post("/rtp_llm/cache_status")
# input
class WorkerStatusRequest(BaseModel):
lastest_cache_version: Optional[int] = -1
# output
class CacheStatus(BaseModel):
available_kv_cache: int = -1
total_kv_cache: int = -1
block_size: int = -1
version: int = -1
cached_keys: Optional[List[int]] = None
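A hedged example of querying the cache status (host and port are placeholders; the body mirrors `WorkerStatusRequest` above, with its field name kept verbatim):

```python
import requests

resp = requests.post(
    "http://localhost:8088/cache_status",
    json={"lastest_cache_version": -1},  # field name as documented
    timeout=2,
)
print(resp.json())  # CacheStatus: available_kv_cache, total_kv_cache, ...
```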
@app.get("/worker_status")
@app.post("/worker_status")
@app.get("/rtp_llm/worker_status")
@app.post("/rtp_llm/worker_status")
# input
class WorkerStatusRequest(BaseModel):
lastest_cache_version: Optional[int] = -1
latest_finised_version: Optional[int] = -1
# output
class WorkStatus(BaseModel):
role: str # prefill, decode, vit
server_port: int
http_proto_port: int
grpc_proto_port: int
available_concurrency: int
running_task_info: List[TaskInfo]
finished_task_list: List[TaskInfo]
step_latency_ms: float
iterate_count: int
dp_size: int
tp_size: int
alive: bool
version: int
cache_status: Optional[CacheStatus] = None
profile_meta: Optional[ProfileMeta] = None
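Similarly, `/worker_status` can be polled like this (host and port are placeholders; field names in the body follow `WorkerStatusRequest` above, kept verbatim):

```python
import requests

payload = {"lastest_cache_version": -1, "latest_finised_version": -1}
resp = requests.post("http://localhost:8088/worker_status", json=payload, timeout=2)
status = resp.json()  # WorkStatus serialized as JSON
print(status["role"], status["available_concurrency"], status["alive"])
```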
### Dynamic Update Endpoints

Proxied to the same endpoints on the Backend.
@app.post("/update")
# example : {"peft_info": {"lora_info": {"lora_0": "/lora/llama-lora-test/""}}}
# input
class VersionInfo(BaseModel):
models_info: Optional[Dict[str, str]] = None
peft_info: Optional[Dict[str, Any]] = None
sampler_info: Optional[Dict[str, Any]] = None
# output:
# error info when failed
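For example, a dynamic LoRA update can be issued as follows (host and port are placeholders; the payload is the documented example value):

```python
import requests

body = {"peft_info": {"lora_info": {"lora_0": "/lora/llama-lora-test/"}}}
resp = requests.post("http://localhost:8088/update", json=body, timeout=30)
print(resp.status_code, resp.text)  # error info is returned on failure
```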
@app.post("/set_log_level")
# request format: {"log_level": "DEBUG/INFO/TRACE/WARNING"}
@app.post("/update_eplb_config")
# request format: {"mode": "NONE", "update_time": 5000}
# input:
class EplbMode(Enum):
NONE
STATS # stats, only
EPLB # load balance, only
ALL # stats + load balance
class EplbConfig:
mode: EplbMode
update_time: int
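Hedged examples for both endpoints (host and port are placeholders; the payloads follow the documented request formats):

```python
import requests

requests.post("http://localhost:8088/set_log_level",
              json={"log_level": "INFO"}, timeout=2)
requests.post("http://localhost:8088/update_eplb_config",
              json={"mode": "NONE", "update_time": 5000}, timeout=2)
```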
### Embedding APIs

Proxied to the same endpoints on the Backend.
```python
@app.post("/v1/embeddings")
@app.post("/v1/embeddings/dense")
@app.post("/v1/embeddings/sparse")
@app.post("/v1/embeddings/colbert")
@app.post("/v1/embeddings/similarity")
@app.post("/v1/classifier")
@app.post("/v1/reranker")
```
### Inference APIs

```python
@app.post("/")
# input
# prompt: str
# urls: Optional[List[str]]
# generate_config: GenerateConfig
# output
# inference result
```
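A hedged example of the raw inference endpoint (host and port are placeholders; the `generate_config` field shown, `max_new_tokens`, is an assumption about GenerateConfig rather than documented here):

```python
import requests

resp = requests.post(
    "http://localhost:8088/",
    json={
        "prompt": "Hello, who are you?",
        "generate_config": {"max_new_tokens": 64},  # assumed GenerateConfig field
    },
    timeout=60,
)
print(resp.json())
```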
@app.post("/chat/completions")
@app.post("/v1/chat/completions")
# input
class ChatCompletionRequest(BaseModel):
model: Optional[str] = None
messages: List[ChatMessage]
functions: Optional[List[GPTFunctionDefinition]] = None
tools: Optional[List[GPTToolDefinition]] = None
temperature: Optional[float] = 0.7
top_p: Optional[float] = 1.0
max_tokens: Optional[int] = None
stop: Optional[Union[str, List[str]]] = Field(default_factory=list)
stream: Optional[bool] = False
user: Optional[str] = None
seed: Optional[int] = None
n: Optional[int] = None
logprobs: Optional[bool] = None
top_logprobs: Optional[int] = None
# ---- These functions are not implemented yet.
# presence_penalty: Optional[float] = 0.0
# frequency_penalty: Optional[float] = 0.0
# logit_bias: Optional[Dict[str, float]] = None
# ---- These params are hacked for our framework, not standard.
extra_configs: Optional[GenerateConfig] = None
private_request: bool = False
trace_id: Optional[str] = None
chat_id: Optional[str] = None
template_key: Optional[str] = None
user_template: Optional[str] = None
debug_info: Optional[bool] = False
aux_info: Optional[bool] = False
extend_fields: Optional[Dict[str, Any]] = (
None # This field is not effective, only for logging.
)
master_info: Optional[Dict[str, Any]] = None
chat_template_kwargs: Optional[Dict[str, Any]] = None
# output
# inference response
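An example request (host, port, and model name are placeholders; the fields follow `ChatCompletionRequest` above):

```python
import requests

body = {
    "model": "qwen",  # placeholder model name
    "messages": [{"role": "user", "content": "Hello!"}],
    "temperature": 0.7,
    "stream": False,
}
resp = requests.post("http://localhost:8088/v1/chat/completions", json=body, timeout=60)
print(resp.json())
```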
### Prompt Processing APIs

```python
@app.post("/chat/render")
@app.post("/v1/chat/render")
# input
class ChatCompletionRequest:
    ...

# output
class DebugInfo(BaseModel):
    input_prompt: str
    input_ids: List[int]
    input_urls: List[str]
    tokenizer_info: str
    max_seq_len: int
    eos_token_id: Optional[int]
    stop_word_ids_list: List[List[int]]
    stop_words_list: List[str]
    renderer_info: RendererInfo
    generate_config: GenerateConfig
```
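For example, rendering a chat request without running inference (host and port are placeholders; this assumes the response is the `DebugInfo` object serialized as JSON):

```python
import requests

body = {"messages": [{"role": "user", "content": "Hello!"}]}
resp = requests.post("http://localhost:8088/v1/chat/render", json=body, timeout=10)
info = resp.json()
print(info["input_prompt"])
print(len(info["input_ids"]), "tokens")
```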
@app.post("/tokenizer/encode")
# input
# prompt: str
# return_offsets_mapping: bool
# output
class TokenizerEncodeResponse(BaseModel):
token_ids: List[int] = []
offset_mapping: Optional[List[Any]] = None
tokens: List[str] = []
error: str = ""
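An example call (host and port are placeholders; fields follow the documented input/output above):

```python
import requests

resp = requests.post(
    "http://localhost:8088/tokenizer/encode",
    json={"prompt": "Hello, world!", "return_offsets_mapping": False},
    timeout=5,
)
print(resp.json()["token_ids"])
```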
@app.post("/tokenize")
# input
# raw or openai request
# output
# token ids
## Internal Communication

- Frontend → Master: HTTP call to obtain the Backend IP.
- Frontend → Backend: gRPC call for inference (see model_rpc_service.proto).
## Master APIs

```python
class RoleType(Enum):
    PDFUSION = 0
    PREFILL = 1
    DECODE = 2
    VIT = 3

class ServerStatus(BaseModel):
    role: RoleType
    server_ip: str
    http_port: int
    grpc_port: int
    debug_info: Optional[DebugInfo]

class ScheduleMeta(BaseModel):
    server_status: List[ServerStatus]
    cache_local: int  # 0: LOCAL, 1: REMOTE
    inter_request_id: int
```
@app.post("/rtp_llm/master")
# "real_master_host": "{master_ip}:{port}"
@app.post("/rtp_llm/schedule")
# input
# model: str
# block_cache_keys: list[int]
# seq_len: int
# debug: bool
# generate_timeout: int
# request_priority: int
# output
# ScheduleMeta
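A hedged example of asking the Master for a schedule (the Master host and port are placeholders; the body fields follow the documented input above, with illustrative values):

```python
import requests

body = {
    "model": "qwen",          # placeholder model name
    "block_cache_keys": [],
    "seq_len": 128,
    "debug": False,
    "generate_timeout": 60,
    "request_priority": 0,
}
resp = requests.post("http://localhost:8090/rtp_llm/schedule", json=body, timeout=5)
print(resp.json())  # ScheduleMeta: server_status, cache_local, inter_request_id
```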
## Backend gRPC APIs

Reference: `rtp_llm/cpp/proto/model_rpc/service.proto`.
```
GenerateStreamCall
# input
# GenerateInputPB
# output
# GenerateOutputsPB
```