কম্পিউটার টিউটোরিয়াল

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

আমরা কী তৈরি করছি

এই নিবন্ধে, আমরা অত্যন্ত টেকসই LLM স্ট্রীম তৈরি করছি যা সহজেই বেঁচে থাকে:

  • নেটওয়ার্ক বিভ্রাট
  • পৃষ্ঠা রিফ্রেশ
  • ওয়েবসাইট বন্ধ করা হচ্ছে
  • ল্যাপটপের ঢাকনা বন্ধ করা হচ্ছে

বোনাস:আপনি একাধিক ডিভাইসে একই স্ট্রীম দেখতে পারেন (যেমন ফোন এবং ল্যাপটপ) একই সময়ে .

আপনি যতই স্ট্রীম ভাঙার চেষ্টা করুন না কেন, আপনি সংযোগ বিচ্ছিন্ন হওয়ার সময় এটি ব্যাকগ্রাউন্ডে চলতে থাকে এবং আপনি যখন ফিরে আসেন তখন মসৃণভাবে চলতে থাকে। এটি একটি অবিশ্বাস্য ব্যবহারকারীর অভিজ্ঞতা।

টেকসই এলএলএম স্ট্রিম ডেমো 👇

অনুপ্রেরণা

এআই দিয়ে তৈরি করার সময়, রিয়েল-টাইমে এআই প্রতিক্রিয়াগুলি স্ট্রিম করা একটি সর্বোত্তম অনুশীলন৷

পুরো প্রতিক্রিয়ার জন্য অপেক্ষা করার পরিবর্তে, আপনার ব্যবহারকারী রিয়েল-টাইমে বিষয়বস্তুটি তৈরি হওয়ার সাথে সাথে দেখেন - যা UX-এর জন্য আশ্চর্যজনক। Vercel-এর AI SDK-এর মতো টুলগুলি এটিকে অত্যন্ত সহজ করেছে:

import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
 
const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt: "Invent a new holiday and describe its traditions.",
});

রিয়েল-টাইম এলএলএম স্ট্রীমগুলিকে প্রযুক্তিগত স্তরে কাজ করার জন্য, আপনি একটি ক্লায়েন্টকে একটি API এর সাথে সংযুক্ত করুন এবং সার্ভার সেন্ট ইভেন্টস (এসএসই) এর মতো প্রোটোকল ব্যবহার করে ডেটা স্ট্রিম করুন:

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

কিন্তু:এই সেটআপে একটি সমস্যা আছে৷

যদি স্ট্রিম চলাকালীন কিছু ঘটে, যেমন আপনার ইন্টারনেট সংযোগ বিচ্ছিন্ন হওয়া, ল্যাপটপের ঢাকনা বন্ধ করা, বা নেটওয়ার্ক হেঁচকি, পুরো প্রজন্ম হারিয়ে যাবে। আপনাকে আবার শুরু করতে হবে এবং আবার পুরো প্রজন্মের জন্য অপেক্ষা করতে হবে। এটি বিশেষ করে দীর্ঘ প্রজন্মের জন্য বিরক্তিকর (যেমন, O1-এর মতো ব্যয়বহুল-থেকে-চালিত মডেলের সাথে)।

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

স্পষ্টতই এই সমস্যাটি মানুষের রাডারে রয়েছে। নির্ভরযোগ্য রিয়েল-টাইম এলএলএম স্ট্রিমিং-এর জন্য প্রকৃত চাহিদা রয়েছে এবং আরও devs এটিকে কার্যকর করার উপায় নিয়ে পরীক্ষা-নিরীক্ষা করছে:

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

উচ্চ-স্থায়িত্ব এলএলএম স্ট্রীম নির্মাণ

সত্যিকারের টেকসই, পুনঃসূচনাযোগ্য এলএলএম স্ট্রীম তৈরির গোপন সস ক্লায়েন্টকে প্রজন্মের পরিবেশ থেকে আলাদা করছে। ক্লায়েন্ট সংযোগগুলি অস্থির এবং অনেক কারণে সংযোগ বিচ্ছিন্ন হতে পারে, যেমন একটি ল্যাপটপ বন্ধ করা, নেটওয়ার্ক সমস্যা বা একটি পৃষ্ঠা রিফ্রেশ করা৷

ক্লায়েন্ট এবং জেনারেশন প্রসেসকে আলাদা রেখে, জেনারেশন সবসময় নিরবচ্ছিন্নভাবে চলতে থাকে। ক্লায়েন্ট যেকোন সময় পুনরায় সংযোগ করতে পারেন-চলমান প্রজন্মকে বাধা না দিয়ে।

খারাপ ধারণা:স্থায়ী, সরাসরি সংযোগ:

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

ভাল ধারণা:প্রতিস্থাপনযোগ্য, বাধাপ্রাপ্ত স্ট্রিম সংযোগ:

স্থিতিস্থাপক LLM স্ট্রীম তৈরি করুন যা সংযোগ বিচ্ছিন্ন, রিফ্রেশ এবং ক্র্যাশের মাধ্যমে অব্যাহত থাকে

