пятница, 27 января 2012 г.

Построение событийно управляемого веб-сервиса на Ruby

Предистория

Решил привести мысли в порядок и изложить в одном месте всё то, что пришлось перелопатить за последнее время. Итак, началось все с одной очень интересной задачи, которую мне поставили на моем нынешнем месте работы (UniqSystems).

Задача

Есть несколько игровых клубов, в каждом клубе по ~200 компьютеров. Есть демон, обрабатывающий в фоне сессии клиентов. Клиент платит деньги в кассу, садится за компьютер и авторизуется. Далее компьютер шлет на сервер long-polling запрос. Сервер подписывается на новые события, по данному клиенту и запускает таймер на 30 секунд. 

Если по истечению 30 секунд не происходит никаких событий, сервер возвращает ответ "Ок" и разрывает соединение с компьютером, компьютер шлет новый запрос. Если же, например, у клиента кончаются деньги, то демон останавливает сессию. В этом случае сервер получает событие о том что сессия остановилась и шлет компьютеру команду отключить клиента.

Компьютеры и сервер общаются по протоколу XML-RPC. Запросы делятся на два типа, асинхронные (long-polling) и синхронные (авторизация, завершение сессии и т.д.)

Иллюстрация для наглядности

Модули сервера


План действий

Итак, для решения поставленной задачи нам нужно реализовать следующие модули:
  • EventPoll - Пул сообщений. Обертка, реализующая раздачу сообщений и простую отписку от событий.
  • EventEmitter - Mix-in, который наделяет класс возможностью рассылать сообщения и иметь подписчиков на них.
  • UnsubscribableTimer - Обертка вокруг обычного таймера EventMachine, позволяющая отписываться от таймера одним вызовом без параметров. Это делает отписку от любого события в системе одинаковой, независимой от его природы.
  • SubscriberRoutines - Содержит в себе функционал, который будет полезен любому подписчику: ведет репозиторий всех подписок на события и позволяет от них всех разом отписаться, когда необходимость в обновлениях отпадает.
  • CallRouter - Организует автоматическое преобразование модулей, входящих в namespace в методы.
  • Executer и SyncExecuter - обработчики, асинхронный и синхронный соответственно.
  • RackXmlRpc - Модуль реализующий Deferrable паттерн (callbacks или errbacks) для наших вызовов.
  • ServerFoundation - Базовый мiddleware нашего rake приложения.
Рассмотрим и реализуем каждый модуль в отдельности.

EventPoll

EM::Channel - это своего рода коридор, любой может крикнуть что то в коридор и любой может услышать что кричат в коридоре открыв дверь (subscribe) или перестать слушать закрыв дверь (unsubscribe). Ниже базовый класс:

class EventPool
  def initialize
    @channel = EM::Channel.new
  end
  
  def emit(*args)
    @channel.push(args)
  end
  
  def subscribe(&block)
    sid = @channel.subscribe do |args|
      block.(*args)
    end
    ->() { @channel.unsubscribe(sid) }
  end
end

EventEmitter

Этот модель будет расширять те классы, которые должны генерировать события, ниже представлено как он выглядит, а так же пример его использования:

module EventEmitter
  def emits(message)
    # метод генерации события
    define_method "emit_#{message}", ->(*args) { self.class.emitter(self.id, message).emit(*args) }
    
    # метод подписки на событие
    define_method "on_#{message}", ->(&block) do 
      raise "Block is not provided!" unless block
      self.class.emitter(self.id, message).subscribe(&block)
    end
  end

  def emitter(id, message)
    @emitters ||= { }
    @emitters[id] ||= { }
    @emitters[id][message] ||= EventPool.new
  end
end

# session.rb
class Session < ActiveRecord::Base
  extend EventEmitter  
  emits :state_changed
  
  def self.auth(login_id, computer_id)
    self.create(:login_id => login_id, :computer_id => computer_id, :state => :auth)
  end
  
  def close
    if self.update_attribute(:state, :close)
      self.emit_state_changed(:close)
    end
  end
end

UnsubscribableTimer

Для работы с событием таймера у нас будет свой класс:

class UnsubscribableTimer
  def self.on_timeout(time_period, &block)
    timer = EventMachine::Timer.new(time_period) { block.() }
    ->() { timer.cancel }
  end
end

SubscriberRoutines

Осталось реализовать простой способ подписки на события, делать это будет SubscriberRoutines, ниже сам модуль и пример его использования:


module SubscriberRoutines
  def subscribe_to(proc)
    raise "Not expecting a block!" if block_given?
    subscriptions << proc
  end
  
  def unsubscribe_from_everything
    subscriptions.each { |s| s.() }
  end
  
protected
  def subscriptions
    @subscriptions ||= [ ]
  end
end

# пример использования
class SomeClass
  include SubscriberRoutines

  def subscribe_on_event_change(session)
    subscribe_to(session.on_state_changed do |new_state|
      unsubscribe_from_everything
      raise "new state #{new_state}"
    end)
  end
  
  def subscribe_on_timeout(session)
    subscribe_to(UnsubscribableTimer.on_timeout(ServerConfig.pipe_timeout) do
      unsubscribe_from_everything
      raise "timeout"
    end)
  end

end

CallRouter

С механизмом подписки разобрались. CallRouter в связке с RackXmlRpc позволит нам не заботится об отдельной реализации XML-RPC протокола, автоматически преобразую все классы в определенном namespace в методы сервера.

