কম্পিউটার টিউটোরিয়াল

Redis তালিকা এবং TypeScript সহ একটি কাস্টম বার্তা সারি তৈরি করুন৷

আপনি কি কখনও আপনার নিজস্ব বার্তা সারি তৈরি করার চেষ্টা করেছেন কিন্তু চ্যালেঞ্জের সম্মুখীন হয়েছেন? যদি তাই হয়, আপনি একা নন৷ এই টিউটোরিয়ালে, আমরা Redis তালিকা ব্যবহার করে স্ক্র্যাচ থেকে একটি বার্তা সারি তৈরি করতে যাচ্ছি। Redis এর সাথে বার্তা সারি তৈরি করার জন্য বিভিন্ন পদ্ধতি আছে, যেমন স্ট্রীম, তালিকা এবং পাব/সাব, আমরা সবচেয়ে সহজ এবং সবচেয়ে সহজ পদ্ধতির উপর ফোকাস করব:তালিকা। আমরা এই ব্যবহারিক নির্দেশিকাটি অনুসন্ধান করার সাথে সাথে আমার সাথে যোগ দিন।

আমরা কি ব্যবহার করব

  • আপস্ট্যাশ
  • হাত জোড়া

আপনার যা প্রয়োজন

  • বান
  • হাত জোড়া

Upstash Redis সেট আপ করা হচ্ছে

প্রথমে, এর একটি Redis উদাহরণ সেট আপ করা যাক। এটি করতে, শুধুমাত্র Upstash এ যান এবং ডেটাবেস তৈরি করুন এ ক্লিক করুন .এরপর, আপনার সংযোগ স্ট্রিং খুঁজে পেতে নীচে স্ক্রোল করুন, যা আমরা আমাদের ক্লায়েন্টকে সংযুক্ত করতে ব্যবহার করব৷ আমি এখানে বিশদ বিবরণে যাব না, তবে এটিই মূলত আপনাকে শুরু করতে হবে৷

উদাহরণ সংযোগ স্ট্রিং:

redis://XXXXe@social-XXX-39281.upstash.io:39281

প্রজেক্ট কিকঅফ

আসুন Bun ব্যবহার করে আমাদের TypeScript প্রকল্প শুরু করি। পছন্দটি শুধুমাত্র এই কারণে নয় যে এটি নোডের চেয়ে দ্রুততর - এটি সেট আপ করাও অনেক সহজ। এবং হ্যাঁ, এটি চিত্তাকর্ষকভাবে দ্রুত! 🚀

mkdir upstash-mq
cd upstash-mq
 
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
 
bun add ioredis

প্রকল্প কাঠামো

 ┣ 📂src
 ┃ ┣ 📂lua-scripts
 ┃ ┃ ┣ 📜add-job.lua
 ┃ ┃ ┗ 📜remove-job.lua
 ┃ ┣ 📜index.ts
 ┃ ┣ 📜job.ts
 ┃ ┣ 📜queue.ts
 ┃ ┗ 📜utils.ts
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜README.md
 ┣ 📜bun.lockb
 ┣ 📜index.ts
 ┣ 📜package.json
 ┗ 📜tsconfig.json

আমাদের Queue এর ভিজ্যুয়াল উপস্থাপনা এরকম হবে:

Redis তালিকা এবং TypeScript সহ একটি কাস্টম বার্তা সারি তৈরি করুন৷

চাকরি

আমাদের চাকরির ক্লাসে কয়েকটি মূল জিনিস প্রয়োজন। প্রথমত, আমাদের প্রতিটি কাজের স্থিতি ট্র্যাক করতে হবে৷ এটি আমাদেরকে সেগুলি প্রক্রিয়াকরণ করতে, সেগুলিকে আবার চেষ্টা করতে হবে, অথবা যদি সেগুলি ইতিমধ্যেই শেষ হয়ে যায় তবে সেগুলিকে অন্য কোথাও সরাতে সাহায্য করে৷ প্রতিটি কাজের একটি আইডি এবং কিছু ডেটা থাকে, যা জেনেরিক হওয়া প্রয়োজন যাতে আমরা একটি দুর্দান্ত ব্যবহারকারীর অভিজ্ঞতা প্রদান করতে পারি৷ অবশেষে, আমাদের প্রতিটি কাজকে তার সারিতে লিঙ্ক করতে হবে এবং সহজ পরিচালনার জন্য সারির নাম অন্তর্ভুক্ত করতে হবে৷