এবং হ্যাঁ - এই আর্কিটেকচারটি একটি সাধারণ AI স্ট্রিমের জন্য বেশ জটিল মনে হতে পারে। যদিও আপনি এখনই কোডে দেখতে পাচ্ছেন, এটি কোডের মাত্র কয়েকটি লাইন এবং এটি বাস্তবায়ন করতে কয়েক মিনিট সময় নেয়।

টেকসই স্ট্রিম সেট আপ করা

একটি অত্যন্ত নির্ভরযোগ্য LLM স্ট্রিম সেটআপের তিনটি অংশ রয়েছে:

  • ক্লায়েন্ট (সামনে)
  • স্ট্রিম জেনারেটর (একটি API রুট)
  • স্ট্রিম ভোক্তা (এছাড়াও একটি API রুট)

ক্লায়েন্টের সাথে সরাসরি সমস্ত সংযোগ যেকোন সময় বিঘ্নিত বা বিরাম দেওয়া যেতে পারে। LLM আউটপুট স্ট্রীম (স্ট্রীম জেনারেটর) তৈরি করার জন্য দায়ী যুক্তির অংশটি তাই একটি স্বাধীন API হওয়া উচিত যা ক্লায়েন্টের সাথে সক্রিয় সংযোগ নেই৷

পরিবর্তে, আমরা একজন ভোক্তার মাধ্যমে ক্লায়েন্টের সাথে সংযোগ স্থাপন করব - যেটি শুধুমাত্র Redis থেকে ডেটা পড়ে এবং অন্যথায় এটি বেশ "মূর্খ"। এর একমাত্র উদ্দেশ্য হল জেনারেটরের আউটপুট পড়া এবং সমস্ত এলএলএম খণ্ডগুলি সরবরাহ করা যা কোনও ক্লায়েন্ট এখনও দেখেনি যখনই কোনও ক্লায়েন্ট এটির সাথে সংযোগ করে। এটাই।

দ্রুত সারাংশ - প্রতিটি অংশ কি করে:

  • ক্লায়েন্ট: স্ট্রীম জেনারেটর ট্রিগার করে (কিন্তু কখনই খোলা সংযোগ বজায় রাখে না) এবং রিয়েল-টাইম স্ট্রীম রেন্ডার করে
  • স্ট্রিম জেনারেটর: রিয়েল-টাইমে LLM আউটপুট তৈরি করে এবং Redis এ প্রকাশ করে
  • স্ট্রিম ভোক্তা: জেনারেটরের স্ট্রীম পড়ে এবং খণ্ডগুলিকে ক্লায়েন্টে ঠেলে দেয়

জেনারেটর শুধুমাত্র একটি LLM স্ট্রিম পড়ার জন্য এবং রিয়েল-টাইমে Redis-এ প্রকাশ করার জন্য দায়ী। আমরা ক্লায়েন্ট থেকে স্ট্রিম ভোক্তার সাথে একটি প্রতিস্থাপনযোগ্য সংযোগ পাই যা শেষ করা, পুনরায় সংযোগ করা ইত্যাদি হতে পারে- স্ট্রিম জেনারেটরকে কিছুই প্রভাবিত করে না৷

কোডের উদাহরণ

এই বিভাগে, আমরা কোডটি দেখব। নীতিগুলি খুব স্পষ্ট করার জন্য, আমরা শেষ পর্যন্ত প্রকৃত, সম্পূর্ণ উৎপাদন কোড বাস্তবায়ন দেখব৷

আপাতত, কোডটি বোঝা অনেক সহজ যদি আমরা পুরো কোড ফাইলের পরিবর্তে মূল স্নিপেট এবং তাদের উদ্দেশ্য দেখি।

1. ক্লায়েন্ট

ক্লায়েন্টের শুধুমাত্র 3টি দায়িত্ব রয়েছে:

  • সেশন আইডি তৈরি করা হচ্ছে
  • জেনারেটর ট্রিগার করা হচ্ছে
  • জেনারেশন স্ট্রীম রেন্ডারিং

আসুন প্রতিটির দিকে তাকাই:

ক্লায়েন্ট:সেশন আইডি তৈরি করা হচ্ছে

যখন একটি ক্লায়েন্ট একটি স্ট্রীমে সংযোগ করে বা পুনরায় সংযোগ করে, তখন আমরা সেই সমস্ত বার্তা পাঠাতে চাই যা ক্লায়েন্ট এখনও দেখেনি। এর মানে হল যে একটি সক্রিয় স্ট্রীম চলাকালীন, প্রতিটি বার্তায় শুধুমাত্র ক্লায়েন্টের দেখতে হবে এমন সঠিক ডেল্টা থাকে এবং পুরো স্ট্রিমটি নয়৷

পুনরায় সংযোগ করার সময়, বর্তমান প্রজন্মের বিন্দু পর্যন্ত সমগ্র স্ট্রীম পাঠানো হয় এবং সমস্ত ভবিষ্যতের ইভেন্টের সদস্যতা কোনো অনুপস্থিত অংশ ছাড়াই একেবারে নির্বিঘ্ন।

কিভাবে?

