কম্পিউটার টিউটোরিয়াল

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা

স্টিফেন সানভো

দ্বারা

একটি কার্যকরী পূর্ণ-স্ট্যাক অ্যাপ্লিকেশন তৈরি করার জন্য, চিন্তা করার জন্য অনেকগুলি চলমান অংশ রয়েছে৷ এবং আপনাকে অনেক সিদ্ধান্ত নিতে হবে যা আপনার অ্যাপের সাফল্যের জন্য গুরুত্বপূর্ণ হবে।

উদাহরণস্বরূপ, আপনি কোন ভাষা ব্যবহার করবেন এবং আপনি কোন প্ল্যাটফর্মে স্থাপন করবেন? আপনি একটি সার্ভারে একটি কন্টেইনারাইজড সফ্টওয়্যার স্থাপন করতে যাচ্ছেন, বা ব্যাকএন্ড পরিচালনা করতে সার্ভারহীন ফাংশন ব্যবহার করতে যাচ্ছেন? আপনি কি আপনার অ্যাপ্লিকেশনের জটিল অংশগুলি যেমন প্রমাণীকরণ বা অর্থপ্রদানগুলি পরিচালনা করতে তৃতীয় পক্ষের API ব্যবহার করার পরিকল্পনা করছেন? আপনি ডেটা কোথায় সংরক্ষণ করবেন?

এই সব ছাড়াও, আপনাকে ব্যবহারকারীর ইন্টারফেস, আপনার অ্যাপ্লিকেশনটির নকশা এবং ব্যবহারযোগ্যতা এবং আরও অনেক কিছু সম্পর্কে চিন্তা করতে হবে।

এই কারণেই জটিল বৃহৎ অ্যাপ্লিকেশনগুলির জন্য অ্যাপটি তৈরি করতে একটি বহুমুখী উন্নয়ন দলের সহযোগিতা প্রয়োজন৷

সম্পূর্ণ স্ট্যাক অ্যাপ্লিকেশনগুলি কীভাবে বিকাশ করা যায় তা শেখার সর্বোত্তম উপায়গুলির মধ্যে একটি হল এমন প্রকল্পগুলি তৈরি করা যা শেষ থেকে শেষ বিকাশ প্রক্রিয়াকে কভার করে। আপনি আর্কিটেকচার ডিজাইন, API পরিষেবাগুলি বিকাশ, ব্যবহারকারী ইন্টারফেস বিকাশ এবং অবশেষে আপনার অ্যাপ্লিকেশন স্থাপনের মধ্য দিয়ে যাবেন।

তাই এই টিউটোরিয়ালটি আপনাকে একটি AI চ্যাটবট তৈরির প্রক্রিয়ার মধ্যে নিয়ে যাবে যাতে আপনি এই ধারণাগুলি গভীরভাবে শিখতে পারেন৷

আমরা কভার করব এমন কিছু বিষয় অন্তর্ভুক্ত:

  • কিভাবে পাইথন, ফাস্টএপিআই, এবং ওয়েবসকেট দিয়ে API তৈরি করবেন
  • কিভাবে Redis এর সাথে রিয়েল-টাইম সিস্টেম তৈরি করবেন
  • কিভাবে প্রতিক্রিয়া সহ একটি চ্যাট ইউজার ইন্টারফেস তৈরি করবেন

গুরুত্বপূর্ণ নোট: এটি একটি মধ্যবর্তী পূর্ণ স্ট্যাক সফ্টওয়্যার উন্নয়ন প্রকল্প যার জন্য কিছু মৌলিক পাইথন এবং জাভাস্ক্রিপ্ট জ্ঞান প্রয়োজন।

আপনি সম্পূর্ণ অ্যাপ্লিকেশন কোড করতে না চাইলে আপনার জন্য গুরুত্বপূর্ণ যে ফেজটি আপনি সহজেই নির্বাচন করতে পারেন তা নিশ্চিত করার জন্য আমি সাবধানে প্রকল্পটিকে ভাগে ভাগ করেছি৷

আপনি এখানে My Github-এ সম্পূর্ণ সংগ্রহস্থল ডাউনলোড করতে পারেন।

সূচিপত্র

বিভাগ 1

  • অ্যাপ্লিকেশন আর্কিটেকচার
  • কিভাবে ডেভেলপমেন্ট এনভায়রনমেন্ট সেট আপ করবেন

    বিভাগ 2

  • কিভাবে পাইথন, ফাস্টএপিআই, এবং ওয়েবসকেটের সাথে একটি চ্যাট সার্ভার তৈরি করবেন
    • কিভাবে পাইথন এনভায়রনমেন্ট সেট আপ করবেন
    • FastAPI সার্ভার সেটআপ
    • কিভাবে এপিআই-তে রুট যোগ করবেন
    • কিভাবে UUID দিয়ে একটি চ্যাট সেশন টোকেন তৈরি করবেন
    • কিভাবে পোস্টম্যানের সাথে API পরীক্ষা করবেন
    • ওয়েবসকেট এবং সংযোগ ব্যবস্থাপক
    • FastAPI-তে ডিপেনডেন্সি ইনজেকশন

      ধারা 3

  • কিভাবে Redis
      দিয়ে রিয়েল-টাইম সিস্টেম তৈরি করবেন
    • রিডিস এবং ডিস্ট্রিবিউটেড মেসেজিং সারি
    • কিভাবে রেডিস ক্লায়েন্টের সাথে পাইথনে একটি রেডিস ক্লাস্টারের সাথে সংযোগ করবেন
    • কিভাবে রেডিস স্ট্রিমগুলির সাথে কাজ করবেন
    • চ্যাট ডেটা কীভাবে মডেল করবেন
    • কিভাবে Redis JSON এর সাথে কাজ করবেন
    • টোকেন নির্ভরতা কিভাবে আপডেট করবেন

      ধারা 4

  • এআই মডেলের সাথে চ্যাটবটগুলিতে কীভাবে বুদ্ধিমত্তা যুক্ত করবেন
    • হাগিংফেস দিয়ে কিভাবে শুরু করবেন
    • ভাষা মডেলের সাথে কিভাবে ইন্টারঅ্যাক্ট করবেন
    • এআই মডেলের জন্য কীভাবে স্বল্পমেয়াদী মেমরির অনুকরণ করা যায়
    • মেসেজ কিউ থেকে স্ট্রীম কনজিউমার এবং রিয়েল-টাইমডেটা টানুন
    • এআই রেসপন্স দিয়ে চ্যাট ক্লায়েন্টকে কীভাবে আপডেট করবেন
    • টোকেন রিফ্রেশ করুন
    • কিভাবে পোস্টম্যানে একাধিক ক্লায়েন্টের সাথে চ্যাট পরীক্ষা করবেন

অ্যাপ্লিকেশন আর্কিটেকচার

একটি সমাধান আর্কিটেকচার স্কেচ করা আপনাকে আপনার অ্যাপ্লিকেশনের একটি উচ্চ-স্তরের ওভারভিউ দেয়, আপনি যে সরঞ্জামগুলি ব্যবহার করতে চান এবং কীভাবে উপাদানগুলি একে অপরের সাথে যোগাযোগ করবে।

আমি draw.io:

ব্যবহার করে নিচে একটি সাধারণ আর্কিটেকচার আঁকলাম

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা ফুলস্ট্যাক চ্যাটবট আর্কিটেকচার

চলুন আরো বিস্তারিতভাবে স্থাপত্যের বিভিন্ন অংশে যাওয়া যাক:

ক্লায়েন্ট/ইউজার ইন্টারফেস

আমরা ইউজার ইন্টারফেস তৈরি করতে রিঅ্যাক্ট সংস্করণ 18 ব্যবহার করব। চ্যাট UI WebSockets এর মাধ্যমে ব্যাকএন্ডের সাথে যোগাযোগ করবে।

GPT-J-6B এবং Huggingface Inference API

GPT-J-6B হল একটি জেনারেটিভ ল্যাঙ্গুয়েজ মডেল যাকে 6 বিলিয়ন প্যারামিটারের সাথে প্রশিক্ষিত করা হয়েছিল এবং কিছু কাজে OpenAI-এর GPT-3 এর সাথে ঘনিষ্ঠভাবে কাজ করে।

আমি GPT-J-6B ব্যবহার করা বেছে নিয়েছি কারণ এটি একটি ওপেন-সোর্স মডেল এবং সাধারণ ব্যবহারের ক্ষেত্রে প্রদত্ত টোকেনের প্রয়োজন নেই।