এখানে আমাদের Job এর ব্যাকবোন রয়েছে ক্লাস:

type OwnerQueue = {
 redis: Redis;
 queueName: string;
};
export type JobStatuses =
 | "created"
 | "waiting"
 | "active"
 | "succeeded"
 | "failed";
 
export class Job<T> {
 id: string;
 status: JobStatuses;
 config: OwnerQueue;
 data: T;
 
 constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
 this.id = jobId;
 this.status = "created";
 this.data = data;
 this.config = ownerConfig;
 }
}

data তৈরি করতে জেনেরিক, আমাদের প্রথমে Job তৈরি করতে হবে নিজেই জেনেরিক। বাকিটা সহজবোধ্যভাবে অনুসরণ করে। আমরা প্রতিটি Job-এর জন্য একটি আলাদা Redis উদাহরণ তৈরি করতে পারি। উপাদান, কিন্তু এটি পরিচালনা করা জটিল হবে।

সৌভাগ্যবশত, আমাদের পন্থা সারির মধ্যে রেডিস ইন্সট্যান্সের সহজ কনফিগারেশনের জন্য অনুমতি দেয় এবং আমরা প্রয়োজন অনুসারে এই উদাহরণটি পাস করতে পারি। একই নীতি queueName এ প্রযোজ্য . যেহেতু আমরা প্রায়শই সারিতে কাজ সংরক্ষণ করতে এটি ব্যবহার করব, তাই আমাদের কাজগুলিকে অবশ্যই তাদের মূল সারি সম্পর্কে সচেতন হতে হবে৷ সারিতে কাজগুলি সংরক্ষণ করতে সক্ষম হতে আমাদের দুটি জিনিসের প্রয়োজন:Redis এবং কিছু ইউটিলিটিগুলির সাথে যোগাযোগ করার জন্য একটি Lua স্ক্রিপ্ট৷

আসুন প্রথমে আমাদের ইউটিলিটি তৈরি করি:

import { JobStatuses } from "./job";
 
const MQ_PREFIX = "UpstashMQ";
 
export const formatMessageQueueKey = (queueName: string, key: string) => {
 return `${MQ_PREFIX}:${queueName}:${key}`;
};
 
export const convertToJSONString = <T>(
 data: T,
 status: JobStatuses,
): string => {
 return JSON.stringify({
 data,
 status,
 });
};

যেহেতু আমরা প্রতিবার Redis ব্যবহার করি তখন ম্যানুয়ালি আমাদের সারি নাম তৈরি করা আদর্শ নয়, তাই আমরা formatMessageQueueKey নামে একটি ইউটিলিটি তৈরি করেছি .এই ইউটিলিটি কেবল স্ট্রিংগুলিকে একত্রিত করে। উপরন্তু, রেডিস-এ সঞ্চয় করার জন্য আমাদের ডেটাকে সিরিয়ালাইজ করতে হবে - আমরা শুধুমাত্র ডেটা উৎস হিসেবে JS অবজেক্টগুলিকে পাস করতে পারি না; সেগুলিকে প্রথমে স্ট্রিং-এ রূপান্তর করতে হবে৷ ডেটা জেনেরিক হওয়ার কারণে, আমরা একটি জেনেরিক ফাংশন প্রয়োগ করেছি, convertToJSONString , এই উদ্দেশ্যে।

এখন, আমাদের প্রথম লুয়া স্ক্রিপ্ট যোগ করা যাক:

add-job.lua