রেডিস স্ট্রীমস, রিয়েল-টাইম ডেটা দক্ষতার সাথে সঞ্চয় এবং পুনরুদ্ধার করার একটি উপায়, ভোক্তা গোষ্ঠী নামে পরিচিত কিছুর মাধ্যমে এই এখনও দেখা যায়নি এমন কার্যকারিতা রয়েছে। আমাদের একমাত্র কাজটি করতে হবে:নিশ্চিত করুন যে প্রতিটি ক্লায়েন্টের একটি অনন্য সেশন রয়েছে - যার অর্থ আমরা প্রতিটি প্রজন্মকে একটি অনন্য আইডি বরাদ্দ করি৷

স্ট্রীম ভোক্তার দিকে তাকালে আমরা ভোক্তা গোষ্ঠী সম্পর্কে আরও জানব। তারা দেখতে এইরকম:

await redis.xgroup("redis-key", {
 type: "CREATE",
 group: "my-group-name",
 id: "0",
});

কোন ক্লায়েন্ট কোন বিন্দু পর্যন্ত কোন স্ট্রীম আপ দেখেছে এবং কোন টুকরোগুলো অনুপস্থিত রয়েছে তার সম্পূর্ণ যুক্তি সম্পূর্ণভাবে Redis স্ট্রিম দ্বারা পরিচালিত হয় নিশ্চিত নির্ভুলতার সাথে। আমরা কখনই অনুপস্থিত LLM খণ্ড পাই না এবং সর্বদা একটি ক্লায়েন্টের প্রয়োজনীয় ডেটা পাঠাই।

ক্লায়েন্টকে আপাতত একমাত্র কাজটি করতে হবে:প্রতিটি প্রজন্মের জন্য একটি আইডি বরাদ্দ করুন। আমরা কেবল nanoid ব্যবহার করি :

import { customAlphabet } from "nanoid"
 
const nanoid = customAlphabet("0123456789", 6);

ক্লায়েন্ট:একটি জেনারেশন স্ট্রীম ট্রিগার করা

প্রজন্মের ইঞ্জিনের সাথে ক্লায়েন্টের একমাত্র মিথস্ক্রিয়াই এটিকে ট্রিগার করছে। যদিও টেকনিক্যালি, আপনি অন্য যেকোনো জায়গা থেকে প্রজন্মকে ট্রিগার করতে পারেন (যেমন CRON কাজ, স্বয়ংক্রিয় পাইপলাইন)।

এর সহজতম আকারে, এটি প্রজন্মের API রুটে একটি ফেচ কল:

// 👇 trigger stream generator
await fetch("/api/llm-stream", {
 method: "POST",
 headers: {
 "Content-Type": "application/json",
 },
 body: JSON.stringify({ prompt, sessionId }),
});

ক্লায়েন্ট:একটি জেনারেশন স্ট্রিম পড়া

জেনারেশন ট্রিগার করার পর, জেনারেটর LLM আউটপুটকে ক্লায়েন্ট থেকে সম্পূর্ণভাবে ডিকপল করা একটি কেন্দ্রীভূত রেডিস স্টোরে স্ট্রিমিং শুরু করে। আসুন প্রজন্মের স্ট্রীম পড়তে স্ট্রীম গ্রাহকের সাথে সংযোগ করি:

// 👇 connect to stream consumer
const res = await fetch(`/api/check-stream?sessionId=${sessionId}`, {
 headers: { "Content-Type": "text/event-stream" },
});

এটাই!

সেগুলি হল ক্লায়েন্টের তিনটি দায়িত্ব। অবশ্যই, আমরা আইডি তৈরির জন্য কাস্টম হুক, অতিরিক্ত নির্ভরযোগ্যতার জন্য প্রতিক্রিয়া-কোয়েরি এবং আরও অনেক কিছুর সাথে অনেক বেশি চৌকস পেতে পারি - আমরা পরে সম্পূর্ণ কোড উদাহরণগুলিতে এটি পেতে পারব।

2. স্ট্রিম জেনারেটর

স্ট্রিম জেনারেটর একটি LLM স্ট্রীম খোলে এবং প্রতিটি খণ্ডকে একটি Redis স্ট্রীমে লেখে। এটি রিয়েল-টাইম আপডেটের জন্য নতুন ডেটা সম্পর্কে স্ট্রিম ভোক্তাকে সতর্ক করার জন্য লেখা প্রতিটি অংশের জন্য একটি বার্তা প্রকাশ করে৷

দ্রষ্টব্য:আবার, এটি ইচ্ছাকৃতভাবে একটি সম্পূর্ণ কোড উদাহরণ নয়। আমরা শেষ পর্যন্ত সম্পূর্ণ কোডে পৌঁছাব-এটি ধারণাটি বোঝার জন্য।

import { streamText } from "ai"
import { redis } from "@/utils"
 
const result = await new Promise(
 async (resolve, reject) => {
 const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt,
 onError: (err) => reject(err),
 onFinish: async () => {
 resolve({
 // ...
 }),
 })
 
 for await (const chunk of textStream) {
 if (chunk) {
 const chunkMessage: ChunkMessage = {
 type: MessageType.CHUNK,
 content: chunk,
 }
 
 // 👇 write chunk to redis stream
 await redis.xadd(streamKey, "*", chunkMessage)
 
 // 👇 alert consumer that there's a new chunk
 await redis.publish(streamKey, { type: MessageType.CHUNK })
 }
 }
 }
)