Huggingface আমাদের এই মডেলের সাথে সংযোগ করার জন্য একটি অন-ডিমান্ড API প্রদান করে। আপনি GPT-J-6B এবং Hugging Face Inference API সম্পর্কে আরও পড়তে পারেন৷

রেডিস

যখন আমরা জিপিটি-তে প্রম্পট পাঠাই, তখন আমাদের প্রম্পটগুলি সংরক্ষণ করার এবং সহজেই প্রতিক্রিয়া পুনরুদ্ধার করার একটি উপায় প্রয়োজন৷ আমরা চ্যাট ডেটা সঞ্চয় করার জন্য Redis JSON ব্যবহার করব এবং huggingface inference API-এর সাথে রিয়েল-টাইম যোগাযোগ পরিচালনার জন্য Redis স্ট্রিমগুলিও ব্যবহার করব।

Redis হল একটি ইন-মেমরি কী-ভ্যালু স্টোর যা JSON-এর মতো ডেটা অতি-দ্রুত আনয়ন এবং সংরক্ষণ করতে সক্ষম করে। এই টিউটোরিয়ালের জন্য, আমরা পরীক্ষার উদ্দেশ্যে Redis এন্টারপ্রাইজ দ্বারা প্রদত্ত একটি পরিচালিত ফ্রি রেডিস স্টোরেজ ব্যবহার করব৷

ওয়েব সকেট এবং চ্যাট API

রিয়েল-টাইমে ক্লায়েন্ট এবং সার্ভারের মধ্যে বার্তা পাঠাতে, আমাদের একটি সকেট সংযোগ খুলতে হবে। কারণ ক্লায়েন্ট এবং সার্ভারের মধ্যে রিয়েল-টাইম দ্বি-দিকনির্দেশক যোগাযোগ নিশ্চিত করার জন্য একটি HTTP সংযোগ যথেষ্ট হবে না।

আমরা চ্যাট সার্ভারের জন্য FastAPI ব্যবহার করব, কারণ এটি আমাদের ব্যবহারের জন্য একটি দ্রুত এবং আধুনিক পাইথন সার্ভার প্রদান করে। WebSockets সম্পর্কে আরও জানতে FastAPI ডকুমেন্টেশন দেখুন।

কিভাবে ডেভেলপমেন্ট এনভায়রনমেন্ট সেট আপ করবেন

আপনি এই অ্যাপটি তৈরি করতে আপনার কাঙ্খিত OS ব্যবহার করতে পারেন - আমি বর্তমানে MacOS এবং Visual Studio কোড ব্যবহার করছি। শুধু নিশ্চিত করুন যে আপনি Python এবং NodeJs ইনস্টল করেছেন।

প্রজেক্ট স্ট্রাকচার সেট আপ করতে, fullstack-ai-chatbot নামে একটি ফোল্ডার তৈরি করুন . তারপরে client নামে প্রোজেক্টের মধ্যে দুটি ফোল্ডার তৈরি করুন এবং server . সার্ভার ব্যাকএন্ডের কোড ধারণ করবে, যখন ক্লায়েন্ট ফ্রন্টএন্ডের জন্য কোড ধারণ করবে।

প্রকল্প ডিরেক্টরির মধ্যে পরবর্তী, "git init" কমান্ড ব্যবহার করে প্রকল্প ফোল্ডারের রুটের মধ্যে একটি গিট সংগ্রহস্থল শুরু করুন। তারপর "touch .gitignore" ব্যবহার করে একটি .gitignore ফাইল তৈরি করুন:

git init
touch .gitignore

পরবর্তী বিভাগে, আমরা FastAPI এবং Python ব্যবহার করে আমাদের চ্যাট ওয়েব সার্ভার তৈরি করব।

কিভাবে পাইথন, ফাস্টএপিআই এবং ওয়েবসকেট দিয়ে একটি চ্যাট সার্ভার তৈরি করবেন

এই বিভাগে, আমরা ব্যবহারকারীর সাথে যোগাযোগ করার জন্য FastAPI ব্যবহার করে চ্যাট সার্ভার তৈরি করব। ক্লায়েন্ট এবং সার্ভারের মধ্যে দ্বি-দিকনির্দেশক যোগাযোগ নিশ্চিত করতে আমরা WebSockets ব্যবহার করব যাতে আমরা ব্যবহারকারীকে রিয়েল-টাইমে প্রতিক্রিয়া পাঠাতে পারি।

কিভাবে পাইথন এনভায়রনমেন্ট সেট আপ করবেন

আমাদের সার্ভার শুরু করতে, আমাদের পাইথন পরিবেশ সেট আপ করতে হবে। VS কোডের মধ্যে প্রজেক্ট ফোল্ডারটি খুলুন এবং টার্মিনাল খুলুন।

প্রজেক্ট রুট থেকে, সার্ভার ডিরেক্টরিতে cd করুন এবং python3.8 -m venv env চালান . এটি একটি ভার্চুয়াল পরিবেশ তৈরি করবে৷ আমাদের পাইথন প্রজেক্টের জন্য, যার নাম হবে env . ভার্চুয়াল পরিবেশ সক্রিয় করতে, source env/bin/activate চালান

এরপর, আপনার পাইথন পরিবেশে কয়েকটি লাইব্রেরি ইনস্টল করুন।

pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis

এরপর touch .env চালিয়ে একটি পরিবেশ ফাইল তৈরি করুন টার্মিনালে আমরা .env-এর মধ্যে আমাদের অ্যাপ ভেরিয়েবল এবং গোপন ভেরিয়েবল সংজ্ঞায়িত করব ফাইল

আপনার অ্যাপ এনভায়রনমেন্ট ভেরিয়েবল যোগ করুন এবং এটিকে এভাবে "ডেভেলপমেন্ট" এ সেট করুন:export APP_ENV=development . এরপর, আমরা একটি ফাস্টএপিআই সার্ভারের সাথে একটি ডেভেলপমেন্ট সার্ভার সেট আপ করব।

FastAPI সার্ভার সেটআপ

সার্ভার ডিরেক্টরির মূলে, main.py নামে একটি নতুন ফাইল তৈরি করুন তারপর ডেভেলপমেন্ট সেভারের জন্য নিচের কোডটি পেস্ট করুন:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

প্রথমে আমরা import FastAPI এবং এটিকে api হিসাবে আরম্ভ করুন . তারপর আমরা import load_dotenv python-dotenv থেকে লাইব্রেরি, এবং .env থেকে ভেরিয়েবল লোড করার জন্য এটিকে আরম্ভ করুন ফাইল,

তারপরে আমরা API পরীক্ষা করার জন্য একটি সাধারণ পরীক্ষা রুট তৈরি করি। পরীক্ষার রুট একটি সাধারণ JSON প্রতিক্রিয়া প্রদান করবে যা আমাদের বলে যে API অনলাইন আছে।

অবশেষে, আমরা uvicorn.run ব্যবহার করে ডেভেলপমেন্ট সার্ভার সেট আপ করি এবং প্রয়োজনীয় যুক্তি প্রদান. API 3500 পোর্টে চলবে .

অবশেষে, সার্ভারটি টার্মিনালে python main.py দিয়ে চালান . একবার আপনি Application startup complete দেখতে পাবেন টার্মিনালে, আপনার ব্রাউজারে URL http://localhost:3500/test-এ নেভিগেট করুন এবং আপনার এই মত একটি ওয়েব পৃষ্ঠা পাওয়া উচিত:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা API টেস্ট পৃষ্ঠা

কিভাবে এপিআই-তে রুট যোগ করবেন

এই বিভাগে, আমরা আমাদের API এ রুট যোগ করব। src নামে একটি নতুন ফোল্ডার তৈরি করুন৷ . এটি সেই ডিরেক্টরি যেখানে আমাদের সমস্ত API কোড থাকবে।

routes নামে একটি সাবফোল্ডার তৈরি করুন , ফোল্ডারে cd, chat.py নামে একটি নতুন ফাইল তৈরি করুন এবং তারপর নিচের কোড যোগ করুন:

import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
 return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
 return None

