--- /dev/null
+module Faraday
+ class Adapter
+ # EventMachine adapter is useful for either asynchronous requests
+ # when in EM reactor loop or for making parallel requests in
+ # synchronous code.
+ class EMHttp < Faraday::Adapter
+ module Options
+ def connection_config(env)
+ options = {}
+ configure_proxy(options, env)
+ configure_timeout(options, env)
+ configure_socket(options, env)
+ configure_ssl(options, env)
+ options
+ end
+
+ def request_config(env)
+ options = {
+ :body => read_body(env),
+ :head => env[:request_headers],
+ # :keepalive => true,
+ # :file => 'path/to/file', # stream data off disk
+ }
+ configure_compression(options, env)
+ options
+ end
+
+ def read_body(env)
+ body = env[:body]
+ body.respond_to?(:read) ? body.read : body
+ end
+
+ def configure_proxy(options, env)
+ if proxy = request_options(env)[:proxy]
+ options[:proxy] = {
+ :host => proxy[:uri].host,
+ :port => proxy[:uri].port,
+ :authorization => [proxy[:user], proxy[:password]]
+ }
+ end
+ end
+
+ def configure_socket(options, env)
+ if bind = request_options(env)[:bind]
+ options[:bind] = {
+ :host => bind[:host],
+ :port => bind[:port]
+ }
+ end
+ end
+
+ def configure_ssl(options, env)
+ if env[:url].scheme == 'https' && env[:ssl]
+ options[:ssl] = {
+ :cert_chain_file => env[:ssl][:ca_file],
+ :verify_peer => env[:ssl].fetch(:verify, true)
+ }
+ end
+ end
+
+ def configure_timeout(options, env)
+ timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
+ options[:connect_timeout] = options[:inactivity_timeout] = timeout
+ options[:connect_timeout] = open_timeout if open_timeout
+ end
+
+ def configure_compression(options, env)
+ if env[:method] == :get and not options[:head].key? 'accept-encoding'
+ options[:head]['accept-encoding'] = 'gzip, compressed'
+ end
+ end
+
+ def request_options(env)
+ env[:request]
+ end
+ end
+
+ include Options
+
+ dependency 'em-http'
+
+ self.supports_parallel = true
+
+ def self.setup_parallel_manager(options = nil)
+ Manager.new
+ end
+
+ def call(env)
+ super
+ perform_request env
+ @app.call env
+ end
+
+ def perform_request(env)
+ if parallel?(env)
+ manager = env[:parallel_manager]
+ manager.add {
+ perform_single_request(env).
+ callback { env[:response].finish(env) }
+ }
+ else
+ unless EventMachine.reactor_running?
+ error = nil
+ # start EM, block until request is completed
+ EventMachine.run do
+ perform_single_request(env).
+ callback { EventMachine.stop }.
+ errback { |client|
+ error = error_message(client)
+ EventMachine.stop
+ }
+ end
+ raise_error(error) if error
+ else
+ # EM is running: instruct upstream that this is an async request
+ env[:parallel_manager] = true
+ perform_single_request(env).
+ callback { env[:response].finish(env) }.
+ errback {
+ # TODO: no way to communicate the error in async mode
+ raise NotImplementedError
+ }
+ end
+ end
+ rescue EventMachine::Connectify::CONNECTError => err
+ if err.message.include?("Proxy Authentication Required")
+ raise Error::ConnectionFailed, %{407 "Proxy Authentication Required "}
+ else
+ raise Error::ConnectionFailed, err
+ end
+ rescue => err
+ if defined?(OpenSSL) && OpenSSL::SSL::SSLError === err
+ raise Faraday::SSLError, err
+ else
+ raise
+ end
+ end
+
+ # TODO: reuse the connection to support pipelining
+ def perform_single_request(env)
+ req = EventMachine::HttpRequest.new(env[:url], connection_config(env))
+ req.setup_request(env[:method], request_config(env)).callback { |client|
+ save_response(env, client.response_header.status, client.response) do |resp_headers|
+ client.response_header.each do |name, value|
+ resp_headers[name.to_sym] = value
+ end
+ end
+ }
+ end
+
+ def error_message(client)
+ client.error or "request failed"
+ end
+
+ def raise_error(msg)
+ errklass = Faraday::Error::ClientError
+ if msg == Errno::ETIMEDOUT
+ errklass = Faraday::Error::TimeoutError
+ msg = "request timed out"
+ elsif msg == Errno::ECONNREFUSED
+ errklass = Faraday::Error::ConnectionFailed
+ msg = "connection refused"
+ elsif msg == "connection closed by server"
+ errklass = Faraday::Error::ConnectionFailed
+ end
+ raise errklass, msg
+ end
+
+ def parallel?(env)
+ !!env[:parallel_manager]
+ end
+
+ # The parallel manager is designed to start an EventMachine loop
+ # and block until all registered requests have been completed.
+ class Manager
+ def initialize
+ reset
+ end
+
+ def reset
+ @registered_procs = []
+ @num_registered = 0
+ @num_succeeded = 0
+ @errors = []
+ @running = false
+ end
+
+ def running?() @running end
+
+ def add
+ if running?
+ perform_request { yield }
+ else
+ @registered_procs << Proc.new
+ end
+ @num_registered += 1
+ end
+
+ def run
+ if @num_registered > 0
+ @running = true
+ EventMachine.run do
+ @registered_procs.each do |proc|
+ perform_request(&proc)
+ end
+ end
+ if @errors.size > 0
+ raise Faraday::Error::ClientError, @errors.first || "connection failed"
+ end
+ end
+ ensure
+ reset
+ end
+
+ def perform_request
+ client = yield
+ client.callback { @num_succeeded += 1; check_finished }
+ client.errback { @errors << client.error; check_finished }
+ end
+
+ def check_finished
+ if @num_succeeded + @errors.size == @num_registered
+ EventMachine.stop
+ end
+ end
+ end
+ end
+ end
+end
+
+begin
+ require 'openssl'
+rescue LoadError
+ warn "Warning: no such file to load -- openssl. Make sure it is installed if you want HTTPS support"
+else
+ require 'faraday/adapter/em_http_ssl_patch'
+end if Faraday::Adapter::EMHttp.loaded?