class CallRouter
  
  def initialize(handlers_namespace)
    handlers_namespace.constants.each do |klass|
      const = handlers_namespace.const_get(klass)
      add_method(klass.to_s.underscore, const) if const.is_a? Class
    end
  end

private
  
  def add_method(method, processor)
    code = ->(response_callback, *params) do
      processor_instance = processor.kind_of?(Class) ? processor.new : processor
      processor_instance.handle(response_callback, *params)
    end
    self.class.send(:define_method, method, code)
  end

end

# пример использования
module RequestHandlers
  class Auth
    def handle
      raise "Implement me"
    end
  end
  
  class Pipe
    def handle
      raise "Implement me"
    end
  end
end

router = CallRouter.new(RequestHandlers)
router.methods # => [:auth, :pipe]

Executer и SyncExecuter

Одна из самых интересных частей, обработчики событий, ниже сами классы с комментариями:

class Executor
  # инициализирует callback's и возвращает управление
  def handle(response_callback, params)
    raise "Implement me"
  end
end

class SyncExecutor < Executor  
  # инициализирует callback's и возвращает управление
  def handle(response_callback, *params)
    result_notification = EM.spawn { |return_value| response_callback.(return_value) }
    begin
      result_notification.notify(sync_handle(*params))
    rescue Exception => e
      result_notification.notify({:code => 255, :errorString => e.message, :backtrace => e.backtrace.join("\n")})
    end
  end
  
  # выполняется в отдельно треде
  def sync_handle(*params)
    raise "Implement me"
  end
end

Теперь можно привести пару примеров использования:

module RequestHandlers
  class Pipe < Executor
    include SubscriberRoutines
    
    def handle(response_callback, session_id)
      self.response_callback = response_callback
      
      session = Session.find(session_id)
      
      subscribe_on_event_change(session)
      subscribe_on_timeout(session)
    end

    def subscribe_on_event_change(session)
      subscribe_to(session.on_state_changed do |new_state|
        unsubscribe_from_everything
        self.response_callback.({:result => "new state #{new_state}"})
      end)
    end
  
    def subscribe_on_timeout(session)
      subscribe_to(UnsubscribableTimer.on_timeout(ServerConfig.pipe_timeout) do
        unsubscribe_from_everything
        self.response_callback.({:result => "OK"})
      end)
    end

  end

  class Auth < SyncExecutor
    def sync_handle(login_id, password, computer_id)
      return {:error => "can't find login"} unless login = Login.find(login_id)
      return {:error => "invalid password"} unless login.password != password
      return {:error => "can't find computer"} unless computer = Computer.find(computer_id)
      
      return Session.auth(login.id, computer.id) ? {:result => "ОК"} : {:error => "can't create session"}
    end
  end
end

RackXmlRpc

Сам Deferrable паттерн был заимствован из фреймворка Twisted для Python. Для наглядности приведу небольшой пример использования Deferable в EventMachine из документации:

require 'eventmachine'

 class MyClass
   include EM::Deferrable

   def print_value x
     puts "MyClass instance received #{x}"
   end
 end

 EM.run {
   df = MyClass.new
   df.callback {|x|
     df.print_value(x)
     EM.stop
   }

   EM::Timer.new(2) {
     df.set_deferred_status :succeeded, 100
   }
 }

Данный пример вернет "MyClass instance received 100" и закроется. Мы будем использовать данный паттерн для обработки xml-rpc в связке с CallRouter методами:

class RackXmlRpc
  include EM::Deferrable
  
  def initialize(call_router, rpc_server = XMLRPC::BasicServer.new)
    @xml_rpc_server = rpc_server
    @call_router = call_router
    
    @call_router.class.instance_methods(false).each do |method|
      method_name = [method.to_s].compact.join('.')
      @xml_rpc_server.add_handler(method_name) { |*args| @call_router.send(method, *args) }
    end
    
    @xml_rpc_server.set_service_hook do |obj, *args|
      obj.call(->(result) { process_xml_rpc_response([true, result]) }, *args)
    end
  end
  
  def call(request_xml)
    @xml_rpc_server.process(request_xml)
  end
  
  def each(&block)
    @xml_response_callback = block
  end
  
private
  def process_xml_rpc_response(returned)
    submit_server_response(returned)
    succeed
  end

  def submit_server_response(response)
    prepared_response = @xml_rpc_server.prepare_response(response)
    @xml_response_callback.call(prepared_response)
  end
end

ServerFoundation

И в завершение привожу rack middleware и пример запуска сервера на thin в асинхронном режиме:

require 'eventmachine'
require 'xmlrpc/server'

class ServerFoundation
  AsyncResponse = [-1, {}, []].freeze

  def initialize(handler_namespace)
    @call_router = CallRouter.new(handler_namespace)
  end

  def call(env)
    rack_xml_rpc = RackXmlRpc.new(@call_router)
    EM.next_tick do
      env['async.callback'].call [200, {'Content-Type' => 'text/xml'}, rack_xml_rpc] 
      rack_xml_rpc.call(env['rack.input'].string)
    end
    AsyncResponse
  end

end

# пример использования
app = Rack::Builder.new do
  run ServerFoundation.new(RequestHandlers, 'session')
end

Rack::Handler::Thin.run app, :Port => 3333

Заключение

Соглашусь что повествование получилось немного поверхностным, но статья и так уже достаточно большая, а если разжевывать каждую деталь то она станет просто огромной. Все не ясные моменты можно выяснить из офф. документации. Так же тим. лидер комманды UniqSystem (Иван Касатенко) выступал с докладом по этой теме на DevConf, посмотреть можно здесь.

Комментариев нет:

Отправить комментарий