আমরা তিনটি শেষ পয়েন্ট তৈরি করেছি:

  • /token চ্যাট সেশনে অ্যাক্সেসের জন্য ব্যবহারকারীকে একটি সেশন টোকেন জারি করবে। যেহেতু চ্যাট অ্যাপটি সর্বজনীনভাবে খোলা থাকবে, তাই আমরা প্রমাণীকরণ নিয়ে চিন্তা করতে চাই না এবং এটিকে সহজ রাখতে চাই না - তবে আমাদের এখনও প্রতিটি অনন্য ব্যবহারকারী সেশন সনাক্ত করার জন্য একটি উপায় প্রয়োজন৷
  • /refresh_token সংযোগটি হারিয়ে গেলে ব্যবহারকারীর জন্য সেশন ইতিহাস পাবেন, যতক্ষণ না টোকেনটি সক্রিয় থাকে এবং মেয়াদ শেষ না হয়।
  • /chat ক্লায়েন্ট এবং সার্ভারের মধ্যে বার্তা পাঠাতে একটি WebSocket খুলবে৷

এরপরে, চ্যাট রুটটিকে আমাদের প্রধান API এর সাথে সংযুক্ত করুন। প্রথমে আমাদের import chat from src.chat করতে হবে আমাদের main.py এর মধ্যে ফাইল তারপর আমরা আক্ষরিকভাবে একটি include_router কল করে রাউটার অন্তর্ভুক্ত করব আরম্ভ করা FastAPI-এ পদ্ধতি যুক্তি হিসাবে ক্লাস এবং পাসিং চ্যাট.

আপনার api.py আপডেট করুন নীচে দেখানো কোড:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

কিভাবে UUID দিয়ে একটি চ্যাট সেশন টোকেন তৈরি করবেন

একটি ব্যবহারকারী টোকেন তৈরি করতে আমরা uuid4 ব্যবহার করব আমাদের চ্যাট এন্ডপয়েন্টের জন্য গতিশীল রুট তৈরি করতে। যেহেতু এটি একটি সর্বজনীনভাবে উপলব্ধ এন্ডপয়েন্ট, তাই আমাদের JWT এবং প্রমাণীকরণ সম্পর্কে বিশদে যেতে হবে না।

আপনি যদি uuid ইনস্টল না করে থাকেন প্রাথমিকভাবে, pip install uuid চালান . এরপর chat.py-এ, UUID আমদানি করুন এবং /token আপডেট করুন নিচের কোড সহ রুট করুন:


from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 token = str(uuid.uuid4())
 data = {"name": name, "token": token}
 return data

উপরের কোডে, ক্লায়েন্ট তাদের নাম প্রদান করে, যা প্রয়োজন। নামের ক্ষেত্রটি খালি নেই তা নিশ্চিত করার জন্য আমরা দ্রুত পরীক্ষা করি, তারপর uuid4 ব্যবহার করে একটি টোকেন তৈরি করি।

সেশন ডেটা হল নাম এবং টোকেনের জন্য একটি সহজ অভিধান। শেষ পর্যন্ত আমাদের এই সেশনের ডেটা বজায় রাখতে হবে এবং একটি টাইমআউট সেট করতে হবে, কিন্তু আপাতত আমরা এটি ক্লায়েন্টের কাছে ফেরত দিই৷

পোস্টম্যানের সাথে এপিআই কীভাবে পরীক্ষা করবেন

যেহেতু আমরা একটি ওয়েবসকেট এন্ডপয়েন্ট পরীক্ষা করব, তাই আমাদের পোস্টম্যানের মতো একটি টুল ব্যবহার করতে হবে যা এটির অনুমতি দেয় (যেহেতু ফাস্টএপিআই-তে ডিফল্ট সোয়াগার ডক্স ওয়েবসকেট সমর্থন করে না)।

পোস্টম্যানে, আপনার উন্নয়ন পরিবেশের জন্য একটি সংগ্রহ তৈরি করুন এবং localhost:3500/token-এ একটি POST অনুরোধ পাঠান একটি ক্যোয়ারী প্যারামিটার হিসাবে নাম নির্দিষ্ট করা এবং এটি একটি মান পাস। আপনি নীচে দেখানো হিসাবে একটি প্রতিক্রিয়া পেতে হবে:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা টোকেন জেনারেটর পোস্টম্যান

ওয়েবসকেট এবং সংযোগ ব্যবস্থাপক

src রুটে, socket নামে একটি নতুন ফোল্ডার তৈরি করুন এবং connection.py নামে একটি ফাইল যোগ করুন . এই ফাইলটিতে, আমরা আমাদের ওয়েবসকেটের সংযোগগুলি নিয়ন্ত্রণ করে এমন ক্লাস এবং সংযোগ এবং সংযোগ বিচ্ছিন্ন করার সমস্ত সহায়ক পদ্ধতিগুলিকে সংজ্ঞায়িত করব।

connection.py-এ নীচের কোড যোগ করুন:


from fastapi import WebSocket
class ConnectionManager:
 def __init__(self):
 self.active_connections: List[WebSocket] = []
 async def connect(self, websocket: WebSocket):
 await websocket.accept()
 self.active_connections.append(websocket)
 def disconnect(self, websocket: WebSocket):
 self.active_connections.remove(websocket)
 async def send_personal_message(self, message: str, websocket: WebSocket):
 await websocket.send_text(message)

ConnectionManager ক্লাস একটি active_connections দিয়ে আরম্ভ করা হয় অ্যাট্রিবিউট যা সক্রিয় সংযোগের একটি তালিকা।

তারপর অ্যাসিঙ্ক্রোনাস connect পদ্ধতি একটি WebSocket গ্রহণ করবে এবং এটি সক্রিয় সংযোগের তালিকায় যোগ করুন, যখন disconnect পদ্ধতিটি Websocket মুছে ফেলবে সক্রিয় সংযোগের তালিকা থেকে।

সবশেষে, send_personal_message পদ্ধতিটি একটি বার্তা এবং Websocket গ্রহণ করবে আমরা বার্তাটি পাঠাতে চাই এবং অ্যাসিঙ্ক্রোনাসভাবে বার্তা পাঠাতে চাই৷

WebSockets একটি খুব বিস্তৃত বিষয় এবং আমরা শুধুমাত্র এখানে পৃষ্ঠ স্ক্র্যাপ. যদিও এটি একাধিক সংযোগ তৈরি করতে এবং অসিঙ্ক্রোনাসভাবে সেই সংযোগগুলিতে বার্তা পরিচালনা করার জন্য যথেষ্ট হওয়া উচিত।

আপনি FastAPI ওয়েবসকেট এবং সকেট প্রোগ্রামিং সম্পর্কে আরও পড়তে পারেন।

ConnectionManager ব্যবহার করতে , আমদানি করুন এবং এটিকে src.routes.chat.py-এর মধ্যে শুরু করুন , এবং /chat আপডেট করুন নিচের কোড সহ WebSocket রুট:

from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

websocket_endpoint-এ ফাংশন, যা একটি ওয়েবসকেট নেয়, আমরা কানেকশন ম্যানেজারে নতুন ওয়েবসকেট যোগ করি এবং একটি while True চালাই লুপ, সকেট খোলা থাকে তা নিশ্চিত করতে। সকেট সংযোগ বিচ্ছিন্ন হয়ে গেলে ছাড়া।

সংযোগ খোলা থাকাকালীন, আমরা websocket.receive_test() দিয়ে ক্লায়েন্টের পাঠানো যেকোন বার্তা পাই এবং আপাতত তাদের টার্মিনালে প্রিন্ট করুন।

তারপরে আমরা আপাতত ক্লায়েন্টকে একটি হার্ড-কোডেড প্রতিক্রিয়া পাঠাই। শেষ পর্যন্ত ক্লায়েন্টদের কাছ থেকে প্রাপ্ত বার্তাটি AI মডেলে পাঠানো হবে, এবং ক্লায়েন্টকে ফেরত পাঠানো প্রতিক্রিয়া হবে AI মডেলের প্রতিক্রিয়া৷

পোস্টম্যানে, আমরা একটি নতুন WebSocket অনুরোধ তৈরি করে এবং WebSocket এন্ডপয়েন্ট localhost:3500/chat এর সাথে সংযোগ করে এই এন্ডপয়েন্টটি পরীক্ষা করতে পারি .

আপনি সংযোগ ক্লিক করলে, বার্তা ফলকটি দেখাবে যে API ক্লায়েন্ট URL এর সাথে সংযুক্ত এবং একটি সকেট খোলা আছে।