--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
 
 
local jobId = ARGV[1]
local payload = ARGV[2]
 
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
 
return jobId

আমরা আমাদের Redis ইনস্ট্যান্সের সাথে এই কলগুলি স্বতন্ত্রভাবে সম্পাদন করতে পারি, নিম্নরূপ:

  • redis.hexists(jobId)
  • redis.hset(jobId,payload)
  • redis.lpush(jobId,payload)

যাইহোক, এই পদ্ধতির ফলে তিনটি পৃথক কল হবে। রেডিস সার্ভারে রাউন্ড ট্রিপ কমাতে, আমরা পুরো প্রক্রিয়াটিকে একক কলে একত্রিত করার লক্ষ্য রাখি।

আসুন আমাদের save() যোগ করি পদ্ধতি

 private createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
 
 async save(): Promise<string | null> {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
 const resJobId = (await this.config.redis.eval(
 addJobToQueueScript,
 2,
 this.createQueueKey("jobs"),
 this.createQueueKey("waiting"),
 this.id,
 convertToJSONString(this.data, this.status)
 )) as string | null;
 
 if (resJobId) {
 this.id = resJobId;
 return resJobId;
 }
 return null;
 }

কোডটি সোজা, কিন্তু আমাকে আরও স্পষ্ট করতে দিন। আমাদের লুয়া স্ক্রিপ্ট তৈরি করার পরে, আমরা redis.eval ব্যবহার করে এটিকে কল করি , যা লুয়া স্ক্রিপ্ট চালানোর জন্য প্রয়োজনীয়। redis.eval-এর প্যারামিটার নিম্নরূপ:

  • প্রথম প্যারামিটারের জন্য স্ক্রিপ্ট প্রয়োজন।
  • দ্বিতীয় প্যারামিটারটি আর্গুমেন্টের সংখ্যা নির্দিষ্ট করে।
  • তৃতীয় এবং চতুর্থ প্যারামিটারগুলি কীগুলির জন্য৷
  • অবশেষে, আমরা প্রকৃত আর্গুমেন্ট পাস করি।

আমরা আমাদের Job শেষ করার আগে ক্লাস ভবিষ্যতের জন্য আরও কয়েকটি পদ্ধতি যোগ করা যাক।

fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
 const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
 if (jobData) {
 return this.fromData<T>(jobId, jobData);
 }
 return null;
 };
 
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
 const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
 const job = new Job<T>(this.config, parsedData.data, jobId);
 job.status = parsedData.status;
 return job;
};

এই মুহূর্তে, আমাদের এই ফাংশনগুলির প্রয়োজন নাও হতে পারে, কিন্তু ভবিষ্যতে যখন আমরা কাজগুলি প্রক্রিয়াকরণ শুরু করব তখন এগুলি গুরুত্বপূর্ণ হয়ে উঠবে৷ সেই সময়ে, আমাদের কাছে শুধুমাত্র কাজের আইডি (jobId) থাকবে , এবং স্ক্র্যাচ থেকে আমাদের চাকরি পুনর্গঠনের জন্য আমাদের একটি পদ্ধতির প্রয়োজন হবে। এটা ঠিক কি fromId সম্পন্ন করে। এটি রেডিস থেকে কাজের ডেটা পুনরুদ্ধার করে, এটিকে একটি কাজের উদাহরণে রূপান্তর করে এবং এটি ফেরত দেয়, যাতে সারি পরে এই কাজটি প্রক্রিয়া করতে পারে৷

সারিতে এগিয়ে যাওয়া

