]> git.donarmstrong.com Git - dsa-puppet.git/blob - 3rdparty/modules/aviator/lib/puppet/feature/faraday/adapter/em_http.rb
add aimonb/aviator to 3rdparty
[dsa-puppet.git] / 3rdparty / modules / aviator / lib / puppet / feature / faraday / adapter / em_http.rb
1 module Faraday
2   class Adapter
3     # EventMachine adapter is useful for either asynchronous requests
4     # when in EM reactor loop or for making parallel requests in
5     # synchronous code.
6     class EMHttp < Faraday::Adapter
# Translates a Faraday request env into the option hashes handed to
# em-http: one for the connection (EventMachine::HttpRequest.new) and
# one for the individual request (setup_request).
module Options
  # Builds the connection-level options: proxy, timeouts, socket
  # binding and SSL, in that order.
  #
  # env - the request env; read with Hash-style [] access.
  #
  # Returns a new Hash.
  def connection_config(env)
    config = {}
    configure_proxy(config, env)
    configure_timeout(config, env)
    configure_socket(config, env)
    configure_ssl(config, env)
    config
  end

  # Builds the per-request options: the body and the request headers,
  # plus an accept-encoding header where configure_compression adds one.
  #
  # Returns a new Hash whose :head entry is the env's own header object.
  def request_config(env)
    config = {}
    config[:body] = read_body(env)
    config[:head] = env[:request_headers]
    # :keepalive => true,
    # :file => 'path/to/file', # stream data off disk
    configure_compression(config, env)
    config
  end

  # Returns the request body, reading it first when it is IO-like
  # (i.e. it responds to #read).
  def read_body(env)
    payload = env[:body]
    return payload unless payload.respond_to?(:read)

    payload.read
  end

  # Copies proxy host, port and credentials into options[:proxy] when
  # the request options contain a :proxy entry; otherwise does nothing.
  def configure_proxy(options, env)
    proxy = request_options(env)[:proxy]
    return unless proxy

    options[:proxy] = {
      :host => proxy[:uri].host,
      :port => proxy[:uri].port,
      :authorization => [proxy[:user], proxy[:password]]
    }
  end

  # Copies the local bind host/port into options[:bind] when the
  # request options contain a :bind entry; otherwise does nothing.
  def configure_socket(options, env)
    bind = request_options(env)[:bind]
    return unless bind

    options[:bind] = {
      :host => bind[:host],
      :port => bind[:port]
    }
  end

  # Sets options[:ssl] for https URLs when the env carries SSL settings.
  # Peer verification stays on unless :verify is given explicitly.
  def configure_ssl(options, env)
    return unless env[:url].scheme == 'https' && env[:ssl]

    ssl = env[:ssl]
    options[:ssl] = {
      :cert_chain_file => ssl[:ca_file],
      :verify_peer => ssl.fetch(:verify, true)
    }
  end

  # Applies :timeout to both the inactivity and the connect timeout;
  # a truthy :open_timeout then overrides the connect timeout only.
  def configure_timeout(options, env)
    timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
    options[:inactivity_timeout] = timeout
    options[:connect_timeout] = timeout
    options[:connect_timeout] = open_timeout if open_timeout
  end

  # Adds an accept-encoding header to GET requests that do not already
  # have one. Only the exact lowercase key 'accept-encoding' is checked.
  def configure_compression(options, env)
    return unless env[:method] == :get

    headers = options[:head]
    return if headers.key?('accept-encoding')

    headers['accept-encoding'] = 'gzip, compressed'
  end

  # Returns the per-request options stored under env[:request].
  def request_options(env)
    env[:request]
  end
end
77
78       include Options
79
80       dependency 'em-http'
81
82       self.supports_parallel = true
83
# Creates the object that coordinates parallel requests for this adapter.
#
# options - accepted but not used by this implementation.
#
# Returns a fresh Manager on every call.
def self.setup_parallel_manager(options = nil)
  Manager.new
end
87
# Middleware entry point: runs the superclass hook, performs the HTTP
# request for env, then passes env on to the next app in the stack.
#
# Returns whatever @app.call returns.
def call(env)
  super
  perform_request(env)
  @app.call(env)
end
93
# Performs the request described by env in one of three modes:
#
# * parallel    - a parallel manager is set on env: the request is
#                 queued on the manager.
# * async       - an EM reactor is already running: the request is fired
#                 and env is flagged as parallel so upstream treats the
#                 response as asynchronous.
# * synchronous - no reactor is running: one is started and this method
#                 blocks until the request succeeds or fails.
#
# Raises Error::ConnectionFailed for proxy CONNECT failures and
# Faraday::SSLError for OpenSSL errors; synchronous failures are raised
# through raise_error.
def perform_request(env)
  if parallel?(env)
    env[:parallel_manager].add {
      perform_single_request(env).
        callback { env[:response].finish(env) }
    }
  elsif EventMachine.reactor_running?
    # EM is running: instruct upstream that this is an async request
    env[:parallel_manager] = true
    perform_single_request(env).
      callback { env[:response].finish(env) }.
      errback {
        # TODO: no way to communicate the error in async mode
        raise NotImplementedError
      }
  else
    failure = nil
    # start EM, block until request is completed
    EventMachine.run do
      perform_single_request(env).
        callback { EventMachine.stop }.
        errback { |client|
          failure = error_message(client)
          EventMachine.stop
        }
    end
    raise_error(failure) if failure
  end
