@puregram/stream

streams LLM output to telegram via sendMessageDraft plus a terminal sendMessage. any AsyncIterable<string> (or a known LLM SDK stream) becomes a series of animated draft previews; each preview is finalized as a real message once its 4096-char window fills or the source is exhausted

this is a runtime plugin — install it via tg.extend(stream())
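
the 4096-char windowing described above can be pictured with a small sketch. this is not the plugin's code: `fillWindows` is a hypothetical helper, and it assumes windows are cut by plain string length (UTF-16 code units), which may differ from how the plugin counts characters

```ts
// hypothetical helper, not part of @puregram/stream.
// buffers pieces from the source and yields a window each time the buffer
// reaches `max` characters; whatever is left is yielded when the source ends
async function * fillWindows (
  source: AsyncIterable<string>,
  max: number = 4096
): AsyncGenerator<string> {
  if (max < 1) throw new RangeError('max must be at least 1')

  let buffer = ''

  for await (const piece of source) {
    buffer += piece

    // a single large piece can fill more than one window
    while (buffer.length >= max) {
      yield buffer.slice(0, max)
      buffer = buffer.slice(max)
    }
  }

  // source exhausted: flush the partial window, if any
  if (buffer.length > 0) yield buffer
}
```

in the plugin each full window ends in a real sendMessage and a new draft starts for the next one; the sketch only shows where the cuts would fall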

when to use

  • piping an LLM response token-by-token into a private telegram chat as it generates
  • showing a thinking animation before the answer arrives (thinkingPlaceholder: true)
  • streaming output from OpenAI, Anthropic, Vercel AI SDK, Ollama, LangChain, or any AsyncIterable<string>

WARNING

drafts are private-chat only — it's a telegram api constraint. the plugin throws synchronously before consuming the source if you target a group or channel. a group fallback using editMessageText is planned
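
until a group fallback exists, one way to cope with the private-chat constraint is to check the chat type yourself and, for other chats, collect the whole response and send it once. a minimal sketch, assuming the standard bot api chat types; `canUseDrafts` and `collectText` are hypothetical helpers, not exports of this package

```ts
// chat types as defined by the telegram bot api
type ChatType = 'private' | 'group' | 'supergroup' | 'channel'

// hypothetical guard: drafts only work in private chats
function canUseDrafts (chatType: ChatType): boolean {
  return chatType === 'private'
}

// hypothetical fallback: drain the source into one string so it can be
// sent with a single regular message instead of a draft stream
async function collectText (source: AsyncIterable<string>): Promise<string> {
  let text = ''

  for await (const piece of source) {
    text += piece
  }

  return text
}
```

in a handler you would branch on `canUseDrafts(chatType)` and call message.stream(source) only in the private case; note the collected text can still exceed 4096 characters and may need splitting before it is sent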

install

sh
yarn add @puregram/stream
sh
npm i -S @puregram/stream
sh
pnpm add @puregram/stream

optionally, add @puregram/markup if you want parseMode support:

sh
yarn add @puregram/markup

quick start

ts
import { Telegram } from 'puregram'
import { stream } from '@puregram/stream'
import OpenAI from 'openai'

const tg = Telegram.fromToken(process.env.TOKEN!).extend(stream())
const openai = new OpenAI()

tg.onMessage(async (message) => {
  const completion = await openai.chat.completions.create({
    model: 'gpt-4o-mini',
    stream: true,
    messages: [{ role: 'user', content: message.text ?? 'tell me a joke' }]
  })

  await message.stream(completion)
})

await tg.startPolling()

two call shapes

ts
// inside an update handler — chat_id is inferred from the message
await message.stream(source, options?)

// out-of-context — pass chat_id explicitly
await tg.stream({ chat_id: 12345, source, ...options })

both return a Promise<StreamResult> that summarizes how the stream went: messages sent, drafts issued, pieces consumed, and whether it was aborted

sources

@puregram/stream duck-types the source automatically. you can also call a named adapter for better type inference:

openai

ts
import { fromOpenAI } from '@puregram/stream'

const completion = await openai.chat.completions.create({ model: 'gpt-4o-mini', stream: true, messages: [...] })

await message.stream(completion)             // auto-detect
await message.stream(fromOpenAI(completion)) // explicit

anthropic

ts
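import Anthropic from '@anthropic-ai/sdk'
import { fromAnthropic } from '@puregram/stream'

const client = new Anthropic() // reads ANTHROPIC_API_KEY from the environment

const result = client.messages.stream({ model: 'claude-sonnet-4-5', max_tokens: 1024, messages: [...] })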

await message.stream(fromAnthropic(result))

vercel ai sdk

ts
import { fromVercelAI } from '@puregram/stream'
import { openai } from '@ai-sdk/openai'
import { streamText } from 'ai'

const result = await streamText({ model: openai('gpt-4o'), prompt: 'tell me a joke' })

await message.stream(result)              // duck-typed via .textStream
await message.stream(fromVercelAI(result)) // explicit

ollama

ts
import ollama from 'ollama'
import { fromOllama } from '@puregram/stream'

const res = await ollama.chat({ model: 'llama3', messages: [...], stream: true })

await message.stream(fromOllama(res))

langchain

ts
import { fromLangChain } from '@puregram/stream'

const result = await chain.stream({ input: 'tell me a joke' })

await message.stream(fromLangChain(result))

raw AsyncIterable<string>

ts
async function * generate () {
  for (const word of ['hello', ' ', 'world']) {
    yield word
    await new Promise(r => setTimeout(r, 50))
  }
}

await message.stream(generate())

other adapters

| adapter | accepts |
| --- | --- |
| fromTextStream | web ReadableStream<string> |
| fromBytes | AsyncIterable<Uint8Array> (utf-8 decoded) |
| fromEventEmitter | node EventEmitter, listens on 'text' events by default |
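
to show what "utf-8 decoded" involves for a byte source, here is a rough sketch of the idea behind an adapter like fromBytes. it is an illustration built on the standard TextDecoder, not the package's implementation, and `decodeUtf8` is a hypothetical name

```ts
// hypothetical stand-in for a bytes adapter: turns AsyncIterable<Uint8Array>
// into AsyncIterable<string>, keeping multi-byte characters intact even when
// they are split across chunk boundaries
async function * decodeUtf8 (
  chunks: AsyncIterable<Uint8Array>
): AsyncGenerator<string> {
  const decoder = new TextDecoder('utf-8')

  for await (const chunk of chunks) {
    // stream: true makes the decoder hold back an incomplete trailing sequence
    const text = decoder.decode(chunk, { stream: true })

    if (text.length > 0) yield text
  }

  // flush anything still buffered once the source ends
  const tail = decoder.decode()

  if (tail.length > 0) yield tail
}
```

decoding each chunk on its own would corrupt characters that straddle two chunks, which is why the streaming mode of the decoder matters here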

options

StreamCallOptions — passed as the second argument to update.stream(source, options?) or spread into tg.stream({ chat_id, source, ...options }):

| option | type | default | notes |
| --- | --- | --- | --- |
| parseMode | 'MarkdownV2' \| 'HTML' | plain text | lenient per-tick, strict on finalize. requires @puregram/markup |
| editIntervalMs | number | 250 | soft floor between sendMessageDraft calls (ms) |
| maxEditBackoff | number | 4000 | drop a draft tick when local backoff exceeds this |
| thinkingPlaceholder | boolean | true | emit an empty draft eagerly on start so users see the "typing" animation |
| draftIdOffset | number | hybrid | derived from message_id << 8 on update.stream; counter-based for tg.stream |
| signal | AbortSignal | | aborts mid-stream, finalizes last-good text |
| message_thread_id | number | | forwarded to sendMessage / sendMessageDraft |
| reply_parameters | ReplyParameters | | forwarded to sendMessage |
| link_preview_options | LinkPreviewOptions | | forwarded to sendMessage |
| disable_notification | boolean | | forwarded to sendMessage |
| protect_content | boolean | | forwarded to sendMessage |
| reply_markup | ReplyMarkup | | only attached to the terminal sendMessage, not to drafts |
| onPiece | (piece, draftId) => void | | called per source yield |
| onDraftFinalized | (msg) => void | | called per terminal sendMessage |
| onError | (err) => void \| Promise<void> | | called for source / draft / parse errors |
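
how editIntervalMs and maxEditBackoff interact can be sketched as a small decision function. this is a guess at the shape of the logic based only on the notes in the table, not the plugin's scheduler; `decideTick` and its parameters are hypothetical

```ts
interface PacingOptions {
  editIntervalMs: number // soft floor between draft calls
  maxEditBackoff: number // drop the tick when backoff exceeds this
}

type TickDecision = 'send' | 'wait' | 'drop'

// defaults taken from the options table
const DEFAULT_PACING: PacingOptions = { editIntervalMs: 250, maxEditBackoff: 4000 }

// hypothetical: decide what to do with a pending draft tick.
// `backoffMs` stands for whatever delay the client is currently honoring
// (for example after a flood-wait response)
function decideTick (
  now: number,
  lastSentAt: number,
  backoffMs: number,
  options: PacingOptions = DEFAULT_PACING
): TickDecision {
  // stale tick: a newer draft will supersede it anyway
  if (backoffMs > options.maxEditBackoff) return 'drop'

  // too soon after the previous draft: coalesce with the next piece
  if (now - lastSentAt < options.editIntervalMs) return 'wait'

  return 'send'
}
```

under this reading, dropped and coalesced ticks are what the skipped counter in the result would reflect, while the terminal sendMessage is never subject to either rule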

return value — StreamResult

ts
interface StreamResult {
  messages: TelegramMessage[] // committed sendMessage results, in order
  drafts: number              // distinct sendMessageDraft calls issued
  pieces: number              // chunks pulled from source
  bytes: number               // total text bytes streamed
  skipped: number             // draft ticks coalesced or dropped under back-pressure
  aborted: boolean            // true when AbortSignal triggered
}
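
a result like this is handy for logging. the sketch below mirrors the fields of the interface above in a local type (with messages loosened to unknown[] so it stands alone); `describeResult` is a hypothetical helper

```ts
// local mirror of StreamResult so the sketch does not need the package types
interface StreamResultLike {
  messages: unknown[]
  drafts: number
  pieces: number
  bytes: number
  skipped: number
  aborted: boolean
}

// hypothetical: build a one-line summary suitable for a log entry
function describeResult (result: StreamResultLike): string {
  const summary =
    `${result.messages.length} message(s), ${result.drafts} draft(s), ` +
    `${result.skipped} skipped, ${result.bytes} bytes from ${result.pieces} piece(s)`

  return result.aborted ? `${summary} (aborted)` : summary
}
```

a high skipped count relative to drafts suggests the source is producing faster than drafts can be sent, which may be a reason to raise editIntervalMs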

error handling

| situation | behavior |
| --- | --- |
| source throws mid-stream | stop pulling, finalize last-good via sendMessage, call onError, rethrow |
| signal is aborted (AbortController.abort()) | stop pulling, finalize last-good, set result.aborted = true, no rethrow |
| chat is not private | throws synchronously before consuming the source |
| maxEditBackoff exceeded | drop the draft tick, skipped += 1, continue |
| terminal sendMessage fails | never dropped, bubbles up |
| strict-parse failure on finalize | falls back to raw text, onError called |
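
the "source throws mid-stream" row is the one callers most often need to reason about. the following sketch reproduces that sequence (finalize last-good text, call onError, rethrow) in isolation; it is a simplified model, not the plugin's state machine, and `consumeWithFinalize` is a hypothetical name

```ts
// hypothetical model of the documented behavior for a failing source
async function consumeWithFinalize (
  source: AsyncIterable<string>,
  finalize: (text: string) => Promise<void>,
  onError?: (err: unknown) => void | Promise<void>
): Promise<string> {
  let text = ''

  try {
    for await (const piece of source) {
      text += piece
    }
  } catch (err) {
    // keep what was received so far: commit the last-good text first
    if (text.length > 0) await finalize(text)

    await onError?.(err)

    // the caller still sees the original failure
    throw err
  }

  if (text.length > 0) await finalize(text)

  return text
}
```

because the error is rethrown after the partial text is committed, a try/catch around the stream call is still needed if the handler should keep running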

using runStream directly

when you need the state machine without going through a Telegram instance:

ts
import { runStream, type StreamApi } from '@puregram/stream'

const fakeApi: StreamApi = {
  sendMessage: async (params) => { /* ... */ },
  sendMessageDraft: async (params) => { /* ... */ }
}

const result = await runStream(fakeApi, {
  chatId: 12345,
  source: generate(),
  draftIdOffset: 0
})

useful for testing or for embedding the streaming logic in custom transports
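
for tests, a fake api that records every call is usually more useful than empty stubs. a minimal sketch follows; the parameter and return shapes are assumptions (the real StreamApi types are not shown on this page), and `createRecordingApi` is a hypothetical helper

```ts
// assumed shape: params are treated as opaque key/value bags
type Params = Record<string, unknown>

interface RecordedCall {
  method: 'sendMessage' | 'sendMessageDraft'
  params: Params
}

// hypothetical: builds a fake with the two methods used above and a log of calls
function createRecordingApi () {
  const calls: RecordedCall[] = []

  const api = {
    // assumed to resolve to a message-like object
    sendMessage: async (params: Params) => {
      calls.push({ method: 'sendMessage', params })

      return { ...params, message_id: calls.length }
    },
    // assumed to resolve to true
    sendMessageDraft: async (params: Params) => {
      calls.push({ method: 'sendMessageDraft', params })

      return true
    }
  }

  return { api, calls }
}
```

passing `api` where fakeApi is used above would let a test assert on the order of calls, for example that drafts come before the terminal sendMessage; if the real StreamApi type is stricter, the fake needs adjusting to match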

throttling

@puregram/stream does not implement its own throttling. it leans on puregram's built-in retryOnFloodWait for 429s and on maxEditBackoff to drop stale ticks. for strict per-chat pacing, layer @puregram/throttler on top

exported surface

ts
import {
  stream,    // plugin factory
  runStream, // low-level state machine
  normalize, // normalize any StreamSource to AsyncIterable<string>
  // named adapters
  fromOpenAI,
  fromAnthropic,
  fromVercelAI,
  fromOllama,
  fromLangChain,
  fromTextStream,
  fromBytes,
  fromEventEmitter,
  // constants
  DRAFT_TTL_MS,
  DRAFT_SAFETY_MS,
  MAX_CHUNK,
  DRAFT_ID_MAX,
  DEFAULT_EDIT_INTERVAL_MS,
  DEFAULT_MAX_EDIT_BACKOFF
} from '@puregram/stream'

import type {
  StreamCallOptions,
  StreamTgParams,
  StreamExtension,
  StreamSource,
  StreamResult,
  StreamApi,
  RunStreamOptions,
  StreamForwardOptions,
  StreamCallbacks,
  ParseMode,
  ParsedPayload
} from '@puregram/stream'