save() সম্পূর্ণ করার পরে অংশ, আসুন সারির বিবরণে ডুব দেওয়া যাক। এখানে আমাদের উদ্দেশ্য:

  • আমাদের সারির লক্ষ্য হল সাফল্য বা ব্যর্থতার উপর ডেটা ধরে রাখা বা অপসারণ করা, কারণ পরবর্তীতে পুনরায় প্রক্রিয়াকরণের প্রয়োজন হতে পারে।
  • আমরা একযোগে সারি তৈরি করতে চাই, যাতে একাধিক কাজ একসাথে চলতে পারে।
  • ডাটা প্রক্রিয়াকরণের জন্য একটি কলব্যাক ফাংশন পাস করার অনুমতি দেওয়া আমাদের লক্ষ্য। এই ফাংশনটি একটি ভাল বিকাশকারী অভিজ্ঞতার জন্য কাজের ধরন অনুমান করা উচিত৷
  • আমরা Job.save() কলিং সক্ষম করার পরিকল্পনা করছি সারির মধ্যে থেকে, আমাদের রেডিস ইনস্ট্যান্স এবং queueName পাস করার অনুমতি দেয় .
  • অবশেষে, আমরা প্রয়োজনে সারিটি ধ্বংস করার ক্ষমতা নিশ্চিত করতে চাই এবং সারি থেকে একটি চাকরি সরাতে চাই।

আসুন শুরু করা যাক আমাদের সারির সংজ্ঞা দিয়ে

export type QueueConfig = {
 redis: Redis;
 queueName: string;
 keepOnSuccess?: boolean;
 keepOnFailure?: boolean;
};
 
export class Queue extends EventEmitter {
 config: QueueConfig;
 concurrency = 0;
 worker: any;
 running = 0;
 queued = 0;
 
 constructor(config: QueueConfig) {
 super();
 this.config = {
 redis: config.redis,
 queueName: config.queueName,
 keepOnFailure: config.keepOnFailure ?? true,
 keepOnSuccess: config.keepOnSuccess ?? true,
 };
 }
 
 createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
}

সারি শ্রেণীতে বাহ্যিক তথ্য গ্রহণ করার জন্য একটি কনফিগারেশন রয়েছে, যেমন সারির নাম, রেডিস ইনস্ট্যান্স, এবং ডেটা ধরে রাখার বা অপসারণ করার জন্য সেটিংস৷ আমরা যেকোনও রেডিস বাস্তবায়নকে স্বাগত জানাই যা ব্যবহারকারীরা পছন্দ করেন, যদিও আমাদের কাছে Upstash 😌 এর জন্য একটি বিশেষ অগ্রাধিকার রয়েছে৷ এই নমনীয়তা ব্যবহারকারীদের তাদের বিদ্যমান সিস্টেমে সারিকে একীভূত করতে দেয়৷

এবং, আমাদের ক্লাস ইভেন্ট ইমিটারকে প্রসারিত করে যাতে কিছু ঘটলে তাদের জানানো হয়।

এখানে একটি উদাহরণ শুরু করা হল:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "upstash-rocks",
 keepOnFailure: true,
 keepOnSuccess: true,
});

যোগ করুন

async add<T>(payload: T) {
 return new Job<T>(this.config, payload).save();
 }

আমাদের কাজটি মূল সারির কনফিগারেশনের বিশদ এবং পেলোড নেয়, যা রেডিসে সংরক্ষিত ডেটা। আমরা তখন কেবল এটি সংরক্ষণ করি।

এখন আমরা তা করতে পারি:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL!),
 queueName: "mytest-queue",
 keepOnFailure: true,
 keepOnSuccess: true,
});
 
const payload = {
 upstash: "best-redis-ever",
};
 
await queue.add(payload);

এখন, আমাদের সেগুলি প্রক্রিয়া করার একটি উপায় দরকার৷

প্রক্রিয়া হচ্ছে

