কম্পিউটার

রুবিতে একটি ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেম তৈরি করে শেখা

আজকের পোস্টে, আমরা মজা করার জন্য একটি সাদামাটা ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেম বাস্তবায়ন করতে যাচ্ছি! Sidekiq-এর মতো জনপ্রিয় ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের অভ্যন্তরীণ দিকে উঁকি দিয়ে আমরা কিছু জিনিস শিখতে পারি। এই মজার পণ্যটি কোনভাবেই উৎপাদন ব্যবহারের উদ্দেশ্যে নয়।

আসুন কল্পনা করি আমাদের অ্যাপ্লিকেশনে একটি কাজ আছে যা এক বা একাধিক ওয়েবসাইট লোড করে এবং তাদের শিরোনাম বের করে। যেহেতু এই ওয়েবসাইটগুলির কার্যকারিতার উপর আমাদের কোন প্রভাব নেই, তাই আমরা আমাদের মূল থ্রেডের বাইরে কাজটি করতে চাই (অথবা বর্তমান অনুরোধ—যদি আমরা একটি ওয়েব অ্যাপ্লিকেশন তৈরি করছি), তবে পটভূমিতে। পি>

একটি টাস্ক এনক্যাপসুলেট করা

আমরা ব্যাকগ্রাউন্ড প্রসেসিংয়ে নামার আগে, হাতে কাজটি সম্পাদন করার জন্য একটি পরিষেবা বস্তু তৈরি করি। শিরোনাম ট্যাগের বিষয়বস্তু বের করতে আমরা OpenURI এবং Nokogiri ব্যবহার করব।

require 'open-uri'
require 'nokogiri'
 
class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

পরিষেবাতে কল করা প্রদত্ত URL-এর শিরোনাম প্রিন্ট করে৷

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

এটি প্রত্যাশিত হিসাবে কাজ করে, তবে আসুন দেখি আমরা সিনট্যাক্সটিকে অন্য ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের মতো দেখতে এবং অনুভব করতে কিছুটা উন্নত করতে পারি কিনা। একটি Magique::Worker তৈরি করে মডিউল, আমরা সার্ভিস অবজেক্টে কিছু সিনট্যাকটিক চিনি যোগ করতে পারি।

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end
 
    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end
 
    def perform(*)
      raise NotImplementedError
    end
  end
end

মডিউলটি একটি perform যোগ করে কর্মীর উদাহরণের পদ্ধতি এবং একটি perform_now আমন্ত্রণটিকে আরও ভাল করার জন্য কর্মী শ্রেণীর কাছে পদ্ধতি।

আমাদের পরিষেবা বস্তুর মধ্যে মডিউল অন্তর্ভুক্ত করা যাক। আমরা যখন এটিতে আছি, আসুন এটির নাম পরিবর্তন করে TitleExtractorWorker রাখি এবং call পরিবর্তন করুন perform করার পদ্ধতি .

class TitleExtractorWorker
  include Magique::Worker
 
  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

আমন্ত্রণটির এখনও একই ফলাফল রয়েছে, তবে এটি কি ঘটছে তা একটু পরিষ্কার।

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

অসিঙ্ক্রোনাস প্রসেসিং বাস্তবায়ন করা

এখন যেহেতু আমাদের শিরোনাম নিষ্কাশন কাজ করছে, আমরা অতীতের রুবি ম্যাজিক নিবন্ধগুলি থেকে সমস্ত শিরোনাম দখল করতে পারি। এটি করার জন্য, ধরা যাক আমাদের একটি RUBYMAGIC আছে বিগত নিবন্ধগুলির সমস্ত URLগুলির একটি তালিকা সহ ধ্রুবক৷

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end
 
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

আমরা অতীতের নিবন্ধগুলির শিরোনাম পেয়েছি, তবে সেগুলি বের করতে কিছুটা সময় লাগে। এর কারণ আমরা পরবর্তীতে যাওয়ার আগে প্রতিটি অনুরোধ সম্পূর্ণ না হওয়া পর্যন্ত অপেক্ষা করি।

আসুন একটি perform_async প্রবর্তন করে এটিকে উন্নত করি আমাদের কর্মী মডিউল পদ্ধতি. জিনিসগুলিকে গতি বাড়ানোর জন্য, এটি প্রতিটি URL-এর জন্য একটি নতুন থ্রেড তৈরি করে৷

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

আমন্ত্রণটি TitleExtractorWorker.perform_async(url) এ পরিবর্তন করার পরে , আমরা প্রায় একসাথে সব শিরোনাম পেতে. যাইহোক, এর মানে এই যে আমরা রুবি ম্যাজিক ব্লগে একসাথে 20 টির বেশি সংযোগ খুলছি। (আপনার ব্লগের সাথে বিশৃঙ্খলা করার জন্য দুঃখিত, বন্ধুরা! 😅)

