@private
# File lib/reactor/reactor.rb, line 53
# Sets up the reactor around a native implementation and registers the
# supplied handlers.
#
# @param handlers [Object, Array, nil] A single handler or a (possibly nested)
#   list of handlers; each one is passed to +self.handler.add+. Skipped when nil.
# @param options [Hash] +:impl+ may carry an existing native reactor; when it
#   is absent (nil) a new one is obtained from +Cproton.pn_reactor+.
def initialize(handlers, options = {})
  @impl = options[:impl]
  @impl = Cproton.pn_reactor if @impl.nil?

  unless handlers.nil?
    # Wrapping then flattening accepts both a lone handler and nested lists.
    [handlers].flatten.each { |item| self.handler.add(item) }
  end

  @errors = []
  @handlers = []
  # Registers this Ruby object under the reactor attachments key so that
  # fetch_instance (see self.wrap) can presumably find it again.
  self.class.store_instance(self, :pn_reactor_attachments)
end
# File lib/reactor/reactor.rb, line 47
# Returns the Ruby reactor for a native reactor, creating one when none has
# been stored yet.
#
# @param impl [Object, nil] The native reactor.
# @return [Reactor, nil] nil when +impl+ is nil; otherwise the instance found
#   by +fetch_instance+, or a new Reactor built around +impl+.
def self.wrap(impl)
  return nil if impl.nil?

  existing = fetch_instance(impl, :pn_reactor_attachments)
  existing || Reactor.new(nil, :impl => impl)
end
# File lib/reactor/reactor.rb, line 154
# Creates an acceptor on the given host and port.
#
# @param host [String] The host passed to the native acceptor call.
# @param port [Object] The port; it is converted to a String before the call.
# @param handler [Object, nil] Optional handler, wrapped through +chandler+.
# @return [Acceptor] The wrapped acceptor.
# @raise [IOError] When the native call returns nil; the message carries the
#   reactor's error text together with host and port.
def acceptor(host, port, handler = nil)
  impl = chandler(handler, self.method(:on_error))
  aimpl = Cproton.pn_reactor_acceptor(@impl, host, "#{port}", impl)
  # Release this method's reference to the wrapped handler on both paths.
  Cproton.pn_decref(impl)
  return Acceptor.new(aimpl) unless aimpl.nil?

  # Look the error text up once and reuse it for the message.
  error_text = Cproton.pn_error_text(Cproton.pn_reactor_error(@impl))
  raise IOError, "(#{error_text} (#{host}:#{port}))"
end
# File lib/reactor/reactor.rb, line 168
# Creates a connection owned by this reactor.
#
# @param handler [Object, nil] Optional handler, wrapped through +chandler+.
# @return [Object] The result of +Qpid::Proton::Connection.wrap+ on the
#   native connection.
def connection(handler = nil)
  handler_impl = chandler(handler, method(:on_error))
  native_conn = Cproton.pn_reactor_connection(@impl, handler_impl)
  wrapped = Qpid::Proton::Connection.wrap(native_conn)
  # Release this method's reference once the connection has been created.
  Cproton.pn_decref(handler_impl)
  wrapped
end
# File lib/reactor/reactor.rb, line 79
# Fetches the reactor's global handler.
#
# @return [Object] The native global handler wrapped by
#   +Qpid::Proton::Handler::WrappedHandler.wrap+, with +on_error+ as its
#   error callback.
def global_handler
  native_handler = Cproton.pn_reactor_get_global_handler(@impl)
  error_callback = method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(native_handler, error_callback)
end
# File lib/reactor/reactor.rb, line 84
# Installs a new global handler on the reactor.
#
# @param handler [Object] The handler; it is wrapped through +chandler+ with
#   +on_error+ as the error callback before being handed to the native reactor.
def global_handler=(handler)
  handler_impl = chandler(handler, method(:on_error))
  Cproton.pn_reactor_set_global_handler(@impl, handler_impl)
  # Release this method's reference after the reactor has been given the handler.
  Cproton.pn_decref(handler_impl)
end
# File lib/reactor/reactor.rb, line 106
# Fetches the reactor's handler.
#
# @return [Object] The native handler wrapped by
#   +Qpid::Proton::Handler::WrappedHandler.wrap+, with +on_error+ as its
#   error callback.
def handler
  native_handler = Cproton.pn_reactor_get_handler(@impl)
  error_callback = method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(native_handler, error_callback)
end
# File lib/reactor/reactor.rb, line 111
# Installs a new handler on the reactor.
#
# @param handler [Object] The handler; it is wrapped through +chandler+ with
#   +on_error+ as the error callback before being handed to the native reactor.
def handler=(handler)
  # The callback must be bound to this reactor (self), matching the other
  # handler-wrapping methods; an undefined receiver here raised NameError.
  impl = chandler(handler, self.method(:on_error))
  Cproton.pn_reactor_set_handler(@impl, impl)
  # Release this method's reference after the reactor has been given the handler.
  Cproton.pn_decref(impl)
end
# File lib/reactor/reactor.rb, line 74
# Error callback handed to wrapped handlers: records the error and then
# calls the reactor's own +yield+ method.
#
# @param info [Object] The error to record; +process+ later re-raises the
#   last recorded entry.
def on_error(info)
  errors.push(info)
  # Explicit receiver: a bare `yield` would be the block keyword, not the method.
  self.yield