এটি আমাদের সারি বাস্তবায়নের সবচেয়ে চ্যালেঞ্জিং অংশ। আমরা আমাদের ব্যবহারকারীদের একযোগে প্রক্রিয়ার সংখ্যা নির্দিষ্ট করতে চাই এবং একজন কর্মী প্রদান করতে চাই – কাজগুলি প্রক্রিয়া করার জন্য একটি কলব্যাক ফাংশন যা কাজের ধরনকে অনুমান করবে। উপরন্তু, বর্তমানে চলমান এবং সারিবদ্ধ চাকরির সংখ্যা ট্র্যাক করার জন্য আমাদের একটি ব্যবস্থা দরকার, যা আমাদেরকে সারি থেকে নিরাপদে পরবর্তী কাজ নির্বাচন করতে সক্ষম করে।

 async process<TJobPayload>(
 worker: (job: TJobPayload) => void,
 concurrency: number
 ): Promise<void> {
 this.concurrency = concurrency;
 this.worker = worker;
 this.running = 0;
 this.queued = 1;
 
 this.jobTick();
 }

জেনেরিক TJobPayload গ্রহণ করার প্রাথমিক উদ্দেশ্য আমাদের ব্যবহারকারীদের জন্য ডেভেলপার অভিজ্ঞতা উন্নত করা। আমাদের সারি ব্যবহার করার সময় তাদের বুদ্ধিমত্তা থেকে উপকৃত করতে আমাদের লক্ষ্য। ব্যবহারকারীরা সচেতন যে তারা {hello: "world"} এর মতো ডেটা সঞ্চয় করেছে। চাকরিতে, কিন্তু সঠিক বুদ্ধিমত্তা প্রদানের জন্য TypeScript-এর সহায়তা প্রয়োজন৷ এই কারণেই আমাদের কাছে এই প্রক্রিয়াটি রয়েছে, টাইপস্ক্রিপ্টকে জানাতে এবং উন্নত বিকাশকারী অভিজ্ঞতার জন্য এটিকে অনুমান করতে বাধ্য করতে৷

jobTick() এ অগ্রসর হওয়ার আগে , আসুন সাবধানে প্রক্রিয়াটি বিবেচনা করি:

  • যেহেতু আমাদের সারিটি FIFO (ফার্স্ট ইন ফার্স্ট আউট) ভিত্তিতে কাজ করে, তাই আমাদের সারির ডান দিক থেকে একটি কাজ পপ করে শুরু করতে হবে।
  • এরপর, আমরা এই কাজে আমাদের কর্মী ফাংশন চালাই।
  • কাজ শেষ হওয়ার পর, আমরা আমাদের ব্যবহারকারীকে ফলাফল প্রকাশ করি।
  • অবশেষে, আমরা jobTick() কল করি আবার পরবর্তী কাজ প্রক্রিয়া করার জন্য।

অতএব, jobTick() এই তিনটি গুরুত্বপূর্ণ অংশ নিয়ে গঠিত হবে।"

private jobTick() {
 this.getNextJob()
 .then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
 })
 .catch((error) => {
 console.error("Error in jobTick:", error);
 })
 .finally(() => {
 setImmediate(() => this.jobTick());
 });
 }

আমি ফাংশন দ্বারা ফাংশন ব্যাখ্যা করার চেষ্টা করব। চলুন শুরু করা যাক getNextJob() দিয়ে

 private async getNextJob() {
 try {
 const jobId = await this.config.redis.brpoplpush(
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 0
 );
 return jobId;
 } catch (error) {
 console.error("Error fetching the next job:", error);
 throw error;
 }
 }
 

আমরা কেবল Redis-এ একটি কল করছি, কিন্তু একটি কৌশলগত পদ্ধতির সাথে:আমরা lpush এর সাথে মিলিত একটি ব্লকিং কল ব্যবহার করি রাউন্ড ট্রিপের সংখ্যা কমাতে। একটি ব্লকিং কল ব্যবহার ইচ্ছাকৃত; আমরা অন্যান্য কর্মীদের একই সাথে একই কাজ প্রক্রিয়াকরণ থেকে বিরত রাখতে চাই, এইভাবে জাতি পরিস্থিতি এড়িয়ে চলুন। উপরন্তু, আমরা কাজগুলিকে 'অপেক্ষারত' অবস্থা থেকে 'সক্রিয়'-এ স্থানান্তরিত করি, প্রক্রিয়ার পরবর্তী ধাপগুলির জন্য কার্যকরভাবে প্রস্তুত করি৷