যদি আপনি আপনার নিজের বাস্তবায়নের সাথে অনুসরণ করেন এবং দীর্ঘ-চলমান প্রক্রিয়ার বাইরে এটি পরীক্ষা করেন (যেমন একটি ওয়েব সার্ভার), তাহলে loop { sleep 1 } এর মত কিছু যোগ করতে ভুলবেন না প্রক্রিয়াটি অবিলম্বে বন্ধ না হয় তা নিশ্চিত করতে আপনার স্ক্রিপ্টের শেষ পর্যন্ত৷

কাজের সারিবদ্ধ করা

প্রতিটি আহ্বানের জন্য একটি নতুন থ্রেড তৈরি করার পদ্ধতির সাথে, আমরা অবশেষে সম্পদ সীমাতে আঘাত করব (আমাদের পক্ষে এবং আমরা যে ওয়েবসাইটগুলি অ্যাক্সেস করছি উভয়েই)। যেহেতু আমরা সুন্দর নাগরিক হতে চাই, আসুন বাস্তবায়নটিকে এমন কিছুতে পরিবর্তন করি যা অ্যাসিঙ্ক্রোনাস কিন্তু পরিষেবার অস্বীকৃতির আক্রমণের মতো মনে হয় না৷

এই সমস্যাটি সমাধান করার একটি সাধারণ উপায় হল প্রযোজক/ভোক্তা প্যাটার্ন ব্যবহার করা। এক বা একাধিক প্রযোজক একটি সারিতে কাজগুলিকে ঠেলে দেয় যখন এক বা একাধিক ভোক্তা সারি থেকে কাজগুলি নেয় এবং সেগুলি প্রক্রিয়া করে৷

একটি সারি মূলত উপাদানগুলির একটি তালিকা। তাত্ত্বিকভাবে, একটি সাধারণ অ্যারে কাজ করবে। যাইহোক, যেহেতু আমরা সঙ্গতি নিয়ে কাজ করছি, আমাদের নিশ্চিত করতে হবে যে শুধুমাত্র একজন প্রযোজক বা ভোক্তা একবারে সারিতে প্রবেশ করতে পারে। আমরা যদি এই বিষয়ে সতর্ক না হই, তাহলে জিনিসগুলি বিশৃঙ্খলার মধ্যে শেষ হয়ে যাবে—যেমন দু'জন লোক একবারে একটি দরজা দিয়ে চেপে যাওয়ার চেষ্টা করছে।

এই সমস্যাটি প্রযোজক-ভোক্তা সমস্যা হিসাবে পরিচিত এবং এর একাধিক সমাধান রয়েছে। ভাগ্যক্রমে, এটি একটি খুব সাধারণ সমস্যা এবং রুবি একটি সঠিক Queue সহ জাহাজে করে বাস্তবায়ন যা আমরা থ্রেড সিঙ্ক্রোনাইজেশন সম্পর্কে চিন্তা না করেই ব্যবহার করতে পারি।

এটি ব্যবহার করতে, আসুন নিশ্চিত করি যে প্রযোজক এবং ভোক্তা উভয়ই সারিতে অ্যাক্সেস করতে পারে। আমরা আমাদের Magique এ একটি ক্লাস পদ্ধতি যোগ করে এটি করি মডিউল এবং Queue এর একটি উদাহরণ বরাদ্দ করা এটিতে।

module Magique
  def self.backend
    @backend
  end
 
  def self.backend=(backend)
    @backend = backend
  end
end
 
Magique.backend = Queue.new

এরপর, আমরা আমাদের perform_async পরিবর্তন করি নিজের নতুন থ্রেড তৈরি করার পরিবর্তে একটি টাস্ককে সারিতে ঠেলে দেওয়ার জন্য বাস্তবায়ন। একটি টাস্ককে হ্যাশ হিসাবে উপস্থাপন করা হয় যার মধ্যে কর্মী শ্রেণীর একটি রেফারেন্স এবং সেইসাথে perform_async-এ পাস করা আর্গুমেন্টগুলি সহ পদ্ধতি।

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

এর সাথে, আমরা প্রযোজকের দিক দিয়ে কাজ শেষ করেছি। এর পরে, আসুন ভোক্তাদের দিকে নজর দেওয়া যাক।

