# File lib/reactor/connector.rb, line 24 def initialize(connection, url, opts) @connection, @opts = connection, opts @urls = URLs.new(url) if url opts.each do |k,v| case k when :url, :urls, :address @urls = URLs.new(v) unless @urls when :reconnect @reconnect = v end end raise ::ArgumentError.new("no url for connect") unless @urls # TODO aconway 2017-08-17: review reconnect configuration and defaults @reconnect = Backoff.new() unless @reconnect @ssl_domain = SessionPerConnection.new # TODO seems this should be configurable @connection.overrides = self @connection.open end
# File lib/reactor/connector.rb, line 80 def connect(connection) url = @urls.next transport = Qpid::Proton::Transport.new @opts.each do |k,v| case k when :user connection.user = v when :password connection.password = v when :heartbeat transport.idle_timeout = v.to_i when :idle_timeout transport.idle_timeout = v.(v*1000).to_i when :sasl_enabled transport.sasl if v when :sasl_allow_insecure_mechs transport.sasl.allow_insecure_mechs = v when :sasl_allowed_mechs, :sasl_mechanisms transport.sasl.allowed_mechs = v end end # TODO aconway 2017-08-11: hostname setting is incorrect, reactor only connection.hostname = "#{url.host}:#{url.port}" connection.user = url.username if url.username && !url.username.empty? connection.password = url.password if url.password && !url.password.empty? transport.bind(connection) if (url.scheme == "amqps") && @ssl_domain @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain) @ssl.peer_hostname = url.host end end
# File lib/reactor/connector.rb, line 44 def on_connection_local_open(event) self.connect(event.connection) end
# File lib/reactor/connector.rb, line 76 def on_connection_remote_close(event) @connection = nil end
# File lib/reactor/connector.rb, line 48 def on_connection_remote_open(event) @reconnect.reset if @reconnect end
# File lib/reactor/connector.rb, line 72 def on_timer_task(event) self.connect(@connection) end
# File lib/reactor/connector.rb, line 56 def on_transport_closed(event) if !@connection.nil? && !(@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero? if !@reconnect.nil? event.transport.unbind delay = @reconnect.next if delay == 0 self.connect(@connection) else event.reactor.schedule(delay, self) end else @connection = nil end end end
# File lib/reactor/connector.rb, line 52 def on_transport_tail_closed(event) self.on_transport_closed(event) end