এটি পরীক্ষা করার জন্য, চ্যাট সার্ভারে একটি বার্তা "হ্যালো বট" পাঠান, এবং আপনি একটি তাৎক্ষণিক পরীক্ষার প্রতিক্রিয়া পেতে হবে "প্রতিক্রিয়া:GPT পরিষেবা থেকে অনুকরণকারী প্রতিক্রিয়া" নীচে দেখানো হিসাবে:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা পোস্টম্যান চ্যাট পরীক্ষা

FastAPI তে ডিপেন্ডেন্সি ইনজেকশন

দুটি ভিন্ন ক্লায়েন্ট সেশনের মধ্যে পার্থক্য করতে এবং চ্যাট সেশনগুলিকে সীমাবদ্ধ করতে সক্ষম হতে, আমরা একটি টাইমড টোকেন ব্যবহার করব, যা ওয়েবসকেট সংযোগে একটি ক্যোয়ারী প্যারামিটার হিসাবে পাস করা হবে৷

সকেট ফোল্ডারে, utils.py নামে একটি ফাইল তৈরি করুন তারপর নিচের কোড যোগ করুন:

from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 return token

get_token ফাংশন একটি WebSocket এবং টোকেন গ্রহণ করে, তারপর টোকেনটি নেই বা শূন্য কিনা তা পরীক্ষা করে।

যদি এটি হয়, ফাংশনটি একটি নীতি লঙ্ঘনের স্থিতি প্রদান করে এবং যদি উপলব্ধ থাকে, ফাংশনটি কেবল টোকেন প্রদান করে। আমরা শেষ পর্যন্ত অতিরিক্ত টোকেন যাচাইকরণের সাথে এই ফাংশনটি প্রসারিত করব।

এই ফাংশনটি ব্যবহার করতে, আমরা এটিকে /chat এ ইনজেক্ট করি রুট FastAPI নির্ভরশীলতা সহজে ইনজেক্ট করার জন্য একটি নির্ভরশীল ক্লাস প্রদান করে, তাই আমাদের ডেকোরেটরদের সাথে টিঙ্কার করতে হবে না।

/chat আপডেট করুন নিম্নলিখিত রুট:

from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

এখন আপনি যখন /chat এর সাথে সংযোগ করার চেষ্টা করবেন পোস্টম্যানের শেষ পয়েন্টে, আপনি একটি 403 ত্রুটি পাবেন। ক্যোয়ারী প্যারামিটার হিসাবে একটি টোকেন প্রদান করুন এবং আপাতত টোকেনের যেকোনো মান প্রদান করুন। তারপরে আপনি আগের মতো সংযোগ করতে সক্ষম হবেন, শুধুমাত্র এখন সংযোগের জন্য একটি টোকেন প্রয়োজন৷

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা টোকেন সহ পোস্টম্যান চ্যাট পরীক্ষা

এই পর্যন্ত পাওয়ার জন্য অভিনন্দন! আপনার chat.py ফাইল এখন এই মত দেখতে হবে:

import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 data = {"name": name, "token": token}
 return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

এই টিউটোরিয়ালের পরবর্তী অংশে, আমরা আমাদের আবেদনের অবস্থা পরিচালনা এবং ক্লায়েন্ট এবং সার্ভারের মধ্যে ডেটা পাস করার উপর ফোকাস করব।

কিভাবে রেডিস দিয়ে রিয়েল-টাইম সিস্টেম তৈরি করবেন

আমাদের অ্যাপ্লিকেশন বর্তমানে কোনো রাজ্য সংরক্ষণ করে না, এবং ব্যবহারকারীদের শনাক্ত করার বা চ্যাট ডেটা সঞ্চয় ও পুনরুদ্ধারের কোনো উপায় নেই। আমরা চ্যাট সেশনের সময় ক্লায়েন্টকে একটি হার্ড-কোডেড প্রতিক্রিয়াও ফিরিয়ে দিচ্ছি।

টিউটোরিয়ালের এই অংশে, আমরা নিম্নলিখিতগুলি কভার করব:

  • কিভাবে একটি রেডিস ক্লাস্টার এর সাথে সংযোগ করতে হয় পাইথনে এবং একটি রেডিস ক্লায়েন্ট সেট আপ করুন
  • কিভাবে Redis JSON দিয়ে ডেটা সঞ্চয় ও পুনরুদ্ধার করবেন
  • কিভাবে রেডিস স্ট্রিম সেট আপ করবেন একটি ওয়েব সার্ভার এবং কর্মী পরিবেশের মধ্যে বার্তা সারি হিসাবে

রিডিস এবং ডিস্ট্রিবিউটেড মেসেজিং সারি

রেডিস একটি ওপেন সোর্স ইন-মেমরি ডেটা স্টোর যা আপনি ডাটাবেস, ক্যাশে, বার্তা ব্রোকার এবং স্ট্রিমিং ইঞ্জিন হিসাবে ব্যবহার করতে পারেন। এটি বেশ কয়েকটি ডেটা স্ট্রাকচার সমর্থন করে এবং রিয়েল-টাইম ক্ষমতা সহ বিতরণ করা অ্যাপ্লিকেশনগুলির জন্য একটি নিখুঁত সমাধান।

রেডিস এন্টারপ্রাইজ ক্লাউড Redis দ্বারা প্রদত্ত একটি সম্পূর্ণরূপে পরিচালিত ক্লাউড পরিষেবা যা আমাদের পরিকাঠামো নিয়ে চিন্তা না করে অসীম স্কেলে Redis ক্লাস্টার স্থাপন করতে সাহায্য করে।

আমরা এই টিউটোরিয়ালের জন্য একটি বিনামূল্যের রেডিস এন্টারপ্রাইজ ক্লাউড ইন্সট্যান্স ব্যবহার করব। আপনি এখানে রেডিস ক্লাউডের সাথে বিনামূল্যে শুরু করতে পারেন এবং একটি রেডিস ডাটাবেস সেট আপ করতে এই টিউটোরিয়ালটি অনুসরণ করতে পারেন এবং রেডিসের সাথে ইন্টারঅ্যাক্ট করার জন্য একটি GUI রেডিস ইনসাইট৷

একবার আপনি আপনার Redis ডাটাবেস সেট আপ করার পরে, প্রকল্প রুটে (সার্ভার ফোল্ডারের বাইরে) worker নামে একটি নতুন ফোল্ডার তৈরি করুন .

আমরা ওয়েব সার্ভার থেকে আমাদের কর্মী পরিবেশকে বিচ্ছিন্ন করব যাতে ক্লায়েন্ট যখন আমাদের WebSocket-এ একটি বার্তা পাঠায়, তখন ওয়েব সার্ভারকে তৃতীয় পক্ষের পরিষেবার অনুরোধটি পরিচালনা করতে না হয়। এছাড়াও, অন্যান্য ব্যবহারকারীদের জন্য সম্পদ মুক্ত করা যেতে পারে।

অনুমান API-এর সাথে পটভূমি যোগাযোগ এই কর্মী পরিষেবা দ্বারা পরিচালিত হয়, Redis এর মাধ্যমে৷

সমস্ত সংযুক্ত ক্লায়েন্টের অনুরোধগুলি বার্তা সারিতে (প্রযোজক) যুক্ত করা হয়, যখন কর্মী বার্তাগুলি গ্রহণ করে, অনুমান API-এ অনুরোধগুলি প্রেরণ করে এবং প্রতিক্রিয়া সারিতে প্রতিক্রিয়া যুক্ত করে।

একবার API একটি প্রতিক্রিয়া পায়, এটি ক্লায়েন্টের কাছে ফেরত পাঠায়।

প্রযোজক এবং ভোক্তার মধ্যে ভ্রমণের সময়, ক্লায়েন্ট একাধিক বার্তা পাঠাতে পারে এবং এই বার্তাগুলি সারিবদ্ধ করা হবে এবং ক্রমানুসারে প্রতিক্রিয়া জানানো হবে।

আদর্শভাবে, আমরা এই কর্মীকে সম্পূর্ণ ভিন্ন সার্ভারে, তার নিজস্ব পরিবেশে চালাতে পারতাম, কিন্তু আপাতত, আমরা আমাদের স্থানীয় মেশিনে এর নিজস্ব পাইথন পরিবেশ তৈরি করব৷