প্রতিটি ভোক্তা একটি পৃথক থ্রেড যা সারি থেকে কাজ নেয় এবং সেগুলি সম্পাদন করে। থ্রেডের মতো একটি কাজের পরে থামার পরিবর্তে, ভোক্তা তারপর সারি থেকে অন্য একটি কাজ নেয় এবং এটি সম্পাদন করে এবং আরও অনেক কিছু। এখানে Magique::Processor নামে একটি ভোক্তার মৌলিক বাস্তবায়ন রয়েছে . প্রতিটি প্রসেসর একটি নতুন থ্রেড তৈরি করে যা অসীমভাবে লুপ করে। প্রতিটি পুনরাবৃত্তির জন্য, এটি সারি থেকে একটি নতুন কাজ দখল করার চেষ্টা করে, কর্মী শ্রেণীর একটি নতুন উদাহরণ তৈরি করে এবং এটিকে perform বলে। প্রদত্ত আর্গুমেন্ট সহ পদ্ধতি।

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end
 
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end
 
      thread.name = name
    end
  end
end

প্রক্রিয়াকরণ লুপ ছাড়াও, আমরা Magique::Processor.start নামে একটি সুবিধার পদ্ধতি যোগ করি . এটি আমাদের একসাথে একাধিক প্রসেসর স্পিন আপ করতে দেয়। যদিও থ্রেডের নামকরণ সত্যিই প্রয়োজনীয় নয়, এটি আমাদের দেখার অনুমতি দেবে যে জিনিসগুলি প্রত্যাশিতভাবে কাজ করছে কিনা৷

আমাদের TitleExtractorWorker এর আউটপুট সামঞ্জস্য করা যাক বর্তমান থ্রেডের নাম অন্তর্ভুক্ত করতে।

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

আমাদের ব্যাকগ্রাউন্ড প্রসেসিং সেটআপ পরীক্ষা করার জন্য, আমাদের কাজগুলি সারিবদ্ধ করার আগে আমাদের প্রথমে প্রসেসরের একটি সেট স্পিন আপ করতে হবে৷

Magique.backend = Queue.new
Magique::Processor.start(5)
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
 
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

যখন এটি চালানো হয়, তখনও আমরা সব নিবন্ধের শিরোনাম পাই। যদিও এটি প্রতিটি কাজের জন্য একটি পৃথক থ্রেড ব্যবহার করার মতো দ্রুত নয়, এটি এখনও প্রাথমিক বাস্তবায়নের চেয়ে দ্রুততর যার কোনও ব্যাকগ্রাউন্ড প্রক্রিয়াকরণ ছিল না। যোগ করা প্রসেসরের নামগুলির জন্য ধন্যবাদ, আমরা নিশ্চিত করতে পারি যে সমস্ত প্রসেসর সারির মাধ্যমে কাজ করছে। সমসাময়িক প্রসেসরের সংখ্যা পরিবর্তন করে, প্রক্রিয়াকরণের গতি এবং বিদ্যমান সম্পদের সীমাবদ্ধতার মধ্যে ভারসাম্য খুঁজে পাওয়া সম্ভব।

একাধিক প্রক্রিয়া এবং মেশিনে প্রসারিত করা

এখন পর্যন্ত, আমাদের পটভূমি প্রক্রিয়াকরণ সিস্টেমের বর্তমান বাস্তবায়ন যথেষ্ট ভাল কাজ করে। যদিও এটি এখনও একই প্রক্রিয়ার মধ্যে সীমাবদ্ধ। সম্পদ-ক্ষুধার্ত কাজগুলি এখনও সমগ্র প্রক্রিয়ার কর্মক্ষমতা প্রভাবিত করবে। একটি চূড়ান্ত পদক্ষেপ হিসাবে, চলুন একাধিক প্রক্রিয়া এবং এমনকি একাধিক মেশিনে কাজের চাপ বিতরণের দিকে নজর দেওয়া যাক৷

সারি হল উৎপাদক এবং ভোক্তাদের মধ্যে একমাত্র সংযোগ। এই মুহূর্তে, এটি একটি ইন-মেমরি বাস্তবায়ন ব্যবহার করছে। আসুন Sidekiq থেকে আরও অনুপ্রেরণা গ্রহণ করি এবং Redis ব্যবহার করে একটি সারি বাস্তবায়ন করি।

Redis-এর সেই তালিকাগুলির জন্য সমর্থন রয়েছে যা আমাদের কাজগুলি পুশ করতে এবং আনতে দেয়৷ উপরন্তু, Redis রুবি রত্ন থ্রেড-নিরাপদ এবং তালিকা সংশোধন করার জন্য Redis কমান্ডগুলি পারমাণবিক। এই বৈশিষ্ট্যগুলি আমাদের অ্যাসিঙ্ক্রোনাস ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের জন্য সিঙ্ক্রোনাইজেশন সমস্যায় না গিয়ে এটি ব্যবহার করা সম্ভব করে৷

আসুন একটি Redis ব্যাকড সারি তৈরি করি যা push প্রয়োগ করে এবং shift Queue মত পদ্ধতি আমরা আগে ব্যবহার করেছি।