3. স্ট্রিম কনজিউমার

স্ট্রীম ভোক্তা Redis এর সাথে সংযোগ স্থাপন করে এবং Redis pub/sub-এর মাধ্যমে নতুন খণ্ড সতর্কতা শোনে। প্রতিটি ক্লায়েন্ট তার দেখা এবং অদেখা মেসেজ ট্র্যাক করার জন্য তাদের নিজস্ব ভোক্তা গ্রুপ পায়।

দ্রষ্টব্য:প্রকাশটি প্রকৃত খণ্ডটি স্থানান্তর করে না, এটি কেবল সতর্ক করে যে স্ট্রীমে একটি নতুন খণ্ড উপলব্ধ৷

যখন একটি নতুন খণ্ড পাওয়া যায়, তখন স্ট্রিম কনজিউমার API এটি স্ট্রিম থেকে পড়ে এবং এটি সমস্ত সংযুক্ত ক্লায়েন্টদের কাছে ফরোয়ার্ড করে। রেডিস ভোক্তা গোষ্ঠীগুলি ট্র্যাক করে প্রতিটি ক্লায়েন্ট কী দেখেছে যাতে কোনও ডুপ্লিকেট বা অনুপস্থিত অংশগুলি স্থানান্তরের গ্যারান্টি থাকে৷

মূল স্ট্রীম ভোক্তা এইরকম দেখাচ্ছে:

const streamKey = `llm:stream:${sessionId}`;
const groupName = `sse-group-${nanoid()}`;
 
await redis.xgroup(streamKey, {
 type: "CREATE",
 group: groupName,
 id: "0",
});
 
const readStreamMessages = async () => {
 const chunks = (await redis.xreadgroup(
 groupName,
 `consumer-1`,
 streamKey,
 
 // 👇 built-in Redis stream functionality: only send unseen messages
 ">",
 )) as StreamData[];
 
 if (chunks?.length > 0) {
 const [_streamKey, messages] = chunks[0];
 for (const [_messageId, fields] of messages) {
 const rawObj = arrToObj(fields);
 const validatedMessage = validateMessage(rawObj);
 
 if (validatedMessage) {
 controller.enqueue(json(validatedMessage));
 }
 }
 }
};
 
// 👇 initial read
await readStreamMessages();
 
const subscription = redis.subscribe(streamKey);
 
subscription.on("message", async () => {
 // 👇 read every time a new chunk is written to stream
 await readStreamMessages();
});

দ্রষ্টব্য:আমরা প্রতিটি সংযোগে একটি ভোক্তা গ্রুপ তৈরি করছি। এটি খুব ভাল কাজ করে কারণ রেডিস এই অপারেশনটি অদম্যভাবে পরিচালনা করে, যেমন গ্রুপটি আগে থেকে থাকলে কিছুই হবে না।

সম্পূর্ণ কোড

সেশনআইডি জেনারেশন

এখন পর্যন্ত, আমরা ক্লায়েন্টদের কাজ, স্ট্রীম জেনারেটর এবং স্ট্রীম ভোক্তাকে পৃথকভাবে বোঝার জন্য আলাদা আলাদা কোড দেখেছি। এখন, পূর্ণ বাস্তবায়ন দেখে এই টুকরোগুলি কীভাবে একত্রে ফিট করে তা দেখে নেওয়া যাক৷

শুরু করার জন্য, একটি সেশনআইডি তৈরি করা শুধুমাত্র nanoid() ব্যবহার করার চেয়ে আরও স্থিতিস্থাপক হওয়া উচিত . সর্বোপরি, ওয়েবসাইটটি রিফ্রেশ বা বন্ধ হলে কী হবে? পুনঃসংযোগের পরে, আমরা জেনারেট করা সেশনআইডি হারাবো যদি আমরা এটিকে কোথাও সঞ্চয় না করি - যতক্ষণ জেনারেশন চলছে ততক্ষণ পর্যন্ত এটি টিকে থাকতে হবে।

ভাগ্যক্রমে localStorage এটির জন্য উপযুক্ত:

import { customAlphabet } from "nanoid";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useState } from "react";
 
export const useLLMSession = () => {
 const [sessionId, setSessionId] = useState<string>("");
 const router = useRouter();
 const nanoid = customAlphabet("0123456789", 6);
 
 const updateUrlWithSessionId = useCallback(
 (id: string) => {
 const url = new URL(window.location.href);
 url.searchParams.set("sessionId", id);
 router.replace(url.toString(), { scroll: false });
 },
 [router],
 );
 
 useEffect(() => {
 const urlParams = new URLSearchParams(window.location.search);
 const urlSessionId = urlParams.get("sessionId");
 const storedSessionId = localStorage.getItem("llm-session-id");
 
 if (urlSessionId) {
 localStorage.setItem("llm-session-id", urlSessionId);
 setSessionId(urlSessionId);
 } else if (storedSessionId) {
 setSessionId(storedSessionId);
 updateUrlWithSessionId(storedSessionId);
 } else {
 const newSessionId = nanoid();
 localStorage.setItem("llm-session-id", newSessionId);
 setSessionId(newSessionId);
 updateUrlWithSessionId(newSessionId);
 }
 // eslint-disable-next-line react-hooks/exhaustive-deps
 }, []);
 
 const clearSessionId = useCallback(() => {
 localStorage.removeItem("llm-session-id");
 setSessionId("");
 const url = new URL(window.location.href);
 url.searchParams.delete("sessionId");
 router.replace(url.toString(), { scroll: false });
 }, [router]);
 
 const regenerateSessionId = () => {
 const newSessionId = nanoid();
 localStorage.setItem("llm-session-id", newSessionId);
 setSessionId(newSessionId);
 updateUrlWithSessionId(newSessionId);
 return newSessionId;
 };
 
 return {
 sessionId,
 regenerateSessionId,
 clearSessionId,
 };
};

