কম্পিউটার

কাফকার সাথে রেলে ইভেন্ট স্ট্রিমিং

অন্তর্দৃষ্টি অর্জন করতে এবং গ্রাহকদের আরও আকর্ষক অভিজ্ঞতা তৈরি করতে রিয়েল টাইমে প্রচুর পরিমাণে ডেটা প্রক্রিয়াকরণ এবং ভাগ করার প্রয়োজনে কোম্পানিগুলি দ্রুত প্রতিক্রিয়া জানাতে চায়। সুতরাং, ঐতিহ্যগত ডেটা প্রক্রিয়াকরণ আজকের বিশ্বে আর কার্যকর নয়৷

এটি অর্জন করতে, আপনাকে যত দ্রুত সম্ভব অনেক ডেটা প্রক্রিয়া করতে হবে এবং তারপরে আরও প্রক্রিয়াকরণের জন্য এটি অন্যান্য পরিষেবাগুলিতে পাঠাতে হবে। কিন্তু এই সমস্ত দ্রুত পদক্ষেপের মাঝখানে, ইভেন্টটি ঘটলে ভোক্তাদের অবহিত করা প্রয়োজন—এবং আমরা ইভেন্ট স্ট্রিমিং ব্যবহার করে এটি করতে পারি।

এটি গিটহাবের রেপো যা আমরা ব্যবহার করব।

ইভেন্টগুলি

ইভেন্ট স্ট্রিমিং সম্পর্কে কথা বলার আগে, আসুন একটি ইভেন্ট কী তা নিয়ে কথা বলি। একটি অ্যাপ্লিকেশনের মধ্যে ঘটে যাওয়া একটি ঘটনা ব্যবহারকারীর প্রক্রিয়ার সাথে সম্পর্কিত হতে পারে বা ব্যবসাকে প্রভাবিত করে এমন কর্মের সাথে সম্পর্কিত হতে পারে৷

ইভেন্টগুলি রাষ্ট্রীয় পরিবর্তনের প্রতিনিধিত্ব করে, কীভাবে অ্যাপ্লিকেশনটি সংশোধন করতে হয় সেই প্রশ্ন নয়। এগুলোকে উদাহরণ হিসেবে বিবেচনা করুন:

  • একজন ব্যবহারকারী একটি পরিষেবাতে লগ ইন করছেন
  • একটি অর্থপ্রদানের লেনদেন
  • একজন লেখক একটি ব্লগে একটি পোস্ট প্রকাশ করছেন

বেশিরভাগ ক্ষেত্রে, একটি ঘটনা আরও ইভেন্টকে ট্রিগার করবে; উদাহরণস্বরূপ, যখন কোনও ব্যবহারকারী কোনও পরিষেবার জন্য সাইন আপ করেন, অ্যাপটি তাদের ডিভাইসে একটি বিজ্ঞপ্তি পাঠায়, ডেটাবেসে রেকর্ড সন্নিবেশ করায় এবং একটি স্বাগত ইমেল পাঠায়৷

ইভেন্ট স্ট্রিমিং

ইভেন্ট স্ট্রিমিং ইভেন্ট উত্স যেমন ডাটাবেস থেকে রিয়েল টাইমে ডেটা ক্যাপচার করার একটি প্যাটার্ন। ইভেন্ট স্ট্রিমিংয়ের প্রধান অংশগুলি নিম্নরূপ:

  • দালাল :ইভেন্ট সংরক্ষণের দায়িত্বে থাকা সিস্টেম
  • বিষয় :ইভেন্টের একটি বিভাগ
  • প্রযোজক :একটি নির্দিষ্ট বিষয়ে একটি ব্রোকারের কাছে ইভেন্ট পাঠায়
  • ভোক্তা :ঘটনা পড়ে
  • ইভেন্টগুলি৷ :ডেটা যা প্রযোজকরা গ্রাহকদের সাথে যোগাযোগ করতে চায়