rescue EventMachine::Connectify::CONNECTError => err
  unless err.message.include?("Proxy Authentication Required")
    raise Error::ConnectionFailed, err
  end
  raise Error::ConnectionFailed, %{407 "Proxy Authentication Required "}
rescue => err
  # OpenSSL may not be loaded at all, so check before naming its constants.
  raise Faraday::SSLError, err if defined?(OpenSSL) && OpenSSL::SSL::SSLError === err
  raise
end
138
# TODO: reuse the connection to support pipelining
#
# Opens a new em-http connection for env[:url], issues the request and
# registers a success callback that stores status, body and headers on
# env through save_response. Header names are stored as symbols.
#
# Returns the value of the client's #callback call, so callers can chain
# further callbacks/errbacks onto it.
def perform_single_request(env)
  connection = EventMachine::HttpRequest.new(env[:url], connection_config(env))
  pending = connection.setup_request(env[:method], request_config(env))
  pending.callback do |client|
    save_response(env, client.response_header.status, client.response) do |resp_headers|
      client.response_header.each { |name, value| resp_headers[name.to_sym] = value }
    end
  end
end
150
# Returns the error reported by the client, or the generic text
# "request failed" when the client reports none (nil or false).
def error_message(client)
  client.error || "request failed"
end
154
# Raises the Faraday error matching a client failure.
#
# msg - the failure reported by the client. It is compared by equality
#       (==) against Errno::ETIMEDOUT, Errno::ECONNREFUSED and the text
#       "connection closed by server".
#
# Raises Faraday::Error::TimeoutError on timeout,
# Faraday::Error::ConnectionFailed on a refused or closed connection,
# and Faraday::Error::ClientError for anything else.
def raise_error(msg)
  errklass, text =
    if msg == Errno::ETIMEDOUT
      [Faraday::Error::TimeoutError, "request timed out"]
    elsif msg == Errno::ECONNREFUSED
      [Faraday::Error::ConnectionFailed, "connection refused"]
    elsif msg == "connection closed by server"
      [Faraday::Error::ConnectionFailed, msg]
    else
      [Faraday::Error::ClientError, msg]
    end
  raise errklass, text
end
168
# Returns true when env carries a truthy :parallel_manager entry,
# false otherwise. The result is always a strict boolean.
def parallel?(env)
  env[:parallel_manager] ? true : false
end
172
# The parallel manager is designed to start an EventMachine loop
# and block until all registered requests have been completed.
#
# Requests are registered as blocks through #add. Each block must return
# a client object that responds to #callback, #errback and #error.
class Manager
  def initialize
    reset
  end

  # Clears all bookkeeping: queued request blocks, counters, collected
  # errors and the running flag.
  def reset
    @registered_procs = []
    @num_registered = 0
    @num_succeeded = 0
    @errors = []
    @running = false
  end

  # Returns true while #run is executing.
  def running?
    @running
  end

  # Registers a request block. While the manager is running the block is
  # executed immediately; otherwise it is queued until #run is called.
  #
  # The block is captured explicitly with &block. A bare Proc.new relied
  # on implicit block capture, which was deprecated in Ruby 2.7 and
  # raises ArgumentError since Ruby 3.0.
  #
  # Returns the number of requests registered so far.
  # Raises ArgumentError when queued without a block.
  def add(&block)
    if running?
      perform_request(&block)
    else
      raise ArgumentError, "tried to register a request without a block" unless block
      @registered_procs << block
    end
    @num_registered += 1
  end

  # Runs every queued request inside an EventMachine loop and blocks
  # until #check_finished stops the loop. Does nothing when no request
  # was registered. State is always reset afterwards, even on failure.
  #
  # Raises Faraday::Error::ClientError with the first collected error
  # (or "connection failed" when that error is nil) if any request failed.
  def run
    if @num_registered > 0
      @running = true
      EventMachine.run do
        @registered_procs.each do |request|
          perform_request(&request)
        end
      end
      unless @errors.empty?
        raise Faraday::Error::ClientError, @errors.first || "connection failed"
      end
    end
  ensure
    reset
  end

  # Starts one request by yielding to the given block and hooks its
  # success/failure callbacks into the manager's counters.
  def perform_request
    client = yield
    client.callback { @num_succeeded += 1; check_finished }
    client.errback { @errors << client.error; check_finished }
  end

  # Stops the EventMachine loop once every registered request has either
  # succeeded or failed.
  def check_finished
    if @num_succeeded + @errors.size == @num_registered
      EventMachine.stop
    end
  end
end
227     end
228   end
229 end
230
# Once the adapter's dependency has loaded, try to load OpenSSL. If it is
# available, also load the em-http SSL patch; if not, warn that HTTPS
# support needs it.
if Faraday::Adapter::EMHttp.loaded?
  begin
    require 'openssl'
  rescue LoadError
    warn "Warning: no such file to load -- openssl. Make sure it is installed if you want HTTPS support"
  else
    require 'faraday/adapter/em_http_ssl_patch'
  end
end