আপনি হয়তো ভাবছেন – কেন আমাদের একজন কর্মী দরকার? একটি দৃশ্যকল্প কল্পনা করুন যেখানে ওয়েব সার্ভার তৃতীয় পক্ষের পরিষেবাতে অনুরোধ তৈরি করে। এর মানে হল যে সকেট সংযোগের সময় তৃতীয় পক্ষের পরিষেবা থেকে প্রতিক্রিয়ার জন্য অপেক্ষা করার সময়, সার্ভারটি ব্লক করা হয় এবং API থেকে প্রতিক্রিয়া প্রাপ্ত না হওয়া পর্যন্ত সংস্থানগুলি আবদ্ধ থাকে।

আপনি একটি এলোমেলো ঘুম time.sleep(10) তৈরি করে এটি ব্যবহার করে দেখতে পারেন হার্ড-কোডেড প্রতিক্রিয়া পাঠানোর আগে এবং একটি নতুন বার্তা পাঠানোর আগে। তারপর একটি নতুন পোস্টম্যান সেশনে একটি ভিন্ন টোকেনের সাথে সংযোগ করার চেষ্টা করুন।

আপনি লক্ষ্য করবেন যে এলোমেলো ঘুমের সময় শেষ না হওয়া পর্যন্ত চ্যাট সেশনটি সংযুক্ত হবে না।

যদিও আমরা আরও উৎপাদন-কেন্দ্রিক সার্ভার সেট-আপে অ্যাসিঙ্ক্রোনাস কৌশল এবং কর্মী পুল ব্যবহার করতে পারি, একই সাথে ব্যবহারকারীর সংখ্যা বৃদ্ধির সাথে সাথে এটিও যথেষ্ট হবে না।

শেষ পর্যন্ত, আমরা আমাদের চ্যাট API এবং তৃতীয় পক্ষের API-এর মধ্যে যোগাযোগের ব্রোকার করার জন্য Redis ব্যবহার করে ওয়েব সার্ভার সংস্থানগুলিকে বাঁধা এড়াতে চাই৷

এরপরে একটি নতুন টার্মিনাল খুলুন, ওয়ার্কার ফোল্ডারে সিডি, এবং একটি নতুন পাইথন ভার্চুয়াল পরিবেশ তৈরি এবং সক্রিয় করুন যা আমরা অংশ 1-এ করেছি।

এরপরে, নিম্নলিখিত নির্ভরতাগুলি ইনস্টল করুন:

pip install aiohttp aioredis python-dotenv

কিভাবে একটি রেডিস ক্লায়েন্টের সাথে পাইথনে একটি রেডিস ক্লাস্টারের সাথে সংযোগ করতে হয়

আমরা রেডিস ডাটাবেসের সাথে সংযোগ করতে aioredis ক্লায়েন্ট ব্যবহার করব। আমরা Huggingface inference API এ অনুরোধ পাঠাতে অনুরোধ লাইব্রেরি ব্যবহার করব।

দুটি ফাইল .env তৈরি করুন , এবং main.py . তারপর src নামে একটি ফোল্ডার তৈরি করুন . এছাড়াও, redis নামে একটি ফোল্ডার তৈরি করুন এবং config.py নামে একটি নতুন ফাইল যোগ করুন .

.env-এ ফাইল, নিম্নলিখিত কোড যোগ করুন - এবং নিশ্চিত করুন যে আপনি আপনার রেডিস ক্লাস্টারে প্রদত্ত শংসাপত্র সহ ক্ষেত্রগুলি আপডেট করেছেন৷

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

config.py-এ নীচে Redis ক্লাস যোগ করুন:

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

আমরা একটি Redis অবজেক্ট তৈরি করি এবং এনভায়রনমেন্ট ভেরিয়েবল থেকে প্রয়োজনীয় প্যারামিটারগুলি শুরু করি। তারপর আমরা একটি অ্যাসিঙ্ক্রোনাস পদ্ধতি তৈরি করি create_connection একটি Redis সংযোগ তৈরি করতে এবং aioredis থেকে প্রাপ্ত সংযোগ পুল ফেরত দিতে পদ্ধতি from_url .

এর পরে, আমরা নীচের কোডটি চালিয়ে main.py-এ Redis সংযোগ পরীক্ষা করি। এটি একটি নতুন রেডিস সংযোগ পুল তৈরি করবে, একটি সাধারণ কী "কী" সেট করবে এবং এটিতে একটি স্ট্রিং "মান" বরাদ্দ করবে৷


from src.redis.config import Redis
import asyncio
async def main():
 redis = Redis()
 redis = await redis.create_connection()
 print(redis)
 await redis.set("key", "value")
if __name__ == "__main__":
 asyncio.run(main())

এখন রেডিস ইনসাইট খুলুন (আপনি যদি এটি ডাউনলোড এবং ইনস্টল করার জন্য টিউটোরিয়ালটি অনুসরণ করেন) আপনি এইরকম কিছু দেখতে পাবেন:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা Redis অন্তর্দৃষ্টি পরীক্ষা

কিভাবে রেডিস স্ট্রিমগুলির সাথে কাজ করবেন

এখন যেহেতু আমাদের কর্মী পরিবেশ সেটআপ আছে, আমরা ওয়েব সার্ভারে একজন প্রযোজক এবং কর্মীতে একজন ভোক্তা তৈরি করতে পারি।

প্রথমে, সার্ভারে আবার আমাদের Redis ক্লাস তৈরি করা যাক। server.src-এ redis নামে একটি ফোল্ডার তৈরি করুন এবং দুটি ফাইল যোগ করুন, config.py এবং producer.py .

config.py-এ , নীচের কোডটি যোগ করুন যেমন আমরা কর্মী পরিবেশের জন্য করেছি:

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

.env ফাইলে, Redis শংসাপত্রগুলিও যোগ করুন:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

অবশেষে, server.src.redis.producer.py-এ নিম্নলিখিত কোড যোগ করুন:


from .config import Redis
class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel):
 try:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id
 except Exception as e:
 print(f"Error sending msg to stream => {e}")

আমরা একটি প্রযোজক ক্লাস তৈরি করেছি যা একটি রেডিস ক্লায়েন্টের সাথে শুরু করা হয়েছে। আমরা add_to_stream দিয়ে স্ট্রীমে ডেটা যোগ করতে এই ক্লায়েন্ট ব্যবহার করি পদ্ধতি, যা ডেটা এবং Redis চ্যানেলের নাম নেয়।

একটি স্ট্রিম চ্যানেলে ডেটা যোগ করার জন্য Redis কমান্ড হল xadd এবং এটি aioredis-এ উচ্চ-স্তরের এবং নিম্ন-স্তরের ফাংশন উভয়ই রয়েছে।

এর পরে, আমাদের নতুন তৈরি প্রযোজক চালানোর জন্য, chat.py আপডেট করুন এবং WebSocket /chat নীচের মত শেষ বিন্দু। আপডেট করা চ্যানেলের নাম message_channel লক্ষ্য করুন .


from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

এরপরে, পোস্টম্যানে, একটি সংযোগ তৈরি করুন এবং Hello বলে যে কোনো সংখ্যক বার্তা পাঠান . নিচের মত টার্মিনালে প্রিন্ট করা স্ট্রীম বার্তাগুলি আপনার থাকা উচিত:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা টার্মিনাল চ্যানেল বার্তা পরীক্ষা

Redis Insight-এ, আপনি একটি নতুন mesage_channel দেখতে পাবেন তৈরি করা হয়েছে এবং একটি টাইম-স্ট্যাম্পযুক্ত সারি ক্লায়েন্ট থেকে পাঠানো বার্তা দিয়ে ভরা। এই টাইমস্ট্যাম্পড সারিটি বার্তাগুলির ক্রম রক্ষা করার জন্য গুরুত্বপূর্ণ৷

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা রেডিস ইনসাইট চ্যানেল

চ্যাট ডেটা কীভাবে মডেল করবেন

এর পরে, আমরা আমাদের চ্যাট বার্তাগুলির জন্য একটি মডেল তৈরি করব৷ মনে রাখবেন যে আমরা WebSockets এর মাধ্যমে পাঠ্য ডেটা পাঠাচ্ছি, তবে আমাদের চ্যাট ডেটাতে কেবল পাঠ্যের চেয়ে আরও বেশি তথ্য রাখা দরকার। যখন চ্যাট পাঠানো হয়েছিল তখন আমাদের টাইমস্ট্যাম্প করতে হবে, প্রতিটি বার্তার জন্য একটি আইডি তৈরি করতে হবে এবং চ্যাট সেশন সম্পর্কে ডেটা সংগ্রহ করতে হবে, তারপর এই ডেটাটি একটি JSON ফর্ম্যাটে সংরক্ষণ করতে হবে।