আর্কিটেকচার প্যাটার্ন (পাব/সাব প্যাটার্ন) প্রকাশ এবং সদস্যতা নিয়ে কথা বলা অনিবার্য এই মুহূর্তে; ইভেন্ট স্ট্রিমিং সেই প্যাটার্নের একটি বাস্তবায়ন কিন্তু এই পরিবর্তনগুলির সাথে:

  • বার্তার পরিবর্তে ঘটনা ঘটে।
  • ইভেন্টগুলি সাজানো হয়, সাধারণত সময় অনুসারে৷
  • ভোক্তারা বিষয়ের একটি নির্দিষ্ট বিন্দু থেকে ইভেন্ট পড়তে পারেন।
  • ইভেন্টগুলোর সাময়িক স্থায়িত্ব আছে।

প্রবাহ শুরু হয় যখন উৎপাদক একটি নতুন ইভেন্ট প্রকাশ করে৷ একটি বিষয়-এ (যেমন আমরা আগে দেখেছি, বিষয়টি শুধুমাত্র একটি নির্দিষ্ট ধরণের ইভেন্টের জন্য শ্রেণীকরণ)। তারপর, ভোক্তাদের একটি নির্দিষ্ট বিভাগের ইভেন্টে আগ্রহীরা সেই বিষয়ে সাবস্ক্রাইব করুন। অবশেষে, দালাল বিষয়ের ভোক্তাদের সনাক্ত করে এবং পছন্দসই ইভেন্টগুলি উপলব্ধ করে।

ইভেন্ট স্ট্রিমিংয়ের সুবিধা

  • ডিকপলিং প্রকাশক এবং ভোক্তাদের মধ্যে কোন নির্ভরতা নেই কারণ তাদের একে অপরকে জানার প্রয়োজন নেই। উপরন্তু, ইভেন্টগুলি তাদের ক্রিয়াগুলি নির্দিষ্ট করে না, তাই অনেক ভোক্তা একই ইভেন্ট পেতে এবং বিভিন্ন ক্রিয়া সম্পাদন করতে পারে৷

  • কম লেটেন্সি ইভেন্টগুলি ডিকপল করা হয় এবং ভোক্তাকে যে কোনো সময় সেগুলি ব্যবহার করতে দেয়; এটা মিলিসেকেন্ডে ঘটতে পারে।

  • স্বাধীনতা আমরা জানি, প্রকাশক এবং ভোক্তারা স্বাধীন, তাই বিভিন্ন দল তাদের সাথে কাজ করতে পারে একই ইভেন্ট ব্যবহার করে অন্যান্য ক্রিয়া বা উদ্দেশ্যে।

  • ফল্ট টলারেন্স কিছু ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম আপনাকে ভোক্তাদের ব্যর্থতা মোকাবেলা করতে সাহায্য করে; উদাহরণস্বরূপ, ভোক্তারা তাদের অবস্থান সংরক্ষণ করতে পারেন এবং একটি ত্রুটি ঘটলে সেখান থেকে আবার শুরু করতে পারেন৷

  • রিয়েল-টাইম হ্যান্ডলিং প্রতিক্রিয়া রিয়েল টাইমে পাওয়া যায়, তাই ব্যবহারকারীদের তাদের ইভেন্টের প্রতিক্রিয়া দেখতে মিনিট বা ঘন্টা অপেক্ষা করতে হবে না।

  • উচ্চ কর্মক্ষমতা কম বিলম্বের কারণে ইভেন্ট প্ল্যাটফর্ম অনেক বার্তা পরিচালনা করতে পারে—উদাহরণস্বরূপ, এক সেকেন্ডে হাজার হাজার ইভেন্ট।

ইভেন্ট স্ট্রিমিং এর অসুবিধা

  • মনিটরিং কিছু ইভেন্ট স্ট্রিমিং টুলের একটি সম্পূর্ণ মনিটরিং টুল নেই; তারা ডেটাডগ বা নিউ রিলিকের মতো অতিরিক্ত সরঞ্জামগুলি প্রয়োগ করার আহ্বান জানায়৷

  • কনফিগারেশন কিছু সরঞ্জামের কনফিগারেশন এমনকি অভিজ্ঞ ব্যক্তিদের জন্য অপ্রতিরোধ্য হতে পারে। অনেকগুলি পরামিতি রয়েছে এবং কখনও কখনও, সেগুলি প্রয়োগ করার জন্য আপনাকে বিষয়টি সম্পর্কে গভীরভাবে জানতে হবে৷

  • ক্লায়েন্ট লাইব্রেরি জাভা ছাড়া অন্য ভাষায় কাফকা বাস্তবায়ন করা সহজ নয়। কখনও কখনও, ক্লায়েন্ট লাইব্রেরিগুলি আপ টু ডেট নয়, অস্থিরতা দেখায়, বা বেছে নেওয়ার জন্য অনেকগুলি বিকল্প অফার করে না৷