ক্লায়েন্ট

আমরা ইতিমধ্যেই ক্লায়েন্টের দুটি সবচেয়ে গুরুত্বপূর্ণ অংশ দেখেছি:একটি স্ট্রিম শুরু করা এবং একটি স্ট্রিমের সাথে সংযোগ করা৷ একবার আমরা আমাদের API থেকে নিশ্চিত হয়েছি যে জেনারেটর চলছে, আমরা react-queries refetch ব্যবহার করে স্ট্রীমের সাথে সংযোগ করি আমাদের সংযোগ অনুসন্ধানের জন্য ইউটিলিটি।

এখানে কিভাবে সব টুকরা একসাথে মাপসই করা হয়:

app/page.tsx
"use client"
 
import { useLLMSession } from "@/use-llm-session"
import { useMutation, useQuery } from "@tanstack/react-query"
import { FormEvent, useRef, useState, useEffect } from "react"
import {
 MessageType,
 validateMessage,
 type ChunkMessage,
 type MetadataMessage,
 StreamStatus,
} from "@/lib/message-schema"
 
// precondition = stream is ready to read
class PreconditionFailedError extends Error {
 constructor(message: string) {
 super(message)
 this.name = "PreconditionFailedError"
 }
}
 
export default function Home() {
 const { sessionId, regenerateSessionId, clearSessionId } = useLLMSession()
 
 const [prompt, setPrompt] = useState("")
 const [status, setStatus] = useState<
 "idle" | "loading" | "streaming" | "completed" | "error"
 >("idle")
 const [response, setResponse] = useState("")
 const [chunkCount, setChunkCount] = useState(0)
 
 const controller = useRef<AbortController | null>(null)
 const responseRef = useRef<HTMLDivElement>(null)
 const isInitialRequest = useRef(true)
 
 // keep generation in viewport
 useEffect(() => {
 if (responseRef.current) {
 responseRef.current.scrollTop = responseRef.current.scrollHeight
 }
 }, [response])
 
 // start generator
 const { mutate, error, isIdle } = useMutation({
 mutationFn: async (newSessionId: string) => {
 controller.current?.abort()
 isInitialRequest.current = false
 
 await fetch("/api/llm-stream", {
 method: "POST",
 headers: {
 "Content-Type": "application/json",
 },
 body: JSON.stringify({ prompt, sessionId: newSessionId }),
 })
 },
 onSuccess: () => {
 setStatus("streaming")
 refetch()
 },
 })
 
 // connect to running stream
 const { refetch } = useQuery({
 queryKey: ["stream", sessionId],
 queryFn: async () => {
 if (!sessionId) return null
 
 setResponse("")
 setChunkCount(0)
 
 const abortController = new AbortController()
 controller.current = abortController
 
 const res = await fetch(`/api/check-stream?sessionId=${sessionId}`, {
 headers: { "Content-Type": "text/event-stream" },
 signal: controller.current.signal,
 })
 
 if (res.status === 412) {
 // stream is not yet ready, retry connection
 throw new PreconditionFailedError("Stream not ready yet")
 }
 
 if (!res.body) return null
 
 const reader = res.body.pipeThrough(new TextDecoderStream()).getReader()
 
 let streamContent = ""
 
 while (true) {
 const { value, done } = await reader.read()
 
 if (done) break
 
 if (value) {
 const messages = value.split("\n\n").filter(Boolean)
 
 for (const message of messages) {
 if (message.startsWith("data: ")) {
 const data = message.slice(6)
 try {
 const parsedData = JSON.parse(data)
 const validatedMessage = validateMessage(parsedData)
 
 if (!validatedMessage) continue
 
 switch (validatedMessage.type) {
 case MessageType.CHUNK:
 const chunkMessage = validatedMessage as ChunkMessage
 streamContent += chunkMessage.content
 setResponse((prev) => prev + chunkMessage.content)
 setChunkCount((prev) => prev + 1)
 break
 
 case MessageType.METADATA:
 const metadataMessage = validatedMessage as MetadataMessage
 
 if (metadataMessage.status === StreamStatus.COMPLETED) {
 setStatus("completed")
 }
 break
 
 case MessageType.ERROR:
 setStatus("error")
 break
 }
 } catch (e) {
 console.error("Failed to parse message:", e)
 }
 }
 }
 }
 }
 
 return streamContent
 },
 refetchOnWindowFocus: false,
 refetchOnMount: false,
 retry(failureCount, error) {
 if (isInitialRequest.current === true) return false
 
 if (error instanceof PreconditionFailedError) {
 return failureCount < 10
 }
 
 return false
 },
 })
 
 const handleSubmit = async (e: FormEvent) => {
 e.preventDefault()
 setStatus("loading")
 const newSessionId = regenerateSessionId()
 mutate(newSessionId)
 }
 
 const handleReset = () => {
 controller.current?.abort()
 clearSessionId()
 setPrompt("")
 setResponse("")
 setChunkCount(0)
 setStatus("idle")
 }
 
 return (
 <main className="flex min-h-screen flex-col items-center justify-between p-12 sm:p-24">
 <div className="z-10 max-w-5xl w-full items-center justify-between font-mono text-sm">
 <h1 className="text-4xl tracking-tight font-bold mb-8 text-center">
 Resumable LLM Stream
 </h1>
 
 <form onSubmit={handleSubmit} className="mb-8">
 <div className="mb-4">
 <label htmlFor="prompt" className="block text-sm font-medium mb-2">
 Enter your prompt:
 </label>
 <textarea
 autoFocus
 id="prompt"
 value={prompt}
 onChange={(e) => setPrompt(e.target.value)}
 className="w-full p-2 border border-zinc-700 rounded-md min-h-[100px] focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-transparent transition-all duration-200"
 placeholder="Ask the AI something..."
 disabled={status === "loading" || status === "streaming"}
 />
 </div>
 
 <div className="flex gap-4">
 <button
 type="submit"
 disabled={status === "loading" || status === "streaming"}
 className="px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700 disabled:bg-gray-400"
 >
 {status === "loading"
 ? "Starting..."
 : status === "streaming"
 ? "Streaming..."
 : "Generate Response"}
 </button>
 <button
 type="button"
 onClick={handleReset}
 className="px-4 py-2 bg-zinc-600 text-white rounded-md hover:bg-zinc-700"
 >
 Reset
 </button>
 </div>
 </form>
 
 <div className="mt-8">
 <h2 className="text-xl tracking-tight font-semibold mb-2">
 Response:
 </h2>
 {status === "error" ? (
 <div className="p-4 bg-red-100 border border-red-300 rounded-md text-red-800">
 <p className="font-bold">Error:</p>
 <p>{error?.message}</p>
 </div>
 ) : status === "idle" && !response ? (
 <p className="text-gray-500">
 Enter a prompt and click "Generate Response" to see the AI's
 response.
 </p>
 ) : (
 <div
 ref={responseRef}
 className="flex flex-col h-96 overflow-y-auto p-4 bg-zinc-900 text-zinc-200 border border-zinc-800 rounded-md whitespace-pre-wrap [&::-webkit-scrollbar]:w-2 [&::-webkit-scrollbar-thumb]:bg-zinc-700 [&::-webkit-scrollbar-track]:bg-zinc-800"
 >
 <div>{response || "Loading..."}</div>
 </div>
 )}
 
 {(status === "streaming" || status === "completed") && (
 <div className="mt-2 text-sm text-gray-500">
 <p>Session ID: {sessionId}</p>
 <p>Status: {status}</p>
 <p>Chunks received: {chunkCount}</p>
 <p>
 Connection: {status === "streaming" ? "Active SSE" : "Closed"}
 </p>
 </div>
 )}
 </div>
 </div>
 </main>
 )
}