this.getNextJob().then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
});

এখন আমাদের কাছে jobId আছে , আমরা চলমান চাকরির সংখ্যা এক দ্বারা বৃদ্ধি করি এবং সারিবদ্ধ গণনা এক দ্বারা হ্রাস করি৷ আমরা সমকালীন সীমাকে সম্মান করার সাথে সাথে যতটা সম্ভব নতুন চাকরি শুরু করার চেষ্টা করি:

if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
}

সবকিছু ঠিকঠাক থাকলে, আমরা আমাদের Job নির্মাণে এগিয়ে যাই fromId ব্যবহার করে .একবার আমরা সফলভাবে আমাদের কাজকে এর আইডি দ্বারা পুনর্গঠন করে ফেলি, তারপরে আমরা আমাদের কর্মী ফাংশন দিয়ে কাজ সম্পাদনের দিকে এগিয়ে যাই।

চলুন executeJob এ যাওয়া যাক

private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
 let hasError = false;
 try {
 await this.worker(jobCreatedById.data);
 this.running -= 1;
 this.queued += 1;
 } catch (error) {
 hasError = true;
 } finally {
 const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
 this.emit(jobStatus, job.id);
 return;
 }
 }

এখন যেহেতু আমাদের কাছে কাজের ডেটা আছে, আমরা এই ডেটা আমাদের worker-এ পাঠাই . এটি সফলভাবে কার্যকর হলে, আমরা সারিবদ্ধ কাজের সংখ্যা এক দ্বারা বৃদ্ধি করি এবং চলমান সংখ্যা এক দ্বারা হ্রাস করি। এই পদক্ষেপ অত্যন্ত গুরুত্বপূর্ণ; সঠিকভাবে পরিচালনা না করা হলে, এটি একই সাথে নতুন চাকরি চালু করার ক্ষমতাকে প্রভাবিত করতে পারে। কর্মীর সম্পাদনের সময় কোনো ত্রুটির ক্ষেত্রে, আমরা কেবল hasError পরিবর্তন করি পতাকা অবশেষে, আমরা finishJob কল করি আমাদের jobCreatedById এর সাথে এবং hasError পতাকা এবং jobId দিয়ে স্ট্যাটাস নির্গত করুন .

সাইড নোট:ব্যবহারকারীরা এখন এই মত নির্গত আপডেট শুনতে পারেন.

queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));

চলুন finishJob এ যাওয়া যাক