আমরা এই JSON ডেটা রেডিস-এ সংরক্ষণ করতে পারি যাতে সংযোগটি হারিয়ে গেলে আমরা চ্যাট ইতিহাস হারাতে পারি না, কারণ আমাদের ওয়েবসকেট স্টেট সংরক্ষণ করে না।

server.src-এ schema নামে একটি নতুন ফোল্ডার তৈরি করুন . তারপর chat.py নামে একটি ফাইল তৈরি করুন server.src.schema-এ নিম্নলিখিত কোড যোগ করুন:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = uuid.uuid4()
 msg: str
 timestamp = str(datetime.now())
class Chat(BaseModel):
 token: str
 messages: List[Message]
 name: str
 session_start = str(datetime.now())

আমরা Pydantic এর BaseModel ব্যবহার করছি চ্যাট ডেটা মডেল করার জন্য ক্লাস। Chat ক্লাসে একটি একক চ্যাট সেশনের তথ্য থাকবে। এটি datetime.now() ব্যবহার করে চ্যাট সেশন শুরুর সময়ের জন্য টোকেন, ব্যবহারকারীর নাম এবং একটি স্বয়ংক্রিয়ভাবে তৈরি টাইমস্ট্যাম্প সংরক্ষণ করবে। .

এই চ্যাট সেশনের মধ্যে পাঠানো এবং প্রাপ্ত বার্তাগুলি একটি Message দিয়ে সংরক্ষণ করা হয় ক্লাস যা uuid4 ব্যবহার করে ফ্লাইতে একটি চ্যাট আইডি তৈরি করে . এই Message আরম্ভ করার সময় আমাদের শুধুমাত্র ডেটা প্রদান করতে হবে ক্লাস হল মেসেজ টেক্সট।

কিভাবে Redis JSON এর সাথে কাজ করবেন

আমাদের চ্যাট ইতিহাস সংরক্ষণ করার জন্য Redis JSON-এর ক্ষমতা ব্যবহার করার জন্য, আমাদের Redis ল্যাব দ্বারা প্রদত্ত rejson ইনস্টল করতে হবে।

টার্মিনালে, server এ সিডি করুন এবং pip install rejson দিয়ে rejson ইনস্টল করুন . তারপর আপনার Redis আপডেট করুন server.src.redis.config.py-এ ক্লাস create_rejson_connection অন্তর্ভুক্ত করতে পদ্ধতি:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

আমরা create_rejson_connection যোগ করছি rejson Client দিয়ে Redis-এর সাথে সংযোগ করার পদ্ধতি . এটি আমাদেরকে Redis-এ JSON ডেটা তৈরি এবং ম্যানিপুলেট করার পদ্ধতি দেয়, যা aioredis-এর সাথে উপলব্ধ নয়৷

এরপর, server.src.routes.chat.py-এ আমরা /token আপডেট করতে পারি একটি নতুন Chat তৈরি করতে এন্ডপয়েন্ট রেডিস JSON-এ সেশন ডাটা সংরক্ষন করুন এইভাবে:

@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create new chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()

দ্রষ্টব্য:যেহেতু এটি একটি ডেমো অ্যাপ, আমি চ্যাট ডেটা রেডিস-এ বেশিক্ষণ সংরক্ষণ করতে চাই না। তাই আমি aioredis ক্লায়েন্ট ব্যবহার করে টোকেনে একটি 60-মিনিটের সময় যোগ করেছি (rejson টাইমআউট বাস্তবায়ন করে না)। এর মানে হল যে 60 মিনিট পরে, চ্যাট সেশনের ডেটা হারিয়ে যাবে।

এটি প্রয়োজনীয় কারণ আমরা ব্যবহারকারীদের প্রমাণীকরণ করছি না এবং আমরা একটি নির্দিষ্ট সময়ের পরে চ্যাট ডেটা ডাম্প করতে চাই। এই ধাপটি ঐচ্ছিক, এবং আপনাকে এটি অন্তর্ভুক্ত করতে হবে না।

পরবর্তীতে, পোস্টম্যানে, যখন আপনি একটি নতুন টোকেন তৈরি করার জন্য একটি POST অনুরোধ পাঠাবেন, আপনি নীচের মত একটি কাঠামোগত প্রতিক্রিয়া পাবেন। আপনি একটি JSON কী হিসাবে টোকেনের সাথে সংরক্ষিত আপনার চ্যাট ডেটা এবং মান হিসাবে ডেটা দেখতে Redis ইনসাইট দেখতে পারেন৷

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা টোকেন জেনারেটর আপডেট করা হয়েছে

টোকেন নির্ভরতা কিভাবে আপডেট করবেন

এখন যেহেতু আমাদের কাছে একটি টোকেন তৈরি এবং সংরক্ষণ করা হচ্ছে, এটি get_token আপডেট করার একটি ভাল সময়। আমাদের /chat-এ নির্ভরতা ওয়েবসকেট। আমরা চ্যাট সেশন শুরু করার আগে একটি বৈধ টোকেন পরীক্ষা করার জন্য এটি করি।

server.src.socket.utils.py-এ get_token আপডেট করুন Redis উদাহরণে টোকেন বিদ্যমান কিনা তা পরীক্ষা করার ফাংশন। যদি তা হয় তাহলে আমরা টোকেন ফেরত দিই, যার মানে হল সকেট সংযোগ বৈধ। যদি এটি বিদ্যমান না থাকে, আমরা সংযোগ বন্ধ করে দিই।

/token দ্বারা তৈরি টোকেন 60 মিনিটের পরে অস্তিত্ব বন্ধ হয়ে যাবে। তাই চ্যাট শুরু করার চেষ্টা করার সময় কোনো ত্রুটির প্রতিক্রিয়া তৈরি হলে ব্যবহারকারীকে একটি নতুন টোকেন জেনারেট করতে রিডাইরেক্ট করার জন্য ফ্রন্টএন্ডে আমাদের কিছু সহজ যুক্তি থাকতে পারে।


from ..redis.config import Redis
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 redis_client = await redis.create_connection()
 isexists = await redis_client.exists(token)
 if isexists == 1:
 return token
 else:
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")

নির্ভরতা পরীক্ষা করতে, আমরা যে এলোমেলো টোকেন ব্যবহার করছি তার সাথে চ্যাট সেশনে সংযোগ করুন এবং আপনার একটি 403 ত্রুটি পাওয়া উচিত। (উল্লেখ্য যে আপনাকে Redis Insight-এ টোকেনটি ম্যানুয়ালি মুছে ফেলতে হবে।)

এখন আপনি /token এ পোস্টের অনুরোধ পাঠানোর সময় তৈরি হওয়া টোকেনটি অনুলিপি করুন এন্ডপয়েন্ট (বা একটি নতুন অনুরোধ তৈরি করুন) এবং /chat দ্বারা প্রয়োজনীয় টোকেন কোয়েরি প্যারামিটারে মান হিসাবে পেস্ট করুন ওয়েবসকেট। তারপর সংযোগ করুন। আপনার একটি সফল সংযোগ পাওয়া উচিত।

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা টোকেনের সাথে চ্যাট সেশন

সবগুলোকে একত্রিত করে, আপনার chat.py দেখতে নিচের মত হওয়া উচিত।


import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create nee chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 print(chat_session.dict())
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

এটা এই পর্যন্ত পৌঁছানোর উপর ভাল কাজ! পরবর্তী বিভাগে, আমরা AI মডেলের সাথে যোগাযোগ এবং ক্লায়েন্ট, সার্ভার, কর্মী এবং বহিরাগত API-এর মধ্যে ডেটা স্থানান্তর পরিচালনা করার উপর ফোকাস করব৷

এআই মডেলগুলির সাথে চ্যাটবটগুলিতে কীভাবে বুদ্ধিমত্তা যুক্ত করবেন

এই বিভাগে, আমরা ট্রান্সফরমার মডেলের সাথে যোগাযোগ করার জন্য একটি র‍্যাপার তৈরি করার উপর ফোকাস করব, একটি কথোপকথনমূলক বিন্যাসে API-এ ব্যবহারকারীর কাছ থেকে প্রম্পট পাঠাতে এবং আমাদের চ্যাট অ্যাপ্লিকেশনের জন্য প্রতিক্রিয়াগুলি গ্রহণ ও রূপান্তরিত করব৷

