আজকের পোস্টে, আমরা মজা করার জন্য একটি সাদামাটা ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেম বাস্তবায়ন করতে যাচ্ছি! Sidekiq-এর মতো জনপ্রিয় ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের অভ্যন্তরীণ দিকে উঁকি দিয়ে আমরা কিছু জিনিস শিখতে পারি। এই মজার পণ্যটি কোনভাবেই উৎপাদন ব্যবহারের উদ্দেশ্যে নয়।
আসুন কল্পনা করি আমাদের অ্যাপ্লিকেশনে একটি কাজ আছে যা এক বা একাধিক ওয়েবসাইট লোড করে এবং তাদের শিরোনাম বের করে। যেহেতু এই ওয়েবসাইটগুলির কার্যকারিতার উপর আমাদের কোন প্রভাব নেই, তাই আমরা আমাদের মূল থ্রেডের বাইরে কাজটি করতে চাই (অথবা বর্তমান অনুরোধ—যদি আমরা একটি ওয়েব অ্যাপ্লিকেশন তৈরি করছি), তবে পটভূমিতে। পি>
একটি টাস্ক এনক্যাপসুলেট করা
আমরা ব্যাকগ্রাউন্ড প্রসেসিংয়ে নামার আগে, হাতে কাজটি সম্পাদন করার জন্য একটি পরিষেবা বস্তু তৈরি করি। শিরোনাম ট্যাগের বিষয়বস্তু বের করতে আমরা OpenURI এবং Nokogiri ব্যবহার করব।
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
পরিষেবাতে কল করা প্রদত্ত URL-এর শিরোনাম প্রিন্ট করে৷
৷TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
এটি প্রত্যাশিত হিসাবে কাজ করে, তবে আসুন দেখি আমরা সিনট্যাক্সটিকে অন্য ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের মতো দেখতে এবং অনুভব করতে কিছুটা উন্নত করতে পারি কিনা। একটি Magique::Worker
তৈরি করে মডিউল, আমরা সার্ভিস অবজেক্টে কিছু সিনট্যাকটিক চিনি যোগ করতে পারি।
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
মডিউলটি একটি perform
যোগ করে কর্মীর উদাহরণের পদ্ধতি এবং একটি perform_now
আমন্ত্রণটিকে আরও ভাল করার জন্য কর্মী শ্রেণীর কাছে পদ্ধতি।
আমাদের পরিষেবা বস্তুর মধ্যে মডিউল অন্তর্ভুক্ত করা যাক। আমরা যখন এটিতে আছি, আসুন এটির নাম পরিবর্তন করে TitleExtractorWorker
রাখি এবং call
পরিবর্তন করুন perform
করার পদ্ধতি .
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
আমন্ত্রণটির এখনও একই ফলাফল রয়েছে, তবে এটি কি ঘটছে তা একটু পরিষ্কার।
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
অসিঙ্ক্রোনাস প্রসেসিং বাস্তবায়ন করা
এখন যেহেতু আমাদের শিরোনাম নিষ্কাশন কাজ করছে, আমরা অতীতের রুবি ম্যাজিক নিবন্ধগুলি থেকে সমস্ত শিরোনাম দখল করতে পারি। এটি করার জন্য, ধরা যাক আমাদের একটি RUBYMAGIC
আছে বিগত নিবন্ধগুলির সমস্ত URLগুলির একটি তালিকা সহ ধ্রুবক৷
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
আমরা অতীতের নিবন্ধগুলির শিরোনাম পেয়েছি, তবে সেগুলি বের করতে কিছুটা সময় লাগে। এর কারণ আমরা পরবর্তীতে যাওয়ার আগে প্রতিটি অনুরোধ সম্পূর্ণ না হওয়া পর্যন্ত অপেক্ষা করি।
আসুন একটি perform_async
প্রবর্তন করে এটিকে উন্নত করি আমাদের কর্মী মডিউল পদ্ধতি. জিনিসগুলিকে গতি বাড়ানোর জন্য, এটি প্রতিটি URL-এর জন্য একটি নতুন থ্রেড তৈরি করে৷
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
আমন্ত্রণটি TitleExtractorWorker.perform_async(url)
এ পরিবর্তন করার পরে , আমরা প্রায় একসাথে সব শিরোনাম পেতে. যাইহোক, এর মানে এই যে আমরা রুবি ম্যাজিক ব্লগে একসাথে 20 টির বেশি সংযোগ খুলছি। (আপনার ব্লগের সাথে বিশৃঙ্খলা করার জন্য দুঃখিত, বন্ধুরা! 😅)
যদি আপনি আপনার নিজের বাস্তবায়নের সাথে অনুসরণ করেন এবং দীর্ঘ-চলমান প্রক্রিয়ার বাইরে এটি পরীক্ষা করেন (যেমন একটি ওয়েব সার্ভার), তাহলে loop { sleep 1 }
এর মত কিছু যোগ করতে ভুলবেন না প্রক্রিয়াটি অবিলম্বে বন্ধ না হয় তা নিশ্চিত করতে আপনার স্ক্রিপ্টের শেষ পর্যন্ত৷
কাজের সারিবদ্ধ করা
প্রতিটি আহ্বানের জন্য একটি নতুন থ্রেড তৈরি করার পদ্ধতির সাথে, আমরা অবশেষে সম্পদ সীমাতে আঘাত করব (আমাদের পক্ষে এবং আমরা যে ওয়েবসাইটগুলি অ্যাক্সেস করছি উভয়েই)। যেহেতু আমরা সুন্দর নাগরিক হতে চাই, আসুন বাস্তবায়নটিকে এমন কিছুতে পরিবর্তন করি যা অ্যাসিঙ্ক্রোনাস কিন্তু পরিষেবার অস্বীকৃতির আক্রমণের মতো মনে হয় না৷
এই সমস্যাটি সমাধান করার একটি সাধারণ উপায় হল প্রযোজক/ভোক্তা প্যাটার্ন ব্যবহার করা। এক বা একাধিক প্রযোজক একটি সারিতে কাজগুলিকে ঠেলে দেয় যখন এক বা একাধিক ভোক্তা সারি থেকে কাজগুলি নেয় এবং সেগুলি প্রক্রিয়া করে৷
একটি সারি মূলত উপাদানগুলির একটি তালিকা। তাত্ত্বিকভাবে, একটি সাধারণ অ্যারে কাজ করবে। যাইহোক, যেহেতু আমরা সঙ্গতি নিয়ে কাজ করছি, আমাদের নিশ্চিত করতে হবে যে শুধুমাত্র একজন প্রযোজক বা ভোক্তা একবারে সারিতে প্রবেশ করতে পারে। আমরা যদি এই বিষয়ে সতর্ক না হই, তাহলে জিনিসগুলি বিশৃঙ্খলার মধ্যে শেষ হয়ে যাবে—যেমন দু'জন লোক একবারে একটি দরজা দিয়ে চেপে যাওয়ার চেষ্টা করছে।
এই সমস্যাটি প্রযোজক-ভোক্তা সমস্যা হিসাবে পরিচিত এবং এর একাধিক সমাধান রয়েছে। ভাগ্যক্রমে, এটি একটি খুব সাধারণ সমস্যা এবং রুবি একটি সঠিক Queue
সহ জাহাজে করে বাস্তবায়ন যা আমরা থ্রেড সিঙ্ক্রোনাইজেশন সম্পর্কে চিন্তা না করেই ব্যবহার করতে পারি।
এটি ব্যবহার করতে, আসুন নিশ্চিত করি যে প্রযোজক এবং ভোক্তা উভয়ই সারিতে অ্যাক্সেস করতে পারে। আমরা আমাদের Magique
এ একটি ক্লাস পদ্ধতি যোগ করে এটি করি মডিউল এবং Queue
এর একটি উদাহরণ বরাদ্দ করা এটিতে।
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
এরপর, আমরা আমাদের perform_async
পরিবর্তন করি নিজের নতুন থ্রেড তৈরি করার পরিবর্তে একটি টাস্ককে সারিতে ঠেলে দেওয়ার জন্য বাস্তবায়ন। একটি টাস্ককে হ্যাশ হিসাবে উপস্থাপন করা হয় যার মধ্যে কর্মী শ্রেণীর একটি রেফারেন্স এবং সেইসাথে perform_async
-এ পাস করা আর্গুমেন্টগুলি সহ পদ্ধতি।
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
এর সাথে, আমরা প্রযোজকের দিক দিয়ে কাজ শেষ করেছি। এর পরে, আসুন ভোক্তাদের দিকে নজর দেওয়া যাক।
প্রতিটি ভোক্তা একটি পৃথক থ্রেড যা সারি থেকে কাজ নেয় এবং সেগুলি সম্পাদন করে। থ্রেডের মতো একটি কাজের পরে থামার পরিবর্তে, ভোক্তা তারপর সারি থেকে অন্য একটি কাজ নেয় এবং এটি সম্পাদন করে এবং আরও অনেক কিছু। এখানে Magique::Processor
নামে একটি ভোক্তার মৌলিক বাস্তবায়ন রয়েছে . প্রতিটি প্রসেসর একটি নতুন থ্রেড তৈরি করে যা অসীমভাবে লুপ করে। প্রতিটি পুনরাবৃত্তির জন্য, এটি সারি থেকে একটি নতুন কাজ দখল করার চেষ্টা করে, কর্মী শ্রেণীর একটি নতুন উদাহরণ তৈরি করে এবং এটিকে perform
বলে। প্রদত্ত আর্গুমেন্ট সহ পদ্ধতি।
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
প্রক্রিয়াকরণ লুপ ছাড়াও, আমরা Magique::Processor.start
নামে একটি সুবিধার পদ্ধতি যোগ করি . এটি আমাদের একসাথে একাধিক প্রসেসর স্পিন আপ করতে দেয়। যদিও থ্রেডের নামকরণ সত্যিই প্রয়োজনীয় নয়, এটি আমাদের দেখার অনুমতি দেবে যে জিনিসগুলি প্রত্যাশিতভাবে কাজ করছে কিনা৷
আমাদের TitleExtractorWorker
এর আউটপুট সামঞ্জস্য করা যাক বর্তমান থ্রেডের নাম অন্তর্ভুক্ত করতে।
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
আমাদের ব্যাকগ্রাউন্ড প্রসেসিং সেটআপ পরীক্ষা করার জন্য, আমাদের কাজগুলি সারিবদ্ধ করার আগে আমাদের প্রথমে প্রসেসরের একটি সেট স্পিন আপ করতে হবে৷
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
যখন এটি চালানো হয়, তখনও আমরা সব নিবন্ধের শিরোনাম পাই। যদিও এটি প্রতিটি কাজের জন্য একটি পৃথক থ্রেড ব্যবহার করার মতো দ্রুত নয়, এটি এখনও প্রাথমিক বাস্তবায়নের চেয়ে দ্রুততর যার কোনও ব্যাকগ্রাউন্ড প্রক্রিয়াকরণ ছিল না। যোগ করা প্রসেসরের নামগুলির জন্য ধন্যবাদ, আমরা নিশ্চিত করতে পারি যে সমস্ত প্রসেসর সারির মাধ্যমে কাজ করছে। সমসাময়িক প্রসেসরের সংখ্যা পরিবর্তন করে, প্রক্রিয়াকরণের গতি এবং বিদ্যমান সম্পদের সীমাবদ্ধতার মধ্যে ভারসাম্য খুঁজে পাওয়া সম্ভব।
একাধিক প্রক্রিয়া এবং মেশিনে প্রসারিত করা
এখন পর্যন্ত, আমাদের পটভূমি প্রক্রিয়াকরণ সিস্টেমের বর্তমান বাস্তবায়ন যথেষ্ট ভাল কাজ করে। যদিও এটি এখনও একই প্রক্রিয়ার মধ্যে সীমাবদ্ধ। সম্পদ-ক্ষুধার্ত কাজগুলি এখনও সমগ্র প্রক্রিয়ার কর্মক্ষমতা প্রভাবিত করবে। একটি চূড়ান্ত পদক্ষেপ হিসাবে, চলুন একাধিক প্রক্রিয়া এবং এমনকি একাধিক মেশিনে কাজের চাপ বিতরণের দিকে নজর দেওয়া যাক৷
সারি হল উৎপাদক এবং ভোক্তাদের মধ্যে একমাত্র সংযোগ। এই মুহূর্তে, এটি একটি ইন-মেমরি বাস্তবায়ন ব্যবহার করছে। আসুন Sidekiq থেকে আরও অনুপ্রেরণা গ্রহণ করি এবং Redis ব্যবহার করে একটি সারি বাস্তবায়ন করি।
Redis-এর সেই তালিকাগুলির জন্য সমর্থন রয়েছে যা আমাদের কাজগুলি পুশ করতে এবং আনতে দেয়৷ উপরন্তু, Redis রুবি রত্ন থ্রেড-নিরাপদ এবং তালিকা সংশোধন করার জন্য Redis কমান্ডগুলি পারমাণবিক। এই বৈশিষ্ট্যগুলি আমাদের অ্যাসিঙ্ক্রোনাস ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের জন্য সিঙ্ক্রোনাইজেশন সমস্যায় না গিয়ে এটি ব্যবহার করা সম্ভব করে৷
আসুন একটি Redis ব্যাকড সারি তৈরি করি যা push
প্রয়োগ করে এবং shift
Queue
মত পদ্ধতি আমরা আগে ব্যবহার করেছি।
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
যেহেতু রেডিস রুবি অবজেক্ট সম্পর্কে কিছুই জানে না, তাই lpush
ব্যবহার করে ডাটাবেসে সংরক্ষণ করার আগে আমাদের কাজগুলিকে JSON-এ সিরিয়ালাইজ করতে হবে। কমান্ড যা তালিকার সামনে একটি উপাদান যোগ করে।
সারি থেকে একটি টাস্ক আনতে, আমরা brpop
ব্যবহার করছি কমান্ড, যা একটি তালিকা থেকে শেষ উপাদান পায়। তালিকাটি খালি থাকলে, একটি নতুন উপাদান উপলব্ধ না হওয়া পর্যন্ত এটি ব্লক করা হবে। কোনো কাজ উপলব্ধ না থাকলে আমাদের প্রসেসরকে বিরতি দেওয়ার এটি একটি চমৎকার উপায়। অবশেষে, রেডিস থেকে একটি টাস্ক পাওয়ার পর, আমাদের Object.const_get
ব্যবহার করে কর্মীর নামের উপর ভিত্তি করে আসল রুবি ক্লাস খুঁজতে হবে। .
একটি চূড়ান্ত পদক্ষেপ হিসাবে, আসুন জিনিসগুলিকে একাধিক প্রক্রিয়ায় বিভক্ত করি। জিনিসগুলির প্রযোজকের দিক থেকে, শুধুমাত্র আমাদের যা করতে হবে তা হল ব্যাকএন্ডকে আমাদের নতুন প্রয়োগ করা Redis সারিতে পরিবর্তন করা৷
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
জিনিসের ভোক্তা দিক থেকে, আমরা এইরকম কয়েকটি লাইন দিয়ে দূরে যেতে পারি:
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
সম্পাদিত হলে, ভোক্তা প্রক্রিয়া সারিতে আসার জন্য নতুন কাজের জন্য অপেক্ষা করবে। একবার আমরা প্রযোজক প্রক্রিয়া শুরু করি যা কাজগুলিকে সারিতে ঠেলে দেয়, আমরা দেখতে পারি যে সেগুলি অবিলম্বে প্রক্রিয়া করা হয়৷
দায়িত্বের সাথে উপভোগ করুন এবং এটি উৎপাদনে ব্যবহার করবেন না
যদিও আমরা এটিকে একটি বাস্তব বিশ্বের সেটআপ থেকে দূরে রেখেছি যা আপনি উত্পাদনে ব্যবহার করবেন (তাই করবেন না!), আমরা একটি পটভূমি প্রসেসর তৈরিতে কয়েকটি পদক্ষেপ নিয়েছি। আমরা ব্যাকগ্রাউন্ড পরিষেবা হিসাবে একটি প্রক্রিয়া চালানোর মাধ্যমে শুরু করেছি। তারপর আমরা এটিকে অ্যাসিঙ্ক করেছি এবং Queue
ব্যবহার করেছি উৎপাদক-ভোক্তা সমস্যা সমাধানের জন্য। তারপরে আমরা রেডিস ব্যবহার করে প্রক্রিয়াটিকে একাধিক প্রক্রিয়া বা মেশিনে প্রসারিত করেছি, এর পরিবর্তে একটি ইন-মেমরি বাস্তবায়ন।
পূর্বে উল্লিখিত হিসাবে, এটি একটি পটভূমি প্রক্রিয়াকরণ সিস্টেমের একটি সরলীকৃত বাস্তবায়ন। অনেক কিছু অনুপস্থিত এবং স্পষ্টভাবে মোকাবেলা করা হয় না. এর মধ্যে রয়েছে (তবে সীমাবদ্ধ নয়) ত্রুটি পরিচালনা, একাধিক সারি, সময়সূচী, সংযোগ পুলিং এবং সিগন্যাল হ্যান্ডলিং৷
তবুও, আমরা এটি লিখতে মজা পেয়েছি এবং আশা করি আপনি একটি ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের হুডের নীচে উঁকিঝুঁকি উপভোগ করেছেন। সম্ভবত আপনি একটি বা দুটি জিনিস নিয়ে গেছেন।