private async finishJob<TJobPayload>(
 job: Job<TJobPayload>,
 hasFailed?: boolean
 ): Promise<[JobStatuses, Job<TJobPayload>]> {
 const multi = this.config.redis.multi();
 
 multi.lrem(this.createQueueKey("active"), 0, job.id);
 
 if (hasFailed) {
 if (this.config.keepOnFailure) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("failed"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "failed";
 } else {
 if (this.config.keepOnSuccess) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("succeeded"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "succeeded";
 }
 
 await multi.exec();
 return [job.status, job];
 }

এখানে একটি গুরুত্বপূর্ণ দিক হল multi() এর ব্যবহার যেহেতু আমাদের লক্ষ্য সবসময় রাউন্ড ট্রিপ কম করা। multi ব্যবহার করে , আমরা exec() কল না করা পর্যন্ত Redis মৃত্যুদন্ড স্থগিত করে .যদি ব্যবহারকারী keepOnFailure সেট করে থাকেন এবং keepOnSuccess ডেটা সংরক্ষণের জন্য, আমরা দুটি সেট তৈরি করব:একটি কাজের ডেটা সহ এবং অন্যটি এই ডেটা অ্যাক্সেস করার জন্য কাজের আইডিগুলির তালিকা সহ। এই পদ্ধতিটি সাফল্য এবং ব্যর্থতার উভয় ক্ষেত্রেই প্রযোজ্য। স্বাভাবিকভাবেই, আমরা সেই অনুযায়ী কাজের অবস্থা সামঞ্জস্য করি। অবশেষে, আমরা exec দিয়ে মাল্টি কমান্ড চালাই এবং ইভেন্ট নির্গমনের উদ্দেশ্যে কাজের স্থিতি এবং চাকরি নিজেই ফেরত দেয়।

অবশেষে, আমাদের কাছে দুটি অবশিষ্ট পদ্ধতি রয়েছে, যেগুলি আমি বিশদভাবে বর্ণনা করব না, কারণ তারা এমন ধারণাগুলি ব্যবহার করে যা আমরা ইতিমধ্যেই পরিচিত৷

 async removeJob(jobId: string) {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
 return await this.config.redis.eval(
 addJobToQueueScript,
 5,
 this.createQueueKey("succeeded"),
 this.createQueueKey("failed"),
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 this.createQueueKey("jobs"),
 jobId
 );
 }
 
 async destroy() {
 const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
 this.createQueueKey(key)
 );
 const res = await this.config.redis.del(...args);
 return res;
 }

remove-job.lua

--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
 
local jobId = ARGV[1]
 
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
 redis.call("lrem", KEYS[3], 0, jobId)
 redis.call("lrem", KEYS[4], 0, jobId)
end
 
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
 

destroy() সম্পূর্ণ সারিটি সম্পূর্ণরূপে মুছে ফেলা এবং অন্যটি হল সারি থেকে একটি নির্দিষ্ট কাজ মুছে ফেলা৷

আসুন সব কিছু কাজ করে দেখি

import { sleep } from "bun";
import Redis from "ioredis";
 
import { Queue } from "./queue";
 
type Payload = {
 id: number;
 data: string;
};
 
const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "mytest-queue",
});
 
async function main() {
 await generateQueueItems(queue, 20);
 console.log("Sleep starting for 5 sec");
 await sleep(5000);
 
 queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
 await queue.process<Payload>((job) => {
 console.log("Processing job:", job.data);
 sleep(1000);
 }, 3);
}
 
main();
 
async function generateQueueItems(queue: Queue, itemCount: number) {
 for (let i = 0; i < itemCount; i++) {
 const payload = {
 id: i,
 data: `dummy-data-${i}`,
 // Add more properties as needed for your testing
 };
 const jobId = await queue.add(payload);
 console.log(`Added item ${i} with jobId: ${jobId}`);
 }
}

বোনাস চ্যালেঞ্জ

  • Redis অ্যাক্সেস এবং কর্মী প্রক্রিয়া উভয়ের জন্যই সূচকীয় ব্যাকঅফের সাথে পুনরায় চেষ্টা করার যুক্তি প্রয়োগ করুন।
  • একটি 'অন্তত একবার' গ্যারান্টি মেকানিজম তৈরি করুন।
  • আরো ভালো পারফরম্যান্সের জন্য সার্ভিস ওয়ার্কার্সে কর্মীদের কার্যকর করার চেষ্টা করুন
  • নির্ধারিত কাজ যোগ করুন

র্যাপ আপ

কিছু শেখার সর্বোত্তম উপায় হল এটি তৈরি করা এবং আরও ভাল পদ্ধতি হল Upstash Redis ব্যবহার করে তা করা। দোলাতে থাকুন।

🔗 প্রকল্পের Github ঠিকানা


  1. HTML DOM ইনপুট চেকবক্স চেক করা সম্পত্তি

  2. লিনাক্সে ঘড়ির কমান্ড কীভাবে ব্যবহার করবেন, উদাহরণ সহ

  3. C++ এ প্রদত্ত আকারের আয়তক্ষেত্রের ভিতরে সম্ভাব্য রম্বির সংখ্যা গণনা করুন

  4. কিভাবে CSS দিয়ে একটি ফুল স্ক্রীন ভিডিও ব্যাকগ্রাউন্ড তৈরি করবেন?