স্ট্রিম জেনারেটর

এখানে স্ট্রিম জেনারেটরের জন্য সম্পূর্ণ কোড। যদি একটি LLM জেনারেশন যেকোন সময়ে ব্যর্থ হয়, সর্বোচ্চ নির্ভরযোগ্যতার জন্য আপস্ট্যাশ ওয়ার্কফ্লো ব্যবহার করে স্বয়ংক্রিয়ভাবে পুনরায় চেষ্টা করা হয়:

api/llm-stream/route.ts
import {
 MessageType,
 StreamStatus,
 type ChunkMessage,
 type MetadataMessage,
} from "@/lib/message-schema";
import { redis } from "@/utils";
import { openai } from "@ai-sdk/openai";
import { serve } from "@upstash/workflow/nextjs";
import { streamText } from "ai";
 
interface LLMStreamResponse {
 success: boolean;
 sessionId: string;
 totalChunks: number;
 fullContent: string;
}
 
export const { POST } = serve(async (context) => {
 const { prompt, sessionId } = context.requestPayload as {
 prompt?: string;
 sessionId?: string;
 };
 
 if (!prompt || !sessionId) {
 throw new Error("Prompt and sessionId are required");
 }
 
 const streamKey = `llm:stream:${sessionId}`;
 
 await context.run("mark-stream-start", async () => {
 const metadataMessage: MetadataMessage = {
 type: MessageType.METADATA,
 status: StreamStatus.STARTED,
 completedAt: new Date().toISOString(),
 totalChunks: 0,
 fullContent: "",
 };
 
 await redis.xadd(streamKey, "*", metadataMessage);
 await redis.publish(streamKey, { type: MessageType.METADATA });
 });
 
 const res = await context.run("generate-llm-response", async () => {
 const result = await new Promise<LLMStreamResponse>(
 async (resolve, reject) => {
 let fullContent = "";
 let chunkIndex = 0;
 
 const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt,
 onError: (err) => reject(err),
 onFinish: async () => {
 resolve({
 success: true,
 sessionId,
 totalChunks: chunkIndex,
 fullContent,
 });
 },
 });
 
 for await (const chunk of textStream) {
 if (chunk) {
 fullContent += chunk;
 chunkIndex++;
 
 const chunkMessage: ChunkMessage = {
 type: MessageType.CHUNK,
 content: chunk,
 };
 
 await redis.xadd(streamKey, "*", chunkMessage);
 await redis.publish(streamKey, { type: MessageType.CHUNK });
 }
 }
 },
 );
 
 return result;
 });
 
 await context.run("mark-stream-end", async () => {
 const metadataMessage: MetadataMessage = {
 type: MessageType.METADATA,
 status: StreamStatus.COMPLETED,
 completedAt: new Date().toISOString(),
 totalChunks: res.totalChunks,
 fullContent: res.fullContent,
 };
 
 await redis.xadd(streamKey, "*", metadataMessage);
 await redis.publish(streamKey, { type: MessageType.METADATA });
 });
});

