বেশিরভাগ ওয়েব অ্যাপ একটি ব্যাকগ্রাউন্ড সারি থেকে উপকৃত হতে পারে, যা প্রায়ই ত্রুটি-প্রবণ বা সময়-সাপেক্ষ পার্শ্ব কাজগুলি প্রক্রিয়া করতে ব্যবহৃত হয়৷ এই ব্যাকগ্রাউন্ড কাজগুলি ইমেল পাঠানো থেকে শুরু করে ক্যাশে আপডেট করা থেকে শুরু করে মূল ব্যবসায়িক যুক্তি সম্পাদন করা পর্যন্ত পরিবর্তিত হতে পারে৷
যেকোন ব্যাকগ্রাউন্ড সারিবদ্ধ সিস্টেম যেহেতু এটি প্রক্রিয়া করার জন্য প্রয়োজনীয় কাজের সংখ্যা স্কেল করে, সেই কাজগুলি প্রক্রিয়াকরণকারী কর্মীদের পুলকেও স্কেল করতে হবে৷ যে ক্ষেত্রে সারিবদ্ধ কাজের হার পরিবর্তিত হয়, সেখানে সারিবদ্ধ কর্মীদের সংখ্যা বৃদ্ধি করা একটি মূল দিক হয়ে ওঠে৷ প্রক্রিয়াকরণের গতি বজায় রাখার জন্য। উপরন্তু, কম সারি থ্রুপুটের সময় কর্মীদের স্কেল করা উল্লেখযোগ্য সঞ্চয় প্রদান করতে পারে!
দুর্ভাগ্যবশত, অনেক সারিবদ্ধ ব্যাকএন্ড কর্মীদের চালু বা বন্ধ করার জন্য স্কেলিং লজিক দিয়ে সজ্জিত হয় না৷ কিন্তু আমরা সারিতে অপেক্ষা করা কাজের উপর ভিত্তি করে আমাদের সর্বোত্তম কর্মী গণনা খুঁজে পেতে কিছু সাধারণ গণিত এবং কর্মক্ষমতা ডেটা ব্যবহার করতে পারি৷
অঙ্গুষ্ঠের সারিবদ্ধ নিয়ম
যদি কাজগুলি সারিবদ্ধ কর্মীদের দ্বারা প্রক্রিয়াকরণের চেয়ে বেশি হারে সারিবদ্ধ করা হয় তবে সারির গভীরতা বাড়বে এবং প্রতিটি কাজ সারিতে যে সময় ব্যয় করবে তাও বাড়বে৷ সাধারণত, আমরা অপেক্ষার সময় চাই (সময়ের পরিমাণ সারি) প্রতিটি কাজের জন্য যতটা সম্ভব কম হতে হবে — 0 সেকেন্ড থেকে কিছু গ্রহণযোগ্য সীমা পর্যন্ত। একটি পছন্দসই অপেক্ষার সময় সন্তুষ্ট করার জন্য প্রয়োজনীয় কর্মীদের সংখ্যা অনুমান করতে, আমরা থাম্বের সারিবদ্ধ নিয়ম (QROT) ব্যবহার করতে পারি। সাধারণত, QROT-কে একটি অসমতা হিসাবে প্রকাশ করা হয় যেখানে কাজের একটি সারি পরিষেবা দেওয়ার জন্য প্রয়োজনীয় সার্ভারের সংখ্যা বর্ণনা করা হয়, তবে একটি ফর্ম এইভাবে লেখা যেতে পারে:
workers = (number_of_jobs * avg_service_time_per_job) / time_to_finish_queue
সুতরাং, যদি আমরা 30 সেকেন্ডের একটি কাঙ্ক্ষিত সময়ে আমাদের সারিতে পরিষেবা দেওয়ার জন্য প্রয়োজনীয় কর্মীদের সংখ্যা বের করতে চাই, তাহলে আমাদের কেবল কাজের সংখ্যা (সারির আকার) এবং গড় সময়টি জানতে হবে প্রতিটি কাজ সম্পাদন করুন৷ উদাহরণস্বরূপ, যদি আমাদের 7500টি কাজের একটি সারি থাকে এবং প্রতিটি কাজ সম্পাদন করতে গড়ে 0.3 সেকেন্ড সময় নেয়, তাহলে আমরা 75 জন কর্মী দিয়ে সেই সারিটি 30 সেকেন্ডের মধ্যে শেষ করতে পারি৷
পারফরম্যান্স মেট্রিক্স অ্যাক্সেস করা
সারিতে থাকা কাজের জন্য গড় পরিষেবা সময় অনুমান করার জন্য, আমাদের প্রতিটি কাজের ক্লাসের জন্য পারফরম্যান্স মেট্রিক্সে অ্যাক্সেসের প্রয়োজন। সৌভাগ্যবশত, অ্যাপসিগন্যাল সাধারণ সারিবদ্ধ ব্যাকএন্ডের জন্য পারফরম্যান্স ডেটা রেকর্ড করে, প্রতিবার মেট্রিক্স রেকর্ড করে কাজ সম্পাদন করা হয়েছে৷
আমরা আসন্ন AppSignal GraphQL API ব্যবহার করতে পারি গত 24 ঘন্টা ধরে প্রতিটি কাজের প্রকারের গড় সময়কাল পেতে৷ এই API এখনও সম্পূর্ণরূপে সর্বজনীন নয়, যদিও এটি বর্তমানে AppSignal-এর পারফরম্যান্স গ্রাফ এবং অন্যান্য ডেটা প্রদর্শনের জন্য ব্যবহৃত হয়৷ ভাগ্যক্রমে, GraphQL APIগুলি হল স্ব-ডকুমেন্টিং করার উদ্দেশ্যে, এবং আমরা API-কে অন্তর্নিহিত করতে এবং এটি কোন ডেটা অবজেক্ট প্রকাশ করে তা খুঁজে বের করতে GraphiQL এর মতো একটি টুল ব্যবহার করতে পারি।
একটি GraphQL ক্যোয়ারী তৈরির প্রক্রিয়াটি এই পোস্টের সুযোগের বাইরে, কিন্তু নীচে একটি উদাহরণ রুবি ক্লাস যা অ্যাপসিগন্যাল গ্রাফকিউএল এপিআই-এর সাথে সংযোগ করে যা জনপ্রিয় ফ্যারাডে HTTP ক্লায়েন্ট লাইব্রেরি ব্যবহার করে একটি মৌলিক মেট্রিক্স একত্রিতকরণের জন্য অনুসন্ধান করে৷
require 'json'
require 'faraday'
class AppsignalClient
BASE_URL = 'https://appsignal.com/'
DEFAULT_APP_ID = ENV['APPSIGNAL_APP_ID']
DEFAULT_TOKEN = ENV['APPSIGNAL_API_TOKEN']
# GraphQL query to fetch the "mean" metric for the selected app.
METRICS_QUERY = <<~GRAPHQL.freeze
query($appId: String!, $query: [MetricAggregation!]!, $timeframe: TimeframeEnum!) {
app(id: $appId) {
metrics {
list(timeframe: $timeframe, query: $query) {
start
end
rows {
fields {
key
value
}
}
}
}
}
}
GRAPHQL
def initialize(app_id: DEFAULT_APP_ID, client_secret: DEFAULT_TOKEN)
@app_id = app_id
@client_secret = client_secret
end
# Fetch the average duration for a job class's perform action
# Default timeframe is last 24 hours
def average_job_duration(job_class, timeframe: 'R24H')
response =
connection.post(
'graphql',
JSON.dump(
query: METRICS_QUERY,
variables: {
appId: @app_id,
timeframe: timeframe,
query: [
name: 'transaction_duration',
headerType: legacy
tags: [
{ key: 'namespace', value: 'background' },
{ key: 'action', value: "#{job_class.name}#perform" },
],
fields: [{ field: 'MEAN', aggregate: 'AVG' }],
],
}
)
)
data = JSON.parse(response.body, symbolize_names: true)
rows = data.dig(:data, :app, :metrics, :list, :rows)
# There may be no metrics in the selected timeframe
return 0.0 if rows.empty?
rows.first[:fields].first[:value]
end
private
def connection
@connection ||= Faraday.new(
url: BASE_URL,
params: { token: @client_secret },
headers: { 'Content-Type' => 'application/json' },
request: { timeout: 10 }
) do |faraday|
faraday.response :raise_error
faraday.adapter Faraday.default_adapter
end
end
end
এই ক্লাসের মাধ্যমে, আমরা একটি প্রদত্ত ActiveJob ক্লাসের জন্য একটি গড় কাজের সময়কাল পেতে পারি, যা আমাদের কাছে মিলিসেকেন্ডে ফিরে আসে:
AppsignalClient.new.average_job_duration(MyMailerJob)
# => 233.1
ডিফল্টরূপে, এটি গত 24 ঘন্টার ডেটাতে কাজের গড় লেনদেনের সময়কালের জন্য কল করে৷ যদি আমাদের কাজ(গুলি) এর থেকে অনেক বেশি ঘন ঘন সম্পাদিত হয়, আমরা সেই উইন্ডোটিকে ছোট করতে চাই, আমাদের সাম্প্রতিক মৃত্যুদণ্ডগুলিকে আরও ভারীভাবে ওজন করে গড়ে এক ঘন্টা পর্যন্ত (R1H
) এখনই কার্যকর করা হলে এরকম একটি কাজের সময়কাল আরও ভালভাবে অনুমান করতে।
মনে রাখবেন যে এই কর্মক্ষমতা ডেটা আমাদের সার্ভার ব্যবহার ডেটা থেকে আলাদা৷ এই ডেটাটি আমাদের বলে যে প্রতিটি কাজের জন্য প্রয়োজনীয় কাজটি করতে আসলে কতক্ষণ লাগবে৷ এটি আমাদের কর্মীদের স্কেল করার ক্ষেত্রে ব্যবহারিক মেট্রিক্সের মতো বাহ্যিক পরিমাপের চেয়ে বেশি কার্যকর হবে৷ .
সারির অন্তর্নিহিতকরণ
এরপরে, আমাদের সারিবদ্ধ কাজগুলিকে পরিসেবা করা হবে তা নির্ধারণ করার জন্য আমাদের সারিতে আত্মবিশ্লেষণ করতে হবে৷ একটি সাধারণ রুবি সারিবদ্ধ ব্যাকএন্ড হল Resque, যা ActiveJob-এর সাথেও সুন্দরভাবে একত্রিত হয়৷ আমরা Resque-এ প্রদত্ত সারির জন্য সারিবদ্ধ কাজগুলি অ্যাক্সেস করতে পারি এবং তারপরে কার্য সম্পাদনের সময় অনুমান করতে পারি৷ আমাদের AppsignalClient
ব্যবহার করে প্রতিটি কাজ তার ক্লাসের উপর ভিত্তি করে উপরে থেকে ক্লাস।
require 'resque'
class ResqueEstimator
def initialize(queue: 'default')
@queue = queue
@cache = {}
@appsignal_client = AppsignalClient.new
end
def enqueued_duration_estimate
Resque.data_store.everything_in_queue(queue).map do |job|
estimate_job_duration decode_activejob_args(job)
end.sum
end
def estimate_job_duration(job)
@cache[job['job_class']] ||= @appsignal_client
.average_job_duration job['job_class']
end
private
# ActiveJob-specific method for parsing job arguments
# for ActiveJob+Resque integration
def decode_activejob_args(job)
decoded_job = job
decoded_job = Resque.decode(job) if job.is_a? String
decoded_job['args'].first
end
end
এই ক্লাসটি ব্যবহার করা যতটা সহজ:
ResqueEstimator.new(queue: 'my_queue').enqueued_duration_estimate
# => 23000 (ms)
লক্ষ্য করুন যে আমরা আমাদের estimate_job_duration
-এ চাকরির সময়কালের একটি সাধারণ মেমোাইজেশন ব্যবহার করি AppSignal API-এ সদৃশ কলগুলি এড়াতে পদ্ধতি৷ সম্ভবত, আমাদের সারিতে একই শ্রেণীর অনেকগুলি কাজ থাকবে এবং আমরা প্রতিটি ক্লাসের একবার নির্বাহের অনুমান করে আমাদের ওভারহেড কমাতে পারি৷
স্কেল করতে পারফরম্যান্স ডেটা ব্যবহার করা
এই সব একসাথে টেনে, আমরা এখন আমাদের সারির বিষয়বস্তুর উপর ভিত্তি করে আমাদের সারির কর্মীদের উপরে বা নীচে স্কেল করতে আমাদের সাম্প্রতিক কর্মক্ষমতা ডেটা ব্যবহার করতে পারি! যে কোনও মুহূর্তে, আমরা আমাদের সারিতে থাকা কাজগুলি দেখতে পারি এবং প্রয়োজনীয় কর্মীদের একটি অনুমান পেতে পারি আমাদের কাঙ্ক্ষিত সময়সীমার মধ্যে এটি পরিষেবা দিতে।
আমাদের একটি কাঙ্খিত সারিবদ্ধ সময়সীমার বিষয়ে সিদ্ধান্ত নিতে হবে (যেকোনো চাকরির জন্য সারিতে অপেক্ষা করা উচিত সর্বোচ্চ সময়), যেমন 30 সেকেন্ড। আমাদের একটি ন্যূনতম এবং সর্বোচ্চ কর্মী সংখ্যাও নির্দিষ্ট করতে হবে। সারির জন্য কমপক্ষে একজন কর্মীকে দৌড়ে রাখা, কিছুক্ষণের জন্য সারি খালি থাকার পরে সারিবদ্ধ প্রথম কাজ(গুলি) পরিচালনা করার জন্য এটি সহায়ক। আমাদের ডাটাবেস কানেকশন এবং/অথবা অনেক বেশি কর্মীদের সাথে সার্ভারের ব্যবহার খরচ এড়াতে সর্বোচ্চ কর্মী সংখ্যাও চাই।
আমরা আমাদের জন্য এই যুক্তিটি পরিচালনা করার জন্য একটি ক্লাস তৈরি করতে পারি, যা মূলত আমাদের পূর্ব থেকে থাম্বের সারিবদ্ধ নিয়মের বাস্তবায়ন।
class ResqueWorkerScaler
def initialize(queue: 'default', workers_range: 1..100, desired_wait_ms: 300_000)
@queue = queue
@workers_range = workers_range
@desired_wait_ms = desired_wait_ms
@estimator = ResqueEstimator.new(queue: @queue)
end
def desired_workers
total_time_ms = @estimator.enqueued_duration_estimate
workers_required = [(total_time_ms / desired_wait_ms).ceil, workers_range.last].min
[workers_required, workers_range.first].max
end
def scale
# using platform-specific scaling interface, scale to desired_workers
end
end
আমরা আমাদের কর্মীদের নিয়মিত ব্যবধানে স্কেল করতে চাই যাতে আমরা চাহিদার উপর ভিত্তি করে উপরে এবং নীচের স্কেল করতে পারি। আমরা একটি রেক টাস্ক করতে পারি যা আমাদের ResqueWorkerScaler
কল করে শ্রমিকদের স্কেল করার জন্য শ্রেণি:
# inside lib/tasks/resque_workers.rake
namespace :resque_workers do
desc 'Scale worker pool based on enqueued jobs'
task :scale, [:queue] => [:environment] do |_t, args|
queue = args[:queue] || 'default'
ResqueWorkerScaler.new(queue: queue).scale
end
end
এবং তারপরে আমরা নিয়মিত বিরতিতে এই রেক টাস্কটি চালানোর জন্য একটি ক্রোন কাজ সেট আপ করতে পারি:
*/5 * * * * /path/to/our/rake resque_workers:scale
# scale a non-default queue:
*/5 * * * * /path/to/our/rake resque_workers:scale['my_queue']
লক্ষ্য করুন যে আমরা প্রতি 5 মিনিটে চালানোর জন্য স্কেলিং টাস্ক সেট করেছি৷ প্রতিটি নতুন কর্মী অনলাইনে আসতে এবং কাজগুলি প্রক্রিয়াকরণ শুরু করতে কিছু সময় নেবে — সম্ভবত 10-40 সেকেন্ডের মধ্যে যে কোনও জায়গায়, আমাদের কোডবেসের আকার এবং রত্নগুলির সংখ্যার উপর নির্ভর করে আমরা ব্যবহার করি। তাই, যদি আমরা প্রতি মিনিটে আমাদের কর্মীদের স্কেল করার চেষ্টা করি, তাহলে আমাদের কাঙ্খিত পরিবর্তনগুলি কার্যকর হওয়ার আগে আমরা সম্ভবত আবার উপরে বা নিচে স্কেল করব। যদি আমাদের অ্যাপটি শুধুমাত্র দিনের বিভিন্ন সময়ে সারির ব্যবহার ওঠানামা করতে দেখে, আমরা তা করতে পারি। সম্ভবত একটি ঘন্টার ব্যবধানে আমাদের রেক টাস্ককে কল করুন৷ কিন্তু যদি আমাদের সারির আকার ঘন্টার মধ্যে ওঠানামা করে, আমরা আমাদের সারিটি আরও ঘন ঘন ব্যবধানে আত্মদর্শন করতে চাই, যেমন উপরের 5 মিনিট৷
পরবর্তী ধাপগুলি
এই ধরনের একটি সিস্টেম যেখানে প্রকৃত পারফরম্যান্স ডেটা পরিকাঠামো স্কেল করার জন্য ব্যবহার করা হয় তা চাহিদার জন্য খুব প্রতিক্রিয়াশীল এবং বিভিন্ন ব্যবহারের জন্য স্থিতিস্থাপক হতে পারে। বিশেষ করে ব্যাকগ্রাউন্ড প্রসেসিংয়ের মতো পরিবেশে, যেখানে হোস্ট মেট্রিক্স যেমন মেমরি ব্যবহার এবং লোড গড় পরিবর্তিত হওয়ার সম্ভাবনা নেই, কর্মক্ষমতা মেট্রিক্স ব্যবহার করে স্কেল অনেক বেশি উপযুক্ত।
বিকল্প সারি স্কেলিং বাস্তবায়ন সম্পূর্ণ সারির আত্মনিদর্শনের পরিবর্তে কাজের প্রতি গড় অপেক্ষার সময়কে পরিমাপ করতে পারে, কিন্তু সেই মেট্রিকটি প্রতিনিধিত্বমূলক হতে পারে যখন সারির বিষয়বস্তু এবং আকার দ্রুত পরিবর্তিত হয়। যদি আমাদের সিস্টেম ব্যাপকভাবে পরিবর্তনশীল লোড অনুভব করে, একই সাথে প্রচুর কাজ সারিবদ্ধ হয়, বা ব্যাপকভাবে পরিবর্তনশীল কাজের নির্বাহের সময়, তারপর সারিবদ্ধ আত্মদর্শন সাড়া দিতে অনেক দ্রুত এবং নির্ভরযোগ্যভাবে সঠিক।
কিন্তু আমাদের সারি আত্মপরিদর্শন পদ্ধতিতে বিবেচনা করার জন্য কিছু সীমাবদ্ধতা রয়েছে৷ যদি সারিটি যথেষ্ট বড় হয়, তাহলে প্রতিটি কাজকে নির্বাহের অনুমানের জন্য তাকানো নিষেধমূলকভাবে ধীর হবে৷ এই ধরনের ক্ষেত্রে, মোট কাজের সংখ্যা খুঁজে বের করা ভাল হতে পারে, তারপর নির্বাচন করুন৷ সারি থেকে কাজের একটি এলোমেলো প্রতিনিধি নমুনা এবং সেই নমুনা থেকে গড় নির্বাহের গণনা করুন৷ বিকল্পভাবে, যদি কোনও চাকরির শ্রেণিতে এখনও এটির সাথে সম্পর্কিত কোনও কর্মক্ষমতা ডেটা না থাকে, তবে এটি কার্যকর করা এবং রেকর্ড করা না হওয়া পর্যন্ত আমাদের একটি অনুমানকৃত কার্যকর সময় ব্যবহার করতে হবে৷ কয়েকবার।
উপরে উল্লিখিত সিস্টেমটিও কয়েকটি পরিবর্তনের মাধ্যমে উল্লেখযোগ্যভাবে উন্নত করা যেতে পারে। সমান্তরালভাবে প্রতিটি চাকরি শ্রেণীর জন্য নির্বাহের সময় অনুমান করার কথা বিবেচনা করুন, কারণ প্রতিটি অনুমান বিচ্ছিন্ন এবং অদম্য। আমাদের মোট পরিষেবা সময়ের অনুমানের নির্ভুলতা উন্নত করার জন্য কর্মী৷ একাধিক সারি সহ একটি পটভূমি প্রক্রিয়াকরণ আর্কিটেকচারের জন্য, আমরা প্রতিটি সারিকে একটি পছন্দসই অপেক্ষার সময় বরাদ্দ করতে পারি, সারির অগ্রাধিকারের ভিত্তিতে এবং যথাযথভাবে কর্মীদের স্কেল করতে পারি৷
সারিবদ্ধ সিস্টেমগুলি যে কোনও প্রকল্পে প্রচুর পরিমাণে পরিবর্তনশীল কাজ সংগ্রহ করার প্রবণতা রাখে৷ সারি থেকে কাজগুলি সম্পাদনের কর্মক্ষমতা ডেটা সহ, আমরা কার্যকরভাবে সংস্থানগুলিকে কার্যকরভাবে স্কেল করতে পারি সেই সমস্ত কাজগুলিকে একটি প্রতিক্রিয়াশীল, দক্ষ পদ্ধতিতে পরিষেবা দিতে৷
শুভ স্কেলিং!
পি.এস. আপনি যদি রুবি ম্যাজিক পোস্টগুলি প্রেস থেকে বের হওয়ার সাথে সাথে পড়তে চান তবে আমাদের রুবি ম্যাজিক নিউজলেটারে সাবস্ক্রাইব করুন এবং একটি পোস্টও মিস করবেন না!