@puregram/stream
stream LLM output to telegram via `sendMessageDraft` plus a terminal `sendMessage`. any `AsyncIterable<string>` (or a known LLM SDK stream) is turned into a series of animated draft previews, which are finalized as real messages each time a 4096-char window fills and once more when the source is exhausted
this is a runtime plugin; install it with `tg.extend(stream())`
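The windowing rule above is easier to picture in isolation. Below is a minimal sketch, not the plugin's code. `splitIntoWindows` is a hypothetical helper, and it assumes the 4096 limit is counted in JavaScript string length (UTF-16 code units), which may not match how the plugin measures its `MAX_CHUNK`.

```typescript
// Hypothetical sketch, NOT the plugin's implementation.
// Accumulates streamed pieces and cuts a "finalized" window every time the
// buffer reaches `limit` characters; the remainder stays as the live draft.
const LIMIT = 4096 // assumed to mirror the 4096-char window described above

async function splitIntoWindows (
  source: AsyncIterable<string>,
  limit: number = LIMIT
): Promise<string[]> {
  const finalized: string[] = []
  let buffer = ''

  for await (const piece of source) {
    buffer += piece
    // a single large piece can fill more than one window at once
    while (buffer.length >= limit) {
      finalized.push(buffer.slice(0, limit)) // would become a terminal sendMessage
      buffer = buffer.slice(limit)
    }
    // this is where a draft preview of `buffer` would be refreshed
  }

  if (buffer.length > 0) finalized.push(buffer) // source exhausted: flush the tail
  return finalized
}

async function * pieces () {
  yield 'a'.repeat(5000)
  yield 'b'.repeat(10)
}

// logs the window lengths: 4096 and 914
splitIntoWindows(pieces()).then((windows) => console.log(windows.map((w) => w.length)))
```

The cut runs in a `while` loop rather than an `if` because one oversized piece can span several windows.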
when to use
- piping an LLM response token-by-token into a private telegram chat as it generates
- showing a thinking animation before the answer arrives (`thinkingPlaceholder: true`)
- streaming output from OpenAI, Anthropic, Vercel AI SDK, Ollama, LangChain, or any `AsyncIterable<string>`
WARNING
drafts are private-chat only; this is a telegram api constraint. if you target a group or channel, the plugin throws synchronously, before consuming the source. a group fallback using `editMessageText` is planned
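Until that fallback ships, one workaround for non-private chats is to skip drafts, drain the source yourself, and send the text in one go. A minimal sketch: `canUseDrafts` and `collectText` are hypothetical helpers (not plugin exports), and the chat object is assumed to carry a Bot API-style `type` field.

```typescript
// Hypothetical helpers, not exported by @puregram/stream.
// Bot API chat types; drafts only work when type === 'private'.
type ChatType = 'private' | 'group' | 'supergroup' | 'channel'

function canUseDrafts (chat: { type: ChatType }): boolean {
  return chat.type === 'private'
}

// Drains an AsyncIterable<string> into one string so it can be sent with a
// single regular message instead of animated drafts.
async function collectText (source: AsyncIterable<string>): Promise<string> {
  let text = ''
  for await (const piece of source) text += piece
  return text
}
```

Texts longer than 4096 characters still have to be split before sending. This only covers sources that are already an `AsyncIterable<string>`, not raw SDK streams.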
install
```sh
yarn add @puregram/stream
# or
npm i -S @puregram/stream
# or
pnpm add @puregram/stream
```
optionally, add @puregram/markup if you want `parseMode` support:
```sh
yarn add @puregram/markup
```
quick start
```typescript
import { Telegram } from 'puregram'
import { stream } from '@puregram/stream'
import OpenAI from 'openai'

const tg = Telegram.fromToken(process.env.TOKEN!).extend(stream())
const openai = new OpenAI()

tg.onMessage(async (message) => {
  const completion = await openai.chat.completions.create({
    model: 'gpt-4o-mini',
    stream: true,
    messages: [{ role: 'user', content: message.text ?? 'tell me a joke' }]
  })

  await message.stream(completion)
})

await tg.startPolling()
```
two call shapes
```typescript
// inside an update handler: chat_id is inferred from the message
await message.stream(source, options) // options is optional

// out of context: pass chat_id explicitly
await tg.stream({ chat_id: 12345, source, ...options })
```
both return a `Promise<StreamResult>` that reports how the stream went
sources
@puregram/stream duck-types the source automatically. you can also call a named adapter for better type inference:
openai
```typescript
import { fromOpenAI } from '@puregram/stream'

const completion = await openai.chat.completions.create({
  model: 'gpt-4o-mini',
  stream: true,
  messages: [{ role: 'user', content: 'tell me a joke' }]
})

// pick one: a stream can only be consumed once
await message.stream(completion) // auto-detect
await message.stream(fromOpenAI(completion)) // explicit
```
anthropic
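```typescript
import Anthropic from '@anthropic-ai/sdk'
import { fromAnthropic } from '@puregram/stream'

const client = new Anthropic()
const result = client.messages.stream({ model: 'claude-sonnet-4-5', max_tokens: 1024, messages: [{ role: 'user', content: 'tell me a joke' }] })

await message.stream(fromAnthropic(result))
```
vercel ai sdk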
```typescript
import { fromVercelAI } from '@puregram/stream'
import { streamText } from 'ai'
import { openai } from '@ai-sdk/openai'

const result = await streamText({ model: openai('gpt-4o'), prompt: 'tell me a joke' })

// pick one: a stream can only be consumed once
await message.stream(result) // duck-typed via .textStream
await message.stream(fromVercelAI(result)) // explicit
```
ollama
```typescript
import ollama from 'ollama'
import { fromOllama } from '@puregram/stream'

const res = await ollama.chat({ model: 'llama3', messages: [{ role: 'user', content: 'tell me a joke' }], stream: true })

await message.stream(fromOllama(res))
```
langchain
```typescript
import { fromLangChain } from '@puregram/stream'

// `chain` is your own LangChain runnable, defined elsewhere
const result = await chain.stream({ input: 'tell me a joke' })

await message.stream(fromLangChain(result))
```
raw `AsyncIterable<string>`
```typescript
async function * generate () {
  for (const word of ['hello', ' ', 'world']) {
    yield word
    await new Promise(r => setTimeout(r, 50)) // simulate token latency
  }
}

await message.stream(generate())
```
other adapters
| adapter | accepts |
|---|---|
| `fromTextStream` | web `ReadableStream<string>` |
| `fromBytes` | `AsyncIterable<Uint8Array>` (utf-8 decoded) |
| `fromEventEmitter` | node `EventEmitter`; listens on `'text'` events by default |
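The duck-typing described under sources and the decoding `fromBytes` has to do can be sketched together. `toTextStream` below is hypothetical and is not the plugin's `normalize`. It recognizes only four shapes:

- plain strings
- `Uint8Array` chunks
- OpenAI-style chunks, with the text at `choices[0].delta.content`
- objects exposing a `textStream` property, as Vercel AI SDK results do

The real adapters cover more shapes and may detect them differently.

```typescript
// Hypothetical duck-typing sketch; NOT the plugin's `normalize`.
type OpenAIChunkLike = { choices?: Array<{ delta?: { content?: string | null } }> }

function isAsyncIterable (value: unknown): value is AsyncIterable<unknown> {
  return value != null &&
    typeof (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === 'function'
}

async function * toTextStream (source: unknown): AsyncGenerator<string> {
  // Vercel AI SDK-style result: the text lives on `.textStream`
  if (typeof source === 'object' && source !== null && 'textStream' in source) {
    yield * toTextStream((source as { textStream: unknown }).textStream)
    return
  }
  if (!isAsyncIterable(source)) throw new TypeError('unsupported stream source')

  // one decoder per stream: `stream: true` buffers a multi-byte UTF-8
  // sequence that is split across two chunks until the rest arrives
  const decoder = new TextDecoder('utf-8')
  for await (const item of source) {
    if (typeof item === 'string') {
      yield item // raw AsyncIterable<string>
    } else if (item instanceof Uint8Array) {
      const text = decoder.decode(item, { stream: true }) // fromBytes-style
      if (text) yield text
    } else {
      // OpenAI-style chunk; role-only or empty deltas are skipped
      const text = (item as OpenAIChunkLike).choices?.[0]?.delta?.content
      if (text) yield text
    }
  }
  const tail = decoder.decode() // flush any bytes still buffered
  if (tail) yield tail
}
```

Sharing one `TextDecoder` in streaming mode is what makes the byte path safe. Take the chunks `[0x68, 0xc3]` and `[0xa9, 0x21]`: decoding them separately without `{ stream: true }` yields two U+FFFD replacement characters where `é` should be.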
options
`StreamCallOptions` are passed as the second argument to `update.stream(source, options?)` or spread into `tg.stream({ chat_id, source, ...options })`:
| option | type | default | notes |
|---|---|---|---|
| `parseMode` | `'MarkdownV2' \| 'HTML'` | plain text | lenient per tick, strict on finalize; requires `@puregram/markup` |
| `editIntervalMs` | `number` | `250` | soft floor between `sendMessageDraft` calls (ms) |
| `maxEditBackoff` | `number` | `4000` | drop a draft tick when local backoff exceeds this |
| `thinkingPlaceholder` | `boolean` | `true` | emit an empty draft eagerly on start so users see the "typing" animation |
| `draftIdOffset` | `number` | hybrid | derived from `message_id << 8` on `update.stream`; counter-based for `tg.stream` |
| `signal` | `AbortSignal` | none | aborts mid-stream and finalizes the last-good text |
| `message_thread_id` | `number` | none | forwarded to `sendMessage` / `sendMessageDraft` |
| `reply_parameters` | `ReplyParameters` | none | forwarded to `sendMessage` |
| `link_preview_options` | `LinkPreviewOptions` | none | forwarded to `sendMessage` |
| `disable_notification` | `boolean` | none | forwarded to `sendMessage` |
| `protect_content` | `boolean` | none | forwarded to `sendMessage` |
| `reply_markup` | `ReplyMarkup` | none | only attached to the terminal `sendMessage`, not to drafts |
| `onPiece` | `(piece, draftId) => void` | none | called for each source yield |
| `onDraftFinalized` | `(msg) => void` | none | called for each terminal `sendMessage` |
| `onError` | `(err) => void \| Promise<void>` | none | called for source, draft, and parse errors |
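`editIntervalMs`, `maxEditBackoff` and the `skipped` counter can be read together as a small pacing model. A draft tick that comes too soon is coalesced into a later one. A tick that would have to wait more than `maxEditBackoff` is dropped. The `DraftPacer` below is a hypothetical sketch of that model with an injected clock, not the plugin's scheduler; in particular, how backoff gets set (`noteBackoff`) is an assumption.

```typescript
// Hypothetical pacing model for draft ticks; NOT the plugin's scheduler.
type TickDecision = 'send' | 'coalesce' | 'drop'

class DraftPacer {
  private readonly editIntervalMs: number
  private readonly maxEditBackoff: number
  private lastSentAt = Number.NEGATIVE_INFINITY
  private blockedUntil = 0
  skipped = 0

  constructor (editIntervalMs = 250, maxEditBackoff = 4000) {
    this.editIntervalMs = editIntervalMs // soft floor between draft calls
    this.maxEditBackoff = maxEditBackoff // longest wait worth keeping a tick for
  }

  // called whenever a new piece arrives and a draft refresh is wanted
  decide (now: number): TickDecision {
    const backoffLeft = this.blockedUntil - now
    if (backoffLeft > this.maxEditBackoff) {
      this.skipped++
      return 'drop' // the preview would be stale by the time it could be sent
    }
    if (now < this.blockedUntil || now - this.lastSentAt < this.editIntervalMs) {
      this.skipped++
      return 'coalesce' // too soon: fold this piece into a later tick
    }
    this.lastSentAt = now
    return 'send'
  }

  // assumed hook: record a wait (for example a flood wait) reported at `now`
  noteBackoff (now: number, waitMs: number): void {
    this.blockedUntil = Math.max(this.blockedUntil, now + waitMs)
  }
}

const pacer = new DraftPacer(250, 4000)
const decisions = [pacer.decide(0), pacer.decide(100), pacer.decide(300)]
pacer.noteBackoff(300, 10_000) // e.g. a 10s wait reported at t=300
decisions.push(pacer.decide(400), pacer.decide(7000), pacer.decide(10_400))
console.log(decisions.join(' '), pacer.skipped)
// send coalesce send drop coalesce send 3
```

Because the remaining backoff shrinks as time passes, dropping never becomes permanent: once the wait falls under `maxEditBackoff`, ticks are coalesced again and then sent.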
return value: `StreamResult`
```typescript
interface StreamResult {
  messages: TelegramMessage[] // committed sendMessage results, in order
  drafts: number // distinct sendMessageDraft calls issued
  pieces: number // chunks pulled from source
  bytes: number // total text bytes streamed
  skipped: number // draft ticks coalesced or dropped under back-pressure
  aborted: boolean // true when AbortSignal triggered
}
```
error handling
| situation | behavior |
|---|---|
| source throws mid-stream | stop pulling, finalize last-good text via `sendMessage`, call `onError`, rethrow |
| `AbortSignal.abort()` | stop pulling, finalize last-good text, set `result.aborted = true`, no rethrow |
| chat is not private | throws synchronously before consuming the source |
| `maxEditBackoff` exceeded | drop the draft tick, increment `skipped`, continue |
| terminal `sendMessage` fails | never dropped; the error bubbles up |
| strict-parse failure on finalize | falls back to raw text and calls `onError` |
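The first two rows differ only in what happens after the last-good text is committed: a source error is rethrown, an abort is not. Below is a minimal model of that control flow, not the plugin's `runStream`. `pumpWithFinalize` is hypothetical, its `finalize` callback stands in for the terminal `sendMessage`, and draft ticks are left out entirely.

```typescript
// Hypothetical model of the documented abort / source-error behavior.
interface FlowResult { text: string; aborted: boolean }

async function pumpWithFinalize (
  source: AsyncIterable<string>,
  finalize: (lastGood: string) => Promise<void>, // stands in for the terminal sendMessage
  onError: (err: unknown) => void,
  signal?: AbortSignal
): Promise<FlowResult> {
  let lastGood = ''
  try {
    for await (const piece of source) {
      // checked between pieces only; a real implementation would also race
      // the pending pull against the signal
      if (signal?.aborted) break // abort: stop pulling, no rethrow
      lastGood += piece
    }
  } catch (err) {
    // source threw mid-stream: commit what we have, report, then rethrow
    if (lastGood) await finalize(lastGood) // empty text is skipped
    onError(err)
    throw err
  }
  if (lastGood) await finalize(lastGood)
  return { text: lastGood, aborted: signal?.aborted ?? false }
}
```

If `finalize` itself rejects, the rejection propagates to the caller. This matches the "terminal `sendMessage` fails" row.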
using runStream directly
when you need the state machine without going through a Telegram instance:
```typescript
import { runStream, type StreamApi } from '@puregram/stream'

const fakeApi: StreamApi = {
  sendMessage: async (params) => { /* ... */ },
  sendMessageDraft: async (params) => { /* ... */ }
}

const result = await runStream(fakeApi, {
  chatId: 12345,
  source: generate(), // generate() from the raw AsyncIterable<string> example
  draftIdOffset: 0
})
```
useful for testing, or for embedding the streaming logic in custom transports
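For tests, a fake that records every call lets assertions check the sequence of drafts and final messages. This is a sketch under assumptions. The parameter fields below (`chat_id`, `text`, `draft_id`) and the `message_id` on the result follow Bot API naming, but the exact `StreamApi` types are not spelled out here. Treat `createRecordingApi` as a hypothetical stand-in to adapt to the real interface.

```typescript
// Hypothetical recording fake; adapt the param types to the real StreamApi.
interface SendMessageParams { chat_id: number; text: string }
interface SendMessageDraftParams { chat_id: number; draft_id: number; text: string }

type Call =
  | { method: 'sendMessage'; params: SendMessageParams }
  | { method: 'sendMessageDraft'; params: SendMessageDraftParams }

function createRecordingApi () {
  const calls: Call[] = []
  let nextMessageId = 1

  return {
    calls,
    sendMessage: async (params: SendMessageParams) => {
      calls.push({ method: 'sendMessage', params })
      // minimal message-like object; real Message objects carry far more fields
      return { message_id: nextMessageId++, chat: { id: params.chat_id }, text: params.text }
    },
    sendMessageDraft: async (params: SendMessageDraftParams) => {
      calls.push({ method: 'sendMessageDraft', params })
      return true // assumed: drafts resolve to true on success
    }
  }
}
```

Pass it where the example above passes `fakeApi`, casting if the real types are stricter. Then assert on `api.calls`, for instance that the last entry is a `sendMessage`.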
throttling
@puregram/stream does not implement its own throttling. it leans on puregram's built-in retryOnFloodWait for 429s and on maxEditBackoff to drop stale ticks. for strict per-chat pacing, layer @puregram/throttler on top
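puregram's `retryOnFloodWait` already covers this, so there is nothing extra to write. The sketch below only illustrates the idea. Telegram signals flooding with error code 429 and a `parameters.retry_after` value in seconds; a retry wrapper sleeps for that long and tries again. `withFloodWait` and the exact error shape it checks are assumptions for illustration, not puregram's API.

```typescript
// Illustration only: NOT puregram's retryOnFloodWait.
// Assumes errors carry the Bot API fields { error_code, parameters.retry_after }.
interface FloodError { error_code: number; parameters?: { retry_after?: number } }

function isFloodError (err: unknown): err is FloodError {
  return typeof err === 'object' && err !== null && (err as FloodError).error_code === 429
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function withFloodWait<T> (call: () => Promise<T>, maxRetries = 3): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await call()
    } catch (err) {
      // anything other than a 429, or too many retries: give up
      if (!isFloodError(err) || attempt >= maxRetries) throw err
      // retry_after is in seconds; fall back to 1s if it is missing
      const waitSeconds = err.parameters?.retry_after ?? 1
      await sleep(waitSeconds * 1000)
    }
  }
}
```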
exported surface
```typescript
import {
  stream, // plugin factory
  runStream, // low-level state machine
  normalize, // normalize any StreamSource to AsyncIterable<string>

  // named adapters
  fromOpenAI,
  fromAnthropic,
  fromVercelAI,
  fromOllama,
  fromLangChain,
  fromTextStream,
  fromBytes,
  fromEventEmitter,

  // constants
  DRAFT_TTL_MS,
  DRAFT_SAFETY_MS,
  MAX_CHUNK,
  DRAFT_ID_MAX,
  DEFAULT_EDIT_INTERVAL_MS,
  DEFAULT_MAX_EDIT_BACKOFF
} from '@puregram/stream'

import type {
  StreamCallOptions,
  StreamTgParams,
  StreamExtension,
  StreamSource,
  StreamResult,
  StreamApi,
  RunStreamOptions,
  StreamForwardOptions,
  StreamCallbacks,
  ParseMode,
  ParsedPayload
} from '@puregram/stream'
```
see also
- plugins & .extend: how `.extend(plugin)` works
- markup plugin: parse `MarkdownV2`/`HTML` into entities (required for `parseMode`)
- throttler plugin: outbound rate limiting for bots that send at scale
- /api/methods: `sendMessage`, `sendMessageDraft` on the wire