require 'json'
require 'redis'
 
module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end
 
      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end
 
      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

যেহেতু রেডিস রুবি অবজেক্ট সম্পর্কে কিছুই জানে না, তাই lpush ব্যবহার করে ডাটাবেসে সংরক্ষণ করার আগে আমাদের কাজগুলিকে JSON-এ সিরিয়ালাইজ করতে হবে। কমান্ড যা তালিকার সামনে একটি উপাদান যোগ করে।

সারি থেকে একটি টাস্ক আনতে, আমরা brpop ব্যবহার করছি কমান্ড, যা একটি তালিকা থেকে শেষ উপাদান পায়। তালিকাটি খালি থাকলে, একটি নতুন উপাদান উপলব্ধ না হওয়া পর্যন্ত এটি ব্লক করা হবে। কোনো কাজ উপলব্ধ না থাকলে আমাদের প্রসেসরকে বিরতি দেওয়ার এটি একটি চমৎকার উপায়। অবশেষে, রেডিস থেকে একটি টাস্ক পাওয়ার পর, আমাদের Object.const_get ব্যবহার করে কর্মীর নামের উপর ভিত্তি করে আসল রুবি ক্লাস খুঁজতে হবে। .

একটি চূড়ান্ত পদক্ষেপ হিসাবে, আসুন জিনিসগুলিকে একাধিক প্রক্রিয়ায় বিভক্ত করি। জিনিসগুলির প্রযোজকের দিক থেকে, শুধুমাত্র আমাদের যা করতে হবে তা হল ব্যাকএন্ডকে আমাদের নতুন প্রয়োগ করা Redis সারিতে পরিবর্তন করা৷

# ...
 
Magique.backend = Magique::Backend::Redis.new
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

জিনিসের ভোক্তা দিক থেকে, আমরা এইরকম কয়েকটি লাইন দিয়ে দূরে যেতে পারি:

# ...
 
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
 
loop { sleep 1 }

সম্পাদিত হলে, ভোক্তা প্রক্রিয়া সারিতে আসার জন্য নতুন কাজের জন্য অপেক্ষা করবে। একবার আমরা প্রযোজক প্রক্রিয়া শুরু করি যা কাজগুলিকে সারিতে ঠেলে দেয়, আমরা দেখতে পারি যে সেগুলি অবিলম্বে প্রক্রিয়া করা হয়৷

দায়িত্বের সাথে উপভোগ করুন এবং এটি উৎপাদনে ব্যবহার করবেন না

যদিও আমরা এটিকে একটি বাস্তব বিশ্বের সেটআপ থেকে দূরে রেখেছি যা আপনি উত্পাদনে ব্যবহার করবেন (তাই করবেন না!), আমরা একটি পটভূমি প্রসেসর তৈরিতে কয়েকটি পদক্ষেপ নিয়েছি। আমরা ব্যাকগ্রাউন্ড পরিষেবা হিসাবে একটি প্রক্রিয়া চালানোর মাধ্যমে শুরু করেছি। তারপর আমরা এটিকে অ্যাসিঙ্ক করেছি এবং Queue ব্যবহার করেছি উৎপাদক-ভোক্তা সমস্যা সমাধানের জন্য। তারপরে আমরা রেডিস ব্যবহার করে প্রক্রিয়াটিকে একাধিক প্রক্রিয়া বা মেশিনে প্রসারিত করেছি, এর পরিবর্তে একটি ইন-মেমরি বাস্তবায়ন।

পূর্বে উল্লিখিত হিসাবে, এটি একটি পটভূমি প্রক্রিয়াকরণ সিস্টেমের একটি সরলীকৃত বাস্তবায়ন। অনেক কিছু অনুপস্থিত এবং স্পষ্টভাবে মোকাবেলা করা হয় না. এর মধ্যে রয়েছে (তবে সীমাবদ্ধ নয়) ত্রুটি পরিচালনা, একাধিক সারি, সময়সূচী, সংযোগ পুলিং এবং সিগন্যাল হ্যান্ডলিং৷

তবুও, আমরা এটি লিখতে মজা পেয়েছি এবং আশা করি আপনি একটি ব্যাকগ্রাউন্ড প্রসেসিং সিস্টেমের হুডের নীচে উঁকিঝুঁকি উপভোগ করেছেন। সম্ভবত আপনি একটি বা দুটি জিনিস নিয়ে গেছেন।


  1. রুবিতে একটি নতুন প্রোগ্রামিং ভাষা তৈরি করা:দোভাষী

  2. RBS, Rubys নতুন টাইপ টীকা সিস্টেম বোঝা

  3. একটি কেগ স্কেল সতর্কতা সিস্টেম তৈরি করা

  4. রুবি 2.6-এ 9টি নতুন বৈশিষ্ট্য