end
# File lib/reactor/reactor.rb, line 135
# Runs one processing pass of the native reactor and surfaces any errors that
# were recorded through +on_error+.
#
# @return [Object] The result of +Cproton.pn_reactor_process+ when no errors
#   are recorded.
# @raise [Exception] The last recorded error, after the available backtraces
#   of all recorded errors have been printed.
def process
  result = Cproton.pn_reactor_process(@impl)
  recorded = self.errors
  return result if recorded.nil? || recorded.empty?

  recorded.each do |error|
    # An exception that was never raised has no backtrace yet.
    backtrace = error.backtrace
    print backtrace.join("\n") unless backtrace.nil?
  end
  raise recorded.last
end
# File lib/reactor/reactor.rb, line 190
# Puts an event for +obj+ onto the reactor's collector.
#
# @param obj [Object] The object the event refers to.
# @param etype [#number] The event type; its +number+ is passed to the
#   native call.
def push_event(obj, etype)
  collector = Cproton.pn_reactor_collector(@impl)
  # NOTE(review): pn_py2void looks like a Python-binding name — confirm the
  # Ruby extension exposes it (rather than a Ruby-specific equivalent).
  context = Cproton.pn_py2void(obj)
  Cproton.pn_collector_put(collector, Qpid::Proton::Util::RBCTX, context, etype.number)
end
Returns whether the reactor is quiesced, i.e. has no unbuffered data.
@return [Boolean] True if there is no unbuffered data.
# File lib/reactor/reactor.rb, line 70
# Asks the native reactor whether it is quiesced.
#
# @return [Object] Whatever +Cproton.pn_reactor_quiesced+ reports for this
#   reactor.
def quiesced?
  Cproton.pn_reactor_quiesced(@impl)
end
# File lib/reactor/reactor.rb, line 117
# Drives the reactor: sets the timeout, starts it, repeats +process+ until it
# returns a falsy value, then stops it.
#
# @yield Invoked with no arguments after every truthy +process+ pass, when a
#   block is given.
def run(&block)
  # NOTE(review): the reason for this particular timeout value is not visible here.
  self.timeout = 3.14159265359
  start
  while process
    block.call unless block.nil?
  end
  stop
end
# File lib/reactor/reactor.rb, line 147
# Schedules a task on the reactor.
#
# @param delay [Numeric] The delay; converted with +sec_to_millis+ before the
#   native call.
# @param task [Object] The handler to schedule, wrapped through +chandler+.
# @return [Object] The result of +Task.wrap+ on the native scheduled task.
def schedule(delay, task)
  handler_impl = chandler(task, method(:on_error))
  native_task = Cproton.pn_reactor_schedule(@impl, sec_to_millis(delay), handler_impl)
  scheduled = Task.wrap(native_task)
  # Release this method's reference once the task has been scheduled.
  Cproton.pn_decref(handler_impl)
  scheduled
end
# File lib/reactor/reactor.rb, line 175
# Creates a selectable on the reactor, optionally attaching a handler to it.
#
# @param handler [Object, nil] Optional handler, wrapped through +chandler+.
# @return [Object] The result of +Selectable.wrap+ on the native selectable.
def selectable(handler = nil)
  handler_impl = chandler(handler, method(:on_error))
  wrapped = Selectable.wrap(Cproton.pn_reactor_selectable(@impl))
  # Nothing to attach (or release) when no wrapped handler was produced.
  return wrapped if handler_impl.nil?

  attachments = Cproton.pn_selectable_attachments(wrapped.impl)
  Cproton.pn_record_set_handler(attachments, handler_impl)
  Cproton.pn_decref(handler_impl)
  wrapped
end
Returns the timeout period.
@return [Fixnum] The timeout period, in seconds.
# File lib/reactor/reactor.rb, line 94
# Reads the reactor's timeout.
#
# @return [Object] The native timeout value after conversion by
#   +millis_to_timeout+.
def timeout
  native_millis = Cproton.pn_reactor_get_timeout(@impl)
  millis_to_timeout(native_millis)
end
Sets the timeout period.
@param timeout [Fixnum] The timeout, in seconds.
# File lib/reactor/reactor.rb, line 102
# Writes the reactor's timeout.
#
# @param timeout [Object] The timeout; converted with +timeout_to_millis+
#   before being handed to the native reactor.
def timeout=(timeout)
  native_millis = timeout_to_millis(timeout)
  Cproton.pn_reactor_set_timeout(@impl, native_millis)
end
# File lib/reactor/reactor.rb, line 186
# Passes a selectable's native implementation to +Cproton.pn_reactor_update+.
#
# @param sel [#impl] The selectable whose +impl+ is handed to the reactor.
def update(sel)
  selectable_impl = sel.impl
  Cproton.pn_reactor_update(@impl, selectable_impl)
end
# File lib/reactor/reactor.rb, line 128
# Wakes the native reactor.
#
# @return [nil]
# @raise [IOError] When the native call returns a non-zero status; the
#   message is the reactor's error text.
def wakeup
  status = Cproton.pn_reactor_wakeup(@impl)
  return if status.zero?

  # Convert the native error object to text, as acceptor does, so the
  # exception message is readable rather than the raw object.
  raise IOError, Cproton.pn_error_text(Cproton.pn_reactor_error(@impl))
end