সম্পূর্ণ টাইপ-নিরাপত্তার জন্য, আমি জোড:

-এ সমস্ত বার্তা স্কিমাও লিখেছি message-schema.ts
import { z } from "zod";
 
export const MessageType = {
 CHUNK: "chunk",
 METADATA: "metadata",
 EVENT: "event",
 ERROR: "error",
} as const;
 
export const StreamStatus = {
 STARTED: "started",
 STREAMING: "streaming",
 COMPLETED: "completed",
 ERROR: "error",
} as const;
 
export const baseMessageSchema = z.object({
 type: z.enum([
 MessageType.CHUNK,
 MessageType.METADATA,
 MessageType.EVENT,
 MessageType.ERROR,
 ]),
});
 
export const chunkMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.CHUNK),
 content: z.string(),
});
 
export const metadataMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.METADATA),
 status: z.enum([
 StreamStatus.STARTED,
 StreamStatus.STREAMING,
 StreamStatus.COMPLETED,
 StreamStatus.ERROR,
 ]),
 completedAt: z.string().optional(),
 totalChunks: z.number().optional(),
 fullContent: z.string().optional(),
 error: z.string().optional(),
});
 
export const eventMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.EVENT),
});
 
export const errorMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.ERROR),
 error: z.string(),
});
 
export const messageSchema = z.discriminatedUnion("type", [
 chunkMessageSchema,
 metadataMessageSchema,
 eventMessageSchema,
 errorMessageSchema,
]);
 
export type Message = z.infer<typeof messageSchema>;
export type ChunkMessage = z.infer<typeof chunkMessageSchema>;
export type MetadataMessage = z.infer<typeof metadataMessageSchema>;
export type EventMessage = z.infer<typeof eventMessageSchema>;
export type ErrorMessage = z.infer<typeof errorMessageSchema>;
 
export const validateMessage = (data: unknown): Message | null => {
 const result = messageSchema.safeParse(data);
 return result.success ? result.data : null;
};

স্ট্রিম কনজিউমার

সবশেষে, এর পূর্ণাঙ্গ ভোক্তা বাস্তবায়নের দিকে নজর দেওয়া যাক। এটি একটি পরিবর্তনযোগ্য সংযোগ যা একটি ক্লায়েন্ট সংযোগ করলে স্বয়ংক্রিয়ভাবে সমস্ত অদেখা অংশগুলিকে পাঠায়:

api/check-stream/route.ts
import { redis } from "@/utils"
import { nanoid } from "nanoid"
import { NextRequest, NextResponse } from "next/server"
import {
 validateMessage,
 MessageType,
 type ErrorMessage,
} from "@/lib/message-schema"
 
export const dynamic = "force-dynamic"
export const maxDuration = 60
export const runtime = "nodejs"
 
type StreamField = string
type StreamMessage = [string, StreamField[]]
type StreamData = [string, StreamMessage[]]
 
const arrToObj = (arr: StreamField[]) => {
 const obj: Record<string, string> = {}
 
 for (let i = 0; i < arr.length; i += 2) {
 obj[arr[i]] = arr[i + 1]
 }
 
 return obj
}
 