ইভেন্ট স্ট্রিমিং এর অন্যতম জনপ্রিয় টুল হল Apache Kafka . এই টুল ব্যবহারকারীদের যখনই এবং যেখানেই প্রয়োজন ডেটা পাঠাতে, সঞ্চয় করতে এবং অনুরোধ করতে দেয়; আসুন এটি সম্পর্কে কথা বলি।

Apache Kafka

"Apache Kafka হল একটি ওপেন সোর্স ডিস্ট্রিবিউটেড ইভেন্ট স্ট্রিমিং প্ল্যাটফর্ম যা হাজার হাজার কোম্পানির দ্বারা উচ্চ-পারফরম্যান্স ডেটা পাইপলাইন, স্ট্রিমিং অ্যানালিটিক্স, ডেটা ইন্টিগ্রেশন, এবং মিশন-সমালোচনামূলক অ্যাপ্লিকেশনগুলির জন্য ব্যবহৃত হয়৷"

রিয়েল-টাইম লগ ট্রান্সমিশনের জন্য বিশেষভাবে ডিজাইন করা হয়েছে, Apache Kafka নিম্নলিখিত অ্যাপ্লিকেশনগুলির জন্য আদর্শ:

  • বিভিন্ন উপাদানের মধ্যে নির্ভরযোগ্য তথ্য বিনিময়
  • অ্যাপ্লিকেশানের প্রয়োজনীয়তা পরিবর্তিত হওয়ার সাথে সাথে মেসেজিং ওয়ার্কলোডগুলিকে ভাগ করার ক্ষমতা
  • ডাটা প্রক্রিয়াকরণের জন্য রিয়েল-টাইম ট্রান্সমিশন

আসুন একটি রেল অ্যাপ্লিকেশনে কাফকা ব্যবহার করি!

রেলের সাথে কাফকা ব্যবহার করা

রুবিতে কাফকা ব্যবহার করার জন্য সবচেয়ে বিখ্যাত রত্নটিকে জেনডেস্ক দ্বারা রুবি-কাফকা বলা হয় এবং এটি দুর্দান্ত! তবুও, আপনাকে ম্যানুয়ালি সমস্ত বাস্তবায়ন করতে হবে, যে কারণে আমাদের কাছে রুবি-কাফকা দিয়ে তৈরি কিছু "ফ্রেমওয়ার্ক" রয়েছে। তারা আমাদের সমস্ত কনফিগারেশন এবং কার্যকরী পদক্ষেপগুলিতে সহায়তা করে।

কারাফকা হল একটি কাঠামো যা অ্যাপাচি কাফকা-ভিত্তিক রুবি অ্যাপ্লিকেশন ডেভেলপমেন্টকে সহজ করার জন্য ব্যবহৃত হয়।

কাফকার সাথে কাজ করতে হলে জাভা ইন্সটল করতে হবে। যেহেতু কাফকা একটি স্কালা এবং জাভা অ্যাপ্লিকেশন, তাই জুকিপার ইনস্টল করার প্রয়োজন হবে।

ইনস্টলেশনের আগে, আমি Zookeeper সম্পর্কে কিছুটা ব্যাখ্যা করতে চাই। চিড়িয়াখানা হল কাফকার জন্য অপরিহার্য একটি কেন্দ্রীভূত পরিষেবা; এটি পরিবর্তনের ক্ষেত্রে বিজ্ঞপ্তি পাঠায় যেমন একটি নতুন বিষয় তৈরি করা, একটি ব্রোকারের ক্র্যাশ, একটি ব্রোকারকে অপসারণ করা, বিষয়গুলি মুছে ফেলা ইত্যাদি।

এর প্রধান কাজ হল কাফকা ব্রোকারদের পরিচালনা করা, তাদের নিজ নিজ মেটাডেটা সহ একটি তালিকা বজায় রাখা এবং স্বাস্থ্য-পরীক্ষার প্রক্রিয়া সহজতর করা। উপরন্তু, এটি বিভিন্ন বিষয়ের পার্টিশনের জন্য নেতৃস্থানীয় ব্রোকার নির্বাচন করতে সাহায্য করে।