হাগিংফেস দিয়ে কিভাবে শুরু করবেন

আমরা Hugginface-এ কোনো ভাষার মডেল তৈরি বা স্থাপন করব না। পরিবর্তে, আমরা প্রাক-প্রশিক্ষিত মডেলগুলির সাথে সংযোগ করতে Huggingface এর ত্বরিত অনুমান API ব্যবহার করার উপর ফোকাস করব।

আমরা যে মডেলটি ব্যবহার করব তা হল EleutherAI দ্বারা প্রদত্ত GPT-J-6B মডেল। এটি একটি জেনারেটিভ ল্যাঙ্গুয়েজ মডেল যা 6 বিলিয়ন প্যারামিটারের সাথে প্রশিক্ষিত ছিল।

Huggingface এই মডেলের সাথে সংযোগ করার জন্য আমাদের একটি অন-ডিমান্ড সীমিত API প্রদান করে।

Huggingface দিয়ে শুরু করতে, একটি বিনামূল্যের অ্যাকাউন্ট তৈরি করুন৷ আপনার সেটিংসে, একটি নতুন অ্যাক্সেস টোকেন তৈরি করুন৷ 30k টোকেন পর্যন্ত, Huggingface বিনামূল্যে অনুমান API-এ অ্যাক্সেস প্রদান করে।

আপনি এখানে আপনার API ব্যবহার নিরীক্ষণ করতে পারেন। নিশ্চিত করুন যে আপনি এই টোকেনটি সুরক্ষিত রেখেছেন এবং এটি সর্বজনীনভাবে প্রকাশ করবেন না।

দ্রষ্টব্য:আমরা API এর সাথে যোগাযোগ করতে HTTP সংযোগ ব্যবহার করব কারণ আমরা একটি বিনামূল্যের অ্যাকাউন্ট ব্যবহার করছি। কিন্তু PRO Huggingface অ্যাকাউন্টটি WebSockets এর সাথে স্ট্রিমিং সমর্থন করে সমান্তরালতা এবং ব্যাচের কাজগুলি দেখুন।

এটি মডেল এবং আমাদের চ্যাট অ্যাপ্লিকেশনের মধ্যে প্রতিক্রিয়া সময়কে উল্লেখযোগ্যভাবে উন্নত করতে সাহায্য করতে পারে এবং আমি আশা করি একটি ফলো-আপ নিবন্ধে এই পদ্ধতিটি কভার করব৷

ভাষা মডেলের সাথে কীভাবে ইন্টারঅ্যাক্ট করবেন

প্রথমে, আমরা আমাদের কর্মী ডিরেক্টরির মধ্যে .env ফাইলে Huggingface সংযোগের শংসাপত্র যোগ করি।

export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

এরপর, worker.src-এ model নামে একটি ফোল্ডার তৈরি করুন তারপর একটি ফাইল যোগ করুন gptj.py . তারপর নিচের GPT ক্লাস যোগ করুন:

import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": True,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = input
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 print(json.loads(response.content.decode("utf-8")))
 return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

GPT ক্লাসটি Huggingface মডেল url দিয়ে শুরু করা হয়েছে , প্রমাণীকরণ header , এবং পূর্বনির্ধারিত payload . কিন্তু পেলোড ইনপুট হল একটি গতিশীল ক্ষেত্র যা query দ্বারা সরবরাহ করা হয় আমরা Huggingface এন্ডপয়েন্টে একটি অনুরোধ পাঠানোর আগে পদ্ধতি এবং আপডেট করা হয়েছে৷

অবশেষে, আমরা সরাসরি জিপিটি ক্লাসের একটি উদাহরণে ক্যোয়ারী পদ্ধতি চালিয়ে এটি পরীক্ষা করি। টার্মিনালে, python src/model/gptj.py চালান , এবং আপনি এই মত একটি প্রতিক্রিয়া পাবেন (শুধু মনে রাখবেন যে আপনার প্রতিক্রিয়া অবশ্যই এর থেকে আলাদা হবে):

[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]

এর পরে, আমরা ইনপুটের ফর্ম্যাট পরিবর্তন করে মডেলের সাথে মিথস্ক্রিয়াকে আরও কথোপকথন করতে ইনপুটে কিছু টুইকিং যোগ করি।

GPT আপডেট করুন এভাবে ক্লাস করুন:


class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": False,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = f"Human: {input} Bot:"
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 data = json.loads(response.content.decode("utf-8"))
 text = data[0]['generated_text']
 res = str(text.split("Human:")[0]).strip("\n").strip()
 return res
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

আমরা একটি স্ট্রিং লিটারেল f"Human: {input} Bot:" দিয়ে ইনপুট আপডেট করেছি . মানুষের ইনপুট স্ট্রিং এ স্থাপন করা হয় এবং বট একটি প্রতিক্রিয়া প্রদান করে। এই ইনপুট বিন্যাস GPT-J6B কে একটি কথোপকথন মডেলে পরিণত করে। আপনি লক্ষ্য করতে পারেন অন্যান্য পরিবর্তন অন্তর্ভুক্ত

  • use_cache:ইনপুট একই থাকলে আপনি মডেলটিকে একটি নতুন প্রতিক্রিয়া তৈরি করতে চাইলে আপনি এই False করতে পারেন। একজন ব্যবহারকারী যদি একই বার্তা দিয়ে বটটিকে স্প্যামিং করতে থাকে তবে আপনার বিনামূল্যের টোকেনগুলিকে নিঃশেষ করা রোধ করতে আমি এটিকে প্রোডাকশনে সত্য হিসাবে রেখে দেওয়ার পরামর্শ দিচ্ছি। ক্যাশে ব্যবহার করলে আসলে মডেল থেকে একটি নতুন প্রতিক্রিয়া লোড হয় না।
  • রিটার্ন_ফুল_টেক্সট:এটি মিথ্যা, কারণ আমাদের ইনপুট ফেরত দেওয়ার দরকার নেই - আমাদের কাছে এটি ইতিমধ্যেই রয়েছে। যখন আমরা একটি প্রতিক্রিয়া পাই, তখন আমরা প্রতিক্রিয়া থেকে "বট:" এবং অগ্রণী/পরবর্তী স্থানগুলি সরিয়ে ফেলি এবং শুধুমাত্র প্রতিক্রিয়া পাঠ্য ফেরত দেই৷

এআই মডেলের জন্য কীভাবে স্বল্পমেয়াদী মেমরি অনুকরণ করা যায়

আমরা মডেলে পাঠাই প্রতিটি নতুন ইনপুটের জন্য, মডেলের কথোপকথনের ইতিহাস মনে রাখার কোনো উপায় নেই৷ এটি গুরুত্বপূর্ণ যদি আমরা কথোপকথনে প্রসঙ্গ ধরে রাখতে চাই।

কিন্তু মনে রাখবেন যে মডেলটিতে আমরা যত টোকেন পাঠাই, প্রক্রিয়াকরণ আরও ব্যয়বহুল হয় এবং প্রতিক্রিয়ার সময়ও দীর্ঘ হয়।

তাই আমাদের স্বল্প-মেয়াদী ইতিহাস পুনরুদ্ধার করার একটি উপায় খুঁজে বের করতে হবে এবং মডেলটিতে পাঠাতে হবে। আমাদের একটি মিষ্টি জায়গাও বের করতে হবে - আমরা কতটা ঐতিহাসিক ডেটা পুনরুদ্ধার করতে চাই এবং মডেলটিতে পাঠাতে চাই?

চ্যাট ইতিহাস পরিচালনা করতে, আমাদের JSON ডাটাবেসে ফিরে যেতে হবে। আমরা token ব্যবহার করব শেষ চ্যাট ডেটা পেতে, এবং তারপর যখন আমরা প্রতিক্রিয়া পাই, প্রতিক্রিয়াটি JSON ডাটাবেসে যুক্ত করুন৷

worker.src.redis.config.py আপডেট করুন create_rejson_connection অন্তর্ভুক্ত করতে পদ্ধতি এছাড়াও, প্রমাণীকরণ ডেটা সহ .env ফাইলটি আপডেট করুন এবং নিশ্চিত করুন যে rejson ইনস্টল করা আছে৷

আপনার worker.src.redis.config.py এই মত দেখতে হবে:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

যখন আপনার .env ফাইলটি দেখতে এইরকম হওয়া উচিত:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