const json = (data: Record<string, unknown>) => {
 return new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`)
}
 
export async function GET(req: NextRequest) {
 const { searchParams } = new URL(req.url)
 const sessionId = searchParams.get("sessionId")
 
 if (!sessionId) {
 return NextResponse.json(
 { error: "Stream key is required" },
 { status: 400 }
 )
 }
 
 const streamKey = `llm:stream:${sessionId}`
 const groupName = `sse-group-${nanoid()}`
 
 const keyExists = await redis.exists(streamKey)
 
 if (!keyExists) {
 return NextResponse.json(
 { error: "Stream does not (yet) exist" },
 { status: 412 }
 )
 }
 
 try {
 await redis.xgroup(streamKey, {
 type: "CREATE",
 group: groupName,
 id: "0",
 })
 } catch (_err) {}
 
 const response = new Response(
 new ReadableStream({
 async start(controller) {
 const readStreamMessages = async () => {
 const chunks = (await redis.xreadgroup(
 groupName,
 `consumer-1`,
 streamKey,
 ">"
 )) as StreamData[]
 
 if (chunks?.length > 0) {
 const [_streamKey, messages] = chunks[0]
 for (const [_messageId, fields] of messages) {
 const rawObj = arrToObj(fields)
 const validatedMessage = validateMessage(rawObj)
 
 if (validatedMessage) {
 controller.enqueue(json(validatedMessage))
 }
 }
 }
 }
 
 await readStreamMessages()
 
 const subscription = redis.subscribe(streamKey)
 
 subscription.on("message", async () => {
 await readStreamMessages()
 })
 
 subscription.on("error", (error) => {
 console.error(`SSE subscription error on ${streamKey}:`, error)
 
 const errorMessage: ErrorMessage = {
 type: MessageType.ERROR,
 error: error.message,
 }
 
 controller.enqueue(json(errorMessage))
 controller.close()
 })
 
 req.signal.addEventListener("abort", () => {
 console.log("Client disconnected, cleaning up subscription")
 subscription.unsubscribe()
 controller.close()
 })
 },
 }),
 {
 headers: {
 "Content-Type": "text/event-stream",
 "Cache-Control": "no-cache, no-transform",
 Connection: "keep-alive",
 },
 }
 )
 
 return response
}

দ্রুত সারাংশ এবং চূড়ান্ত শব্দ

আমরা এইমাত্র একটি অত্যন্ত শক্তিশালী LLM স্ট্রীম তৈরি করেছি যা নেটওয়ার্ক বাধা, পৃষ্ঠা রিফ্রেশ এবং এমনকি সম্পূর্ণ সংযোগ বিচ্ছিন্ন করতে পারে। আমরা যা করেছি তা এখানে:

  • ডেলিভারি থেকে ডিকপল জেনারেশন: ক্লায়েন্ট সংযোগ থেকে LLM জেনারেশনকে আলাদা করে, ক্লায়েন্ট সমস্যা নির্বিশেষে বিষয়বস্তু তৈরি চলতে থাকে।

  • রেডিস স্ট্রীম ব্যবহার করে স্থায়ী সঞ্চয়স্থান: আমরা রেডিস স্ট্রীমগুলিকে একটি ক্রমাগত বার্তা ব্রোকার হিসাবে ব্যবহার করছি LLM প্রতিক্রিয়ার প্রতিটি অংশ যেমন এটি তৈরি করা হয়েছে সঞ্চয় করতে৷

  • Redis Pub/Sub-এর সাথে রিয়েল-টাইম আপডেট: আমরা Redis Pub/Sub ব্যবহার করে একটি নোটিফিকেশন সিস্টেম তৈরি করেছি যাতে নতুন খণ্ডগুলি উপলব্ধ হলে স্ট্রিম গ্রাহকদের জানানো যায়৷

  • স্বয়ংক্রিয় পুনঃসংযোগ: ক্লায়েন্ট যেকোন সময় পুনরায় সংযোগ করতে পারে এবং স্বয়ংক্রিয়ভাবে সমস্ত বিষয়বস্তু গ্রহণ করে, ডুপ্লিকেট বা অনুপস্থিত অংশ ছাড়াই নিশ্চিত। এতে সংযোগ বিচ্ছিন্ন হওয়ার সময় তৈরি হওয়া সামগ্রী অন্তর্ভুক্ত রয়েছে৷

  • সেশন ম্যানেজমেন্ট: আমরা একটি সেশন সিস্টেম তৈরি করেছি যা ব্যবহারকারীদের একই সময়ে একাধিক ডিভাইসে স্ট্রিম দেখতে দেয়৷

নীচের লাইন, আমরা এখন আমাদের ব্যবহারকারীদের একটি ব্যতিক্রমী ব্যবহারকারীর অভিজ্ঞতা (UX) প্রদান করছি। আমি সত্যিই এই পদ্ধতির সুপারিশ করছি, বিশেষ করে যদি আপনি একটি LLM চ্যাট পরিষেবার মতো কিছু তৈরি করেন।

পড়ার জন্য চিয়ার্স! আপনার যদি কোনো প্রতিক্রিয়া থাকে বা Upstash-এ অতিথি লেখক হতে চান, তাহলে josh@upstash.com এ যোগাযোগ করুন 🙌


  1. কীভাবে জাভাস্ক্রিপ্ট অ্যারেতে একটি উপাদান অনুসন্ধান করবেন?

  2. রুবিতে ব্যতিক্রমগুলিতে প্রসঙ্গ ডেটা কীভাবে যুক্ত করবেন

  3. C++ এ একটি বাইনারি ট্রির সম্পূর্ণতা পরীক্ষা করুন

  4. পাইথন ম্যাটপ্লটলিবের একটি চিত্র থেকে গ্রিড লাইনগুলি কীভাবে সরিয়ে ফেলা যায়?