প্রয়োজনীয়তা

MacOS এর জন্য:

এখন, নিম্নলিখিত কমান্ডগুলি দিয়ে Java এবং Zookeeper ইনস্টল করা যাক:

brew install java
brew install zookeeper

তারপর, আমরা এটি চালিয়ে কাফকা ইনস্টল করা চালিয়ে যেতে পারি:

brew install kafka

একবার আমাদের কাফকা এবং জুকিপার ইনস্টল করা হয়ে গেলে, এইভাবে পরিষেবাগুলি শুরু করা প্রয়োজন:

brew services start zookeeper
brew services start kafka

Windows এবং Linux-এর জন্য:

নির্দেশনা:

  1. জাভা ইনস্টল করা হচ্ছে
  2. জুকিপার ডাউনলোড করুন

রেল সেট আপ করা

যথারীতি একটি সাধারণ রেল অ্যাপ্লিকেশন তৈরি করুন:

rails new karafka_example

এবং জেমফাইলের মধ্যে কারাফকা রত্ন যোগ করুন:

gem 'karafka'

তারপর, bundle install চালান সম্প্রতি যোগ করা রত্নটি ইনস্টল করতে, এবং সমস্ত Karafka জিনিস পেতে নিম্নলিখিত কমান্ডটি চালাতে ভুলবেন না:

bundle exec karafka install

এই কমান্ডটি কিছু আকর্ষণীয় ফাইল তৈরি করবে:প্রথমটি হল karafka.rb রুট ডিরেক্টরিতে, app/consumers/application_consumer.rb , এবং app/responders/application_responder.rb .

কারাফকা ইনিশিয়ালাইজার

karafka.rb ফাইলটি রেল কনফিগারেশন থেকে আলাদা করা একটি ইনিশিয়ালাইজার অ্যাপ্লিকেশনের মতো। এটি আপনাকে Karafka অ্যাপ্লিকেশন কনফিগার করতে এবং কিছু রুট আঁকতে দেয়, যা API এর ক্ষেত্রে রেল অ্যাপ্লিকেশন রুটের মতো। কিন্তু এখানে, এটি বিষয় এবং ভোক্তাদের জন্য।

প্রযোজক

প্রযোজক ইভেন্টগুলি তৈরি করার দায়িত্বে রয়েছে, এবং আমরা সেগুলিকে app/responders-এ যোগ করতে পারি ফোল্ডার এখন, ব্যবহারকারীদের জন্য একটি সহজ প্রযোজক তৈরি করা যাক:

# app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end

ভোক্তা

ভোক্তা প্রযোজকের কাছ থেকে পাঠানো সমস্ত ঘটনা/বার্তা পড়ার জন্য দায়ী। এটি শুধুমাত্র একজন ভোক্তা যে প্রাপ্ত বার্তাটি লগ করে।

# app/consumers/users_consumer.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end

আমরা params ব্যবহার করি ঘটনা পেতে. কিন্তু আপনি যদি ব্যাচে ইভেন্টগুলি পড়েন এবং আপনার কনফিগারেশন config.batch_fetching থাকে সত্য হিসাবে, আপনার params_batch ব্যবহার করা উচিত .

পরীক্ষা

আমাদের Karafka পরিষেবা চালানোর জন্য (যেটি ঘটনাগুলি শুনবে), কনসোলে যান, একটি নতুন ট্যাব খুলুন, রেল প্রকল্পে যান এবং চালান:

bundle exec karafka server

সফল ইভেন্ট

এখন, আরেকটি কনসোল ট্যাব খুলুন, রেল প্রকল্পে যান এবং এটি টাইপ করুন:

rails c

সেখানে, আসুন আমাদের উত্তরদাতার সাথে একটি ইভেন্ট তৈরি করি:

> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })

আপনি যদি রেল কনসোল চেক করেন, ইভেন্ট তৈরি হওয়ার পরে আমরা এই বার্তাটি পাব:

Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}

এবং Karafka পরিষেবা ট্যাবে, আপনি এরকম কিছু দেখতে পাবেন:

New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092