এরপর, worker.src.redis-এ cache.py নামে একটি নতুন ফাইল তৈরি করুন এবং নীচের কোড যোগ করুন:

from .config import Redis
from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

ক্যাশে একটি rejson ক্লায়েন্ট, এবং পদ্ধতি get_chat_history দিয়ে আরম্ভ করা হয় Redis থেকে সেই টোকেনের জন্য চ্যাটের ইতিহাস পেতে একটি টোকেন নেয়। নিশ্চিত করুন যে আপনি rejson থেকে পাথ অবজেক্ট আমদানি করেছেন৷

এরপর, worker.main.py আপডেট করুন নিচের কোড সহ:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
if __name__ == "__main__":
 asyncio.run(main())

আমি পোস্টম্যানের পূর্ববর্তী পরীক্ষা থেকে তৈরি একটি নমুনা টোকেন হার্ড-কোড করেছি। আপনি যদি একটি টোকেন তৈরি না করে থাকেন, তাহলে শুধু /token-এ একটি নতুন অনুরোধ পাঠান এবং টোকেন কপি করুন, তারপর python main.py চালান টার্মিনালে আপনি টার্মিনালে ডেটা দেখতে পাবেন এভাবে:

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

এরপরে, আমাদের একটি add_message_to_cache যোগ করতে হবে আমাদের Cache এর পদ্ধতি ক্লাস যা একটি নির্দিষ্ট টোকেনের জন্য Redis-এ বার্তা যোগ করে।


 async def add_message_to_cache(self, token: str, message_data: dict):
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

jsonarrappend rejson দ্বারা প্রদত্ত পদ্ধতি বার্তা অ্যারেতে নতুন বার্তা যুক্ত করে।

নোট করুন যে বার্তা অ্যারে অ্যাক্সেস করতে, আমাদের .messages প্রদান করতে হবে পথ একটি যুক্তি হিসাবে. যদি আপনার বার্তা ডেটার একটি ভিন্ন/নেস্টেড কাঠামো থাকে, তবে আপনি যে অ্যারেতে নতুন ডেটা যুক্ত করতে চান তার পথটি প্রদান করুন৷

এই পদ্ধতিটি পরীক্ষা করতে, main.py ফাইলে প্রধান ফাংশনটি নীচের কোডটি দিয়ে আপডেট করুন:

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

আমরা ক্যাশে একটি হার্ড-কোডেড বার্তা পাঠাচ্ছি, এবং ক্যাশে থেকে চ্যাট ইতিহাস পাচ্ছি। যখন আপনি python main.py চালান ওয়ার্কার ডিরেক্টরির মধ্যে টার্মিনালে, আপনি টার্মিনালে এইরকম কিছু প্রিন্ট করা উচিত, বার্তা অ্যারেতে বার্তা যুক্ত করা।

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

অবশেষে, জিপিটি মডেলে বার্তা ডেটা পাঠানোর জন্য আমাদের প্রধান ফাংশন আপডেট করতে হবে এবং শেষ 4 দিয়ে ইনপুট আপডেট করতে হবে। ক্লায়েন্ট এবং মডেলের মধ্যে পাঠানো বার্তা।

প্রথমে আমাদের add_message_to_cache আপডেট করা যাক function with a new argument "source" that will tell us if the message is a human or bot. We can then use this arg to add the "Human:" or "Bot:" tags to the data before storing it in the cache.

Update the add_message_to_cache method in the Cache class like so:

 async def add_message_to_cache(self, token: str, source: str, message_data: dict):
 if source == "human":
 message_data['msg'] = "Human: " + (message_data['msg'])
 elif source == "bot":
 message_data['msg'] = "Bot: " + (message_data['msg'])
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

Then update the main function in main.py in the worker directory, and run python main.py to see the new results in the Redis database.

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.

You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.

In worker.src , create a new folder schema. Then create a new file named chat.py and paste our message schema in chat.py like so:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = str(uuid.uuid4())
 msg: str
 timestamp = str(datetime.now())

Next, update the main.py file like below:

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "3",
 "msg": "I would like to go to the moon to, would you take me?",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())

In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added.

Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out.

Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string.

Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"

Next, run python main.py a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model.

Open Redis Insight and you should have something similar to the below:

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা Conversational Chat

Stream Consumer and Real-time Data Pull from the Message Queue

Next, we want to create a consumer and update our worker.main.py to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.

In worker.src.redis create a new file named stream.py . Add a StreamConsumer class with the code below:

class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

The StreamConsumer class is initialized with a Redis client. The consume_stream method pulls a new message from the queue from the message channel, using the xread method provided by aioredis.

Next, update the worker.main.py file with a while loop to keep the connection to the message channel alive, like so:


from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(token)
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

This is quite the update, so let's take it step by step:

We use a while True loop so that the worker can be online listening to messages from the queue.

Next, we await new messages from the message_channel by calling our consume_stream পদ্ধতি If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model query পদ্ধতি

Once we get a response, we then add the response to the cache using the add_message_to_cache method, then delete the message from the queue.

How to Update the Chat Client with the AI Response

So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response.

Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response.

If the connection is closed, the client can always get a response from the chat history using the refresh_token শেষবিন্দু।

In worker.src.redis create a new file named producer.py , and add a Producer class similar to what we had on the chat web server:


class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel) -> bool:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id

Next, in the main.py file, update the main function to initialize the producer, create a stream data, and send the response to a response_channel using the add_to_stream method:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 producer = Producer(redis_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 stream_data = {}
 stream_data[str(token)] = str(msg.dict())
 await producer.add_to_stream(stream_data, "response_channel")
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

Next, we need to let the client know when we receive responses from the worker in the /chat socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.

Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.

In server.src.redis create a new file named stream.py and add our StreamConsumer class like this:

from .config import Redis
class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

Next, update the /chat socket endpoint like so:

from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 consumer = StreamConsumer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[str(token)] = str(data)
 await producer.add_to_stream(stream_data, "message_channel")
 response = await consumer.consume_stream(stream_channel="response_channel", block=0)
 print(response)
 for stream, messages in response:
 for message in messages:
 response_token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 if token == response_token:
 response_message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(message[0].decode('utf-8'))
 print(token)
 print(response_token)
 await manager.send_personal_message(response_message, websocket)
 await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
 except WebSocketDisconnect:
 manager.disconnect(websocket)

Refresh Token

Finally, we need to update the /refresh_token endpoint to get the chat history from the Redis database using our Cache ক্লাস

In server.src.redis , add a cache.py file and add the code below:


from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

Next, in server.src.routes.chat.py import the Cache class and update the /token endpoint to the below:


from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
 json_client = redis.create_rejson_connection()
 cache = Cache(json_client)
 data = await cache.get_chat_history(token)
 if data == None:
 raise HTTPException(
 status_code=400, detail="Session expired or does not exist")
 else:
 return data

Now, when we send a GET request to the /refresh_token endpoint with any token, the endpoint will fetch the data from the Redis database.

If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.

How to Test the Chat with multiple Clients in Postman

Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients.

Lastly, we will try to get the chat history for the clients and hopefully get a proper response.

মাস্টার এআই চ্যাটবট:রেডিস এবং পাইথন সহ একটি শক্তিশালী জিপিটি-চালিত বট তৈরি করুন - একটি ব্যাপক নির্দেশিকা

Recap

Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.

Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.

When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.

If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.

এই পর্যন্ত পাওয়ার জন্য অভিনন্দন! You have been able to build a working chat system.

In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.

This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**

You can download the full repository on My Github Repository

I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:

  • Try Redis Cloud free of charge
  • Watch this video on the benefits of Redis Cloud over other Redis providers
  • Redis Developer Hub - tools, guides, and tutorials about Redis
  • RedisInsight Desktop GUI

বিনামূল্যে কোড শিখুন. freeCodeCamp-এর ওপেন সোর্স পাঠ্যক্রম 40,000-এরও বেশি লোককে ডেভেলপার হিসেবে চাকরি পেতে সাহায্য করেছে। শুরু করুন


  1. দুই যোগফল IV - ইনপুট হল C++ এ একটি BST

  2. কিভাবে CSS দিয়ে বিভিন্ন ডিভাইস লুক (স্মার্টফোন, ট্যাবলেট এবং ল্যাপটপ) তৈরি করবেন?

  3. অ্যান্ড্রয়েডে সিঙ্ক্রোনাইজড সর্টেডসেট কীভাবে ব্যবহার করবেন?

  4. C-তে fillpoly() ফাংশন