কিন্তু আপনি যদি শুধু মেসেজ পেলোড চান, তাহলে আপনি params.payload যোগ করতে পারেন আপনার ভোক্তার মধ্যে এবং আপনার কাছে এরকম কিছু থাকবে:

Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer

ব্যর্থ ইভেন্ট

আপনি email এর মত কিছু বৈশিষ্ট্য সহ একটি ব্যবহারকারী মডেল তৈরি করতে পারেন , first_name এবং last_name নিম্নলিখিত কমান্ড চালাচ্ছে:

rails g model User email first_name last_name

তারপর, আপনি এটি দিয়ে মাইগ্রেশন চালাতে পারেন:

rails db:migrate

এখন, এই মত কিছু বৈধতা যোগ করুন:

class User < ApplicationRecord
  validates :email, uniqueness: true
end

অবশেষে, আমরা ভোক্তা পরিবর্তন করতে পারি:

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

সুতরাং, আসুন একই ইমেল দিয়ে দুটি ইভেন্ট তৈরি করি:

UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )

UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )

এটি দিয়ে, ডাটাবেসে প্রথম ইভেন্ট তৈরি করা হয়:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.1ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.0ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer

কিন্তু দ্বিতীয়টি ব্যর্থ হবে, কারণ আমাদের কাছে একটি বৈধতা রয়েছে যা বলে যে ইমেলটি অনন্য। যদি আপনি একটি বিদ্যমান ইমেলের সাথে অন্য রেকর্ড যোগ করার চেষ্টা করেন, তাহলে আপনি এরকম কিছু দেখতে পাবেন:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "[email protected]"], ["LIMIT", 1]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (0.2ms)  ROLLBACK
  ↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken

আপনি শেষ লাইনে ত্রুটি দেখতে পাচ্ছেন ActiveRecord::RecordInvalid: Validation failed: Email has already been taken . কিন্তু এখানে মজার বিষয় হল কাফকা ঘটনাটি বারবার প্রক্রিয়া করার চেষ্টা করবেন। এমনকি যদি আপনি Karafka সার্ভার পুনরায় চালু করেন, এটি শেষ ইভেন্ট প্রক্রিয়া করার চেষ্টা করবে। কাফকা কিভাবে জানেন কোথায় শুরু করবেন?

আপনি যদি আপনার কনসোল দেখেন, ত্রুটির পরে, আপনি এটি দেখতে পাবেন:

[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42

এটি আপনাকে বলবে যে কোন অফসেটটি প্রক্রিয়া করা হয়েছিল:এই ক্ষেত্রে, এটি 42 অফসেট ছিল৷ সুতরাং, আপনি যদি কারাফকা পরিষেবা পুনরায় চালু করেন, এটি সেই অফসেটে শুরু হবে৷

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches

এটি এখনও ব্যর্থ হবে কারণ আমাদের ব্যবহারকারী মডেলে আমাদের ইমেল বৈধতা রয়েছে৷ এই মুহুর্তে, কারাফকা সার্ভারটি বন্ধ করুন, সেই বৈধতাটি সরান বা মন্তব্য করুন এবং আবার আপনার সার্ভার শুরু করুন; আপনি দেখতে পাবেন কিভাবে ইভেন্টটি সফলভাবে প্রক্রিয়া করা হয়:

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.5ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed

অবশেষে, আপনি শেষ লাইনে এই বার্তাটি দেখতে পারেন:Marking users/0:43 as processed .

কলব্যাক

এটি এমন কিছু চমৎকার যা কারাফকা অফার করে:আপনি আপনার ভোক্তার কলব্যাক ব্যবহার করতে পারেন। এটি করার জন্য, আপনাকে শুধুমাত্র মডিউল আমদানি করতে হবে এবং সেগুলি ব্যবহার করতে হবে। তারপর, আপনার UserConsumer খুলুন এবং এটি যোগ করুন:

class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

পোল হল সেই মাধ্যম যার মাধ্যমে আমরা বর্তমান পার্টিশন অফসেটের উপর ভিত্তি করে রেকর্ড আনতে পারি। সুতরাং, সেই কলব্যাকগুলি before_poll এবং after_poll , তাদের নাম প্রস্তাবিত, যে মুহূর্তে মৃত্যুদন্ড কার্যকর করা হয়. আমরা শুধু একটি বার্তা লগ করছি, এবং আপনি সেগুলি আপনার কারাফকা সার্ভারে দেখতে পাবেন—একটি আনার আগে এবং অন্যটি তার পরে:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!

হার্টবিট

একটি হৃদস্পন্দন ঠিক যেভাবে আমরা, ভোক্তা হিসাবে, কাফকাকে বলি আমরা বেঁচে আছি; অন্যথায়, কাফকা ধরে নেবেন যে ভোক্তা মারা গেছে।

কারাফকাতে, আমাদের একটি নির্দিষ্ট সময়ের মধ্যে এটি করার জন্য একটি ডিফল্ট কনফিগারেশন আছে; এটা হল kafka.heartbeat_interval এবং ডিফল্ট 10 সেকেন্ড। আপনি আপনার কারাফকা সার্ভারে এই হার্টবিট দেখতে পারেন।

*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!

Sending heartbeat... সহ , কাফকা জানেন যে আমরা বেঁচে আছি এবং আমরা এর ভোক্তা গোষ্ঠীর একটি বৈধ সদস্য। এছাড়াও, আমরা আরও রেকর্ড ব্যবহার করতে পারি।

কমিট

একটি অফসেটকে গ্রাস করা হিসাবে চিহ্নিত করাকে একটি অফসেট কমিট করা বলে। কাফকাতে, আমরা অফসেট বিষয় নামক একটি অভ্যন্তরীণ কাফকা বিষয়ে লিখে অফসেট কমিট রেকর্ড করি। একটি বার্তা শুধুমাত্র তখনই গ্রাস করা হয় যখন এর অফসেট অফসেট বিষয়ের সাথে প্রতিশ্রুতিবদ্ধ হয়৷

প্রতিবার স্বয়ংক্রিয়ভাবে এই প্রতিশ্রুতিটি সম্পাদন করার জন্য কারাফকার একটি কনফিগারেশন রয়েছে; কনফিগারেশন হল kafka.offset_commit_interval , এবং ডিফল্টরূপে এর মান 10 সেকেন্ড। এর সাথে, কারাকফা প্রতি 10 সেকেন্ডে একটি অফসেট কমিট করবে, এবং আপনি আপনার কারাফকা সার্ভারে সেই বার্তাটি দেখতে পারবেন:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!

Committing offsets: users/0:44 এটি কোন অফসেট করছে তা আমাদের বলুন; আমার ক্ষেত্রে, এটি কাফকাকে বলেছিল যে এটি টপিক 0 থেকে অফসেট নম্বর 44 কমিট করতে পারে৷ এইভাবে, আমাদের পরিষেবার সাথে কিছু ঘটলে, কারাফকা সেই অফসেট থেকে ইভেন্টগুলি প্রক্রিয়া করতে আবার শুরু করতে পারে৷

উপসংহার

ইভেন্ট স্ট্রিমিং আমাদেরকে দ্রুত হতে, ডেটার আরও ভাল ব্যবহার করতে এবং আরও ভাল ব্যবহারকারীর অভিজ্ঞতা ডিজাইন করতে সাহায্য করে। প্রকৃতপক্ষে, অনেক কোম্পানি তাদের সমস্ত পরিষেবার সাথে যোগাযোগ করতে এবং রিয়েল টাইমে বিভিন্ন ইভেন্টে প্রতিক্রিয়া জানাতে সক্ষম হওয়ার জন্য এই প্যাটার্নটি ব্যবহার করছে। আমি আগে উল্লেখ করেছি, কারাফকা ছাড়াও অন্যান্য বিকল্প রয়েছে যা আপনি রেলের সাথে ব্যবহার করতে পারেন। আপনি ইতিমধ্যে মৌলিক আছে; এখন, নির্দ্বিধায় তাদের সাথে পরীক্ষা করুন৷

রেফারেন্স

  • https://kafka.apache.org/
  • https://github.com/karafka/karafka
  • https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

  1. রেলের সাথে হটওয়্যার ব্যবহার করা

  2. বাইবাগ, রেল এবং পা দিয়ে রিমোট ডিবাগিং

  3. রেলের সাথে Tailwind CSS ব্যবহার করা

  4. রেলের সাথে কৌণিক ব্যবহার 5