Class: Bunny::Session
- Inherits: Object
- Inheritance chain: Object, Bunny::Session
- Defined in: lib/bunny/session.rb
Overview
Represents an AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_CHANNEL_MAX =
CHANNEL_MAX_LIMIT
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{
  :capabilities => {
    :publisher_confirms => true,
    :consumer_cancel_notify => true,
    :exchange_exchange_bindings => true,
    :"basic.nack" => true,
    :"connection.blocked" => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product => "Bunny",
  :platform => ::RUBY_DESCRIPTION,
  :version => Bunny::VERSION,
  :information => "http://rubybunny.info",
}
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
Instance Attribute Summary
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL".
-
#network_recovery_interval ⇒ Object
readonly
Returns the value of attribute network_recovery_interval.
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary
-
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
Instance Method Summary
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or another resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#close ⇒ Object
(also: #stop)
Closes the connection.
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
-
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
- #heartbeat_disabled?(val) ⇒ Boolean protected
-
#heartbeat_interval ⇒ Integer
Heartbeat interval used.
- #host ⇒ Object
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #ignoring_io_errors(&block) ⇒ Object protected
-
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
-
#manually_closed? ⇒ Boolean
True if this AMQP 0.9.1 connection has been programmatically closed.
- #normalize_client_channel_max(n) ⇒ Object protected
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
-
#password ⇒ String
Password used.
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with the given name exists.
- #reset_address_index ⇒ Object
-
#start ⇒ Object
Starts the connection process.
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
-
#username ⇒ String
Username used.
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
Returns a new instance of Session
# File 'lib/bunny/session.rb', line 130
def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
  opts = case (connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts = opts
  log_file = opts[:log_file] || opts[:logfile] || STDOUT
  log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  # we might need to log a warning about ill-formatted IPv6 address but
  # progname includes hostname, so init like this first
  @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))

  @addresses = self.addresses_from(opts)
  @address_index = 0

  # re-init, see above
  @logger = opts.fetch(:logger, init_default_logger(log_file, log_level))

  @user = self.username_from(opts)
  @pass = self.password_from(opts)
  @vhost = self.vhost_from(opts)
  @threaded = opts.fetch(:threaded, true)

  validate_connection_options(opts)

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] || opts[:automatic_recovery]
                           end
  @max_recovery_attempts = opts[:recovery_attempts]
  @recovery_attempts = @max_recovery_attempts
  # When this is set, connection attempts won't be reset after
  # successful reconnection. Some find this behavior more sensible
  # than the per-failure attempt counter. MK.
  @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status = :not_connected
  @blocked = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be renegotiated during connection tuning steps. MK.
  @channel_max = @client_channel_max
  @client_heartbeat = self.heartbeat_from(opts)

  @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(opts.fetch(:properties, {}))
  @mechanism = opts.fetch(:auth_mechanism, "PLAIN")
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex = @mutex_impl.new
  @status_mutex = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels = Hash.new

  @origin_thread = Thread.current

  self.reset_continuations
  self.initialize_transport
end
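The first steps of the constructor (turning the argument into one options hash, then deciding whether automatic recovery is on) can be tried in isolation. The following is a minimal standalone sketch, not Bunny's code: resolve_opts, recovery_enabled? and the toy fake_parse lambda are hypothetical names, and the lambda merely stands in for parse_uri.

```ruby
# Standalone sketch of the option handling in #initialize.
# resolve_opts and recovery_enabled? are hypothetical helper names.

# Turns nil, a String, or a Hash into one options hash, then applies overrides.
# `parse` stands in for Bunny::Session.parse_uri.
def resolve_opts(connection_string_or_opts, optz = {}, parse:)
  base = case connection_string_or_opts
         when nil    then {}
         when String then parse.call(connection_string_or_opts)
         when Hash   then connection_string_or_opts
         end
  base.merge(optz)
end

# Recovery defaults to true unless one of the two option spellings is given.
def recovery_enabled?(opts)
  if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
    true
  else
    opts[:automatically_recover] || opts[:automatic_recovery]
  end
end

# Toy parser used only for this sketch (assumption, not the real URI parser).
fake_parse = ->(uri) { { :host => uri.sub(%r{\Aamqp://}, "") } }

from_nil    = resolve_opts(nil, {}, parse: fake_parse)
from_string = resolve_opts("amqp://rabbit.local", { :heartbeat => 30 }, parse: fake_parse)
from_hash   = resolve_opts({ :host => "a", :automatic_recovery => false }, { :host => "b" }, parse: fake_parse)
```

Note that values in the second argument win over the first because they are merged last, which is why from_hash ends up with host "b".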
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator
# File 'lib/bunny/session.rb', line 83
def channel_id_allocator
  @channel_id_allocator
end
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max
# File 'lib/bunny/session.rb', line 81
def channel_max
  @channel_max
end
#continuation_timeout ⇒ Integer (readonly)
Returns the timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds. Default is 15000.
# File 'lib/bunny/session.rb', line 90
def continuation_timeout
  @continuation_timeout
end
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max
# File 'lib/bunny/session.rb', line 81
def frame_max
  @frame_max
end
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat
# File 'lib/bunny/session.rb', line 81
def heartbeat
  @heartbeat
end
#logger ⇒ Logger (readonly)
# File 'lib/bunny/session.rb', line 88
def logger
  @logger
end
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# File 'lib/bunny/session.rb', line 86
def mechanism
  @mechanism
end
#network_recovery_interval ⇒ Object (readonly)
Returns the value of attribute network_recovery_interval
# File 'lib/bunny/session.rb', line 91
def network_recovery_interval
  @network_recovery_interval
end
#pass ⇒ Object (readonly)
Returns the value of attribute pass
# File 'lib/bunny/session.rb', line 81
def pass
  @pass
end
#port ⇒ Object (readonly)
Returns the value of attribute port
# File 'lib/bunny/session.rb', line 81
def port
  @port
end
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms
# File 'lib/bunny/session.rb', line 82
def server_authentication_mechanisms
  @server_authentication_mechanisms
end
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities
# File 'lib/bunny/session.rb', line 82
def server_capabilities
  @server_capabilities
end
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales
# File 'lib/bunny/session.rb', line 82
def server_locales
  @server_locales
end
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties
# File 'lib/bunny/session.rb', line 82
def server_properties
  @server_properties
end
#status ⇒ Object (readonly)
Returns the value of attribute status
# File 'lib/bunny/session.rb', line 81
def status
  @status
end
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded
# File 'lib/bunny/session.rb', line 81
def threaded
  @threaded
end
#transport ⇒ Bunny::Transport (readonly)
# File 'lib/bunny/session.rb', line 80
def transport
  @transport
end
#user ⇒ Object (readonly)
Returns the value of attribute user
# File 'lib/bunny/session.rb', line 81
def user
  @user
end
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost
# File 'lib/bunny/session.rb', line 81
def vhost
  @vhost
end
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
# File 'lib/bunny/session.rb', line 462
def self.parse_uri(uri)
  AMQ::Settings.parse_amqp_url(uri)
end
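To give a feel for the kind of hash such a parser yields, here is a rough sketch built on Ruby's standard URI library. It is not the AMQ::Settings implementation: the helper name sketch_parse_amqp_uri and the exact set of hash keys are assumptions made for illustration, and edge cases the real parser handles are ignored.

```ruby
require "uri"

# Rough stand-in for an amqp[s] URI parser. The real work is delegated to
# AMQ::Settings.parse_amqp_url; the helper name and hash keys here are assumptions.
def sketch_parse_amqp_uri(string)
  uri = URI.parse(string)
  unless %w[amqp amqps].include?(uri.scheme)
    raise ArgumentError, "expected an amqp or amqps URI, got #{uri.scheme.inspect}"
  end

  tls = uri.scheme == "amqps"
  opts = {
    :scheme => uri.scheme,
    :host   => uri.host,
    :port   => uri.port || (tls ? 5671 : 5672), # standard AMQPS / AMQP ports
    :ssl    => tls
  }
  opts[:user] = uri.user if uri.user
  opts[:pass] = uri.password if uri.password

  # The path carries the percent-encoded virtual host, e.g. "/%2f" for "/".
  path = uri.path.to_s
  unless path.empty?
    opts[:vhost] = path.sub(%r{\A/}, "").gsub(/%([0-9A-Fa-f]{2})/) { $1.hex.chr }
  end
  opts
end

parsed = sketch_parse_amqp_uri("amqps://guest:secret@rabbit.example.com/%2f")
```

In this sketch a URI without a path leaves :vhost unset, so the caller's default virtual host would apply.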
Instance Method Details
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled
# File 'lib/bunny/session.rb', line 426
def automatically_recover?
  @automatically_recover
end
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or another resource; false otherwise
# File 'lib/bunny/session.rb', line 454
def blocked?
  @blocked
end
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
# File 'lib/bunny/session.rb', line 803
def clean_up_and_fail_on_connection_close!(method)
  @last_connection_error = instantiate_connection_level_exception(method)
  @continuations.push(method)

  clean_up_on_shutdown
  if threaded?
    @origin_thread.raise(@last_connection_error)
  else
    raise @last_connection_error
  end
end
#clean_up_on_shutdown ⇒ Object
# File 'lib/bunny/session.rb', line 815
def clean_up_on_shutdown
  begin
    shut_down_all_consumer_work_pools!
    maybe_shutdown_reader_loop
    maybe_shutdown_heartbeat_sender
  rescue ShutdownSignal => sse
    # no-op
  rescue Exception => e
    @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
  ensure
    close_transport
  end
end
#close ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
# File 'lib/bunny/session.rb', line 362
def close
  @status_mutex.synchronize { @status = :closing }

  ignoring_io_errors do
    if @transport.open?
      close_all_channels

      self.close_connection(true)
    end

    clean_up_on_shutdown
  end
  @status_mutex.synchronize do
    @status = :closed
    @manually_closed = true
  end
end
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed
# File 'lib/bunny/session.rb', line 408
def closed?
  @status_mutex.synchronize { @status == :closed }
end
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing
# File 'lib/bunny/session.rb', line 403
def closing?
  @status_mutex.synchronize { @status == :closing }
end
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection. Accepts a block that the socket will be yielded to.
# File 'lib/bunny/session.rb', line 269
def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport.configure_socket(&block)
end
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open
# File 'lib/bunny/session.rb', line 397
def connecting?
  status == :connecting
end
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
# File 'lib/bunny/session.rb', line 345
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
  raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
  raise ConnectionAlreadyClosed if manually_closed?

  @channel_mutex.synchronize do
    if n && (ch = @channels[n])
      ch
    else
      ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout))
      ch.open
      ch
    end
  end
end
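The reuse-or-open behaviour above (an explicit channel number returns the already registered channel, anything else opens a new one) can be sketched without a broker. StubChannel and ChannelRegistry below are hypothetical stand-ins, and the registry stores the channel itself only for brevity; how the real session registers channels is not shown in this section.

```ruby
require "monitor"

# Hypothetical stand-in for a channel; not Bunny::Channel.
StubChannel = Struct.new(:id) do
  def open
    @open = true
    self
  end

  def open?
    @open == true
  end
end

# Hypothetical registry mirroring the lookup logic of #create_channel.
class ChannelRegistry
  def initialize
    @channels = {}
    @lock = Monitor.new
    @next_id = 1
  end

  def create_channel(n = nil)
    raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n

    @lock.synchronize do
      if n && (ch = @channels[n])
        ch # an explicit, already known number reuses the channel
      else
        id = n || next_free_id
        ch = StubChannel.new(id)
        ch.open
        @channels[id] = ch
      end
    end
  end

  private

  # Smallest id not yet taken (simplified allocator, assumption).
  def next_free_id
    @next_id += 1 while @channels.key?(@next_id)
    @next_id
  end
end

registry = ChannelRegistry.new
first = registry.create_channel
again = registry.create_channel(first.id)
other = registry.create_channel
```

Here again is the very same object as first, while other is a fresh channel with a different id.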
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
# File 'lib/bunny/session.rb', line 494
def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
#heartbeat_disabled?(val) ⇒ Boolean (protected)
# File 'lib/bunny/session.rb', line 1230
def heartbeat_disabled?(val)
  0 == val || val.nil?
end
#heartbeat_interval ⇒ Integer
Returns the heartbeat interval used
# File 'lib/bunny/session.rb', line 233
def heartbeat_interval; self.heartbeat; end
#host ⇒ Object
# File 'lib/bunny/session.rb', line 252
def host
  @transport ? @transport.host : host_from_address(@addresses[@address_index])
end
#hostname ⇒ String
Returns the RabbitMQ hostname (or IP address) used
# File 'lib/bunny/session.rb', line 224
def hostname; self.host; end
#ignoring_io_errors(&block) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1372
def ignoring_io_errors(&block)
  begin
    block.call
  rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _
    # ignore
  end
end
#inspect ⇒ Object
# File 'lib/bunny/session.rb', line 1100
def inspect
  to_s
end
#local_port ⇒ Integer
Returns the client socket port
# File 'lib/bunny/session.rb', line 276
def local_port
  @transport.local_address.ip_port
end
#manually_closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection has been programmatically closed
# File 'lib/bunny/session.rb', line 413
def manually_closed?
  @status_mutex.synchronize { @manually_closed == true }
end
#normalize_client_channel_max(n) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1361
def normalize_client_channel_max(n)
  return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT

  case n
  when 0 then
    CHANNEL_MAX_LIMIT
  else
    n
  end
end
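The clamping rule is easy to check on a few inputs. The sketch below restates it as a standalone method; the value given to CHANNEL_MAX_LIMIT here is an assumption for illustration (AMQP 0.9.1 channel numbers are 16-bit), since this section does not show how the constant is defined.

```ruby
# Assumed value for this sketch only; the real constant is defined elsewhere in Bunny.
CHANNEL_MAX_LIMIT = 65_535

# 0 means "no client-side limit", so it maps to the ceiling;
# anything above the ceiling is clamped down to it.
def normalize_client_channel_max(n)
  return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT

  n == 0 ? CHANNEL_MAX_LIMIT : n
end

requested  = [0, 10, 65_535, 100_000]
normalized = requested.map { |n| normalize_client_channel_max(n) }
```

Under that assumption, only the value 10 passes through unchanged; the other three inputs all normalize to the limit.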
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
# File 'lib/bunny/session.rb', line 437
def on_blocked(&block)
  @block_callback = block
end
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
# File 'lib/bunny/session.rb', line 446
def on_unblocked(&block)
  @unblock_callback = block
end
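Both methods only store a block for later. A small sketch of how such callback slots behave follows; BlockNotifier and its handle_blocked / handle_unblocked methods are hypothetical, and a plain string is passed where the real callback would receive an AMQ::Protocol::Connection::Blocked object.

```ruby
# Hypothetical stand-in for the callback slots; not Bunny::Session itself.
class BlockNotifier
  def blocked?
    @blocked == true
  end

  def on_blocked(&block)
    @block_callback = block
  end

  def on_unblocked(&block)
    @unblock_callback = block
  end

  # In this sketch the caller plays the broker's role and delivers the notifications.
  def handle_blocked(reason)
    @blocked = true
    @block_callback.call(reason) if @block_callback
  end

  def handle_unblocked
    @blocked = false
    @unblock_callback.call if @unblock_callback
  end
end

events = []
notifier = BlockNotifier.new
notifier.on_blocked   { |reason| events << "blocked: #{reason}" }
notifier.on_unblocked { events << "unblocked" }

notifier.handle_blocked("low on memory")
was_blocked = notifier.blocked?
notifier.handle_unblocked
```

A typical use of such callbacks is to pause publishing while the connection is blocked and resume once it is unblocked.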
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open
# File 'lib/bunny/session.rb', line 418
def open?
  @status_mutex.synchronize do
    (status == :open || status == :connected || status == :connecting) && @transport.open?
  end
end
#password ⇒ String
Returns the password used
# File 'lib/bunny/session.rb', line 228
def password; self.pass; end
#queue_exists?(name) ⇒ Boolean
Checks if a queue with the given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
# File 'lib/bunny/session.rb', line 474
def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
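The passive-declare pattern can be sketched with a fake channel. A failed passive declare is a channel-level error in AMQP 0.9.1 and closes the channel, which is why a throwaway channel is used for the check. NotFound and PassiveChannel below are hypothetical stand-ins, and the list of known queues replaces the broker.

```ruby
# Hypothetical stand-ins for the broker-facing pieces.
class NotFound < StandardError; end

class PassiveChannel
  def initialize(known_queues)
    @known_queues = known_queues
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end

  # A passive declare only checks for existence. A miss raises and,
  # like a real channel-level error, leaves the channel closed.
  def queue(name, opts = {})
    return name if @known_queues.include?(name)

    @open = false
    raise NotFound, "no queue '#{name}'"
  end
end

def queue_exists?(name, known_queues)
  ch = PassiveChannel.new(known_queues) # one-off channel for this check
  begin
    ch.queue(name, :passive => true)
    true
  rescue NotFound
    false
  ensure
    ch.close if ch.open?
  end
end

found   = queue_exists?("jobs", ["jobs"])
missing = queue_exists?("reports", ["jobs"])
```

The open? guard in the ensure clause matters in the miss case, where the channel is already closed by the time cleanup runs.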
#reset_address_index ⇒ Object
# File 'lib/bunny/session.rb', line 260
def reset_address_index
  @address_index_mutex.synchronize { @address_index = 0 }
end
#start ⇒ Object
Starts the connection process.
# File 'lib/bunny/session.rb', line 285
def start
  return self if connected?

  @status_mutex.synchronize { @status = :connecting }
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked = false
  self.reset_continuations

  begin
    begin
      # close existing transport if we have one,
      # to not leak sockets
      @transport.maybe_initialize_socket

      @transport.post_initialize_socket
      @transport.connect

      if @socket_configurator
        @transport.configure_socket(&@socket_configurator)
      end

      self.init_connection
      self.open_connection

      @reader_loop = nil
      self.start_reader_loop if threaded?

    rescue TCPConnectionFailed => e
      @logger.warn e.message
      self.initialize_transport
      @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}"

      return self.start
    rescue
      @status_mutex.synchronize { @status = :not_connected }
      raise
    end
  rescue HostListDepleted
    self.reset_address_index
    @status_mutex.synchronize { @status = :not_connected }
    raise TCPConnectionFailedForAllHosts
  end
  @status_mutex.synchronize { @manually_closed = false }

  self
end
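The failover behaviour in start (on a TCP failure, move to the next endpoint; give up once the list is exhausted) can be illustrated with a plain loop. This sketch swaps the recursive retry in the source for iteration, and the error classes, connect_to_first_available and the connector lambda are all hypothetical names defined here rather than Bunny's own.

```ruby
# Hypothetical error classes; named after the ones in the source for readability.
class TCPConnectionFailed < StandardError; end
class TCPConnectionFailedForAllHosts < StandardError; end

# Tries each address in order and returns the first one that connects.
# `connector` is any callable that raises TCPConnectionFailed on failure.
def connect_to_first_available(addresses, connector)
  addresses.each do |address|
    begin
      connector.call(address)
      return address
    rescue TCPConnectionFailed => e
      warn "#{e.message}; will try the next endpoint in line"
    end
  end
  # Every address failed: the host list is depleted.
  raise TCPConnectionFailedForAllHosts
end

reachable = ["rabbit2:5672"]
connector = lambda do |address|
  raise TCPConnectionFailed, "could not reach #{address}" unless reachable.include?(address)
end

chosen = connect_to_first_available(["rabbit1:5672", "rabbit2:5672"], connector)
```

Any error other than the TCP failure is left to propagate, which matches the bare rescue in the source that resets the status and re-raises.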
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity
# File 'lib/bunny/session.rb', line 248
def threaded?
  @threaded
end
#to_s ⇒ String
# File 'lib/bunny/session.rb', line 1095
def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end
#username ⇒ String
Returns the username used
# File 'lib/bunny/session.rb', line 226
def username; self.user; end
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL)
# File 'lib/bunny/session.rb', line 242
def uses_ssl?
  @transport.uses_ssl?
end
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL)
# File 'lib/bunny/session.rb', line 236
def uses_tls?
  @transport.uses_tls?
end
#validate_connection_options(options) ⇒ Object
# File 'lib/bunny/session.rb', line 213
def validate_connection_options(options)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one."
  end
end
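The two checks behave differently: one combination is rejected outright, the other only produces a warning. A standalone sketch makes the distinction visible; it takes a plain Array named warnings in place of the session's @logger, which is an assumption made so the example runs on its own.

```ruby
# `warnings` is a plain Array standing in for the session's @logger (assumption).
def validate_connection_options(options, warnings)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    warnings << "Connection options contain both a host and an array of hosts (addresses), please pick one."
  end
end

warnings = []
validate_connection_options({ :host => "a", :hosts => ["b", "c"] }, warnings) # accepted, but warns
validate_connection_options({ :hosts => ["b", "c"] }, warnings)               # accepted silently

conflict = begin
  validate_connection_options({ :hosts => ["a"], :addresses => ["b:5672"] }, warnings)
  nil
rescue ArgumentError => e
  e.message
end
```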
#virtual_host ⇒ String
Returns the virtual host used
# File 'lib/bunny/session.rb', line 230
def virtual_host; self.vhost; end
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
# File 'lib/bunny/session.rb', line 385
def with_channel(n = nil)
  ch = create_channel(n)
  begin
    yield ch
  ensure
    ch.close if ch.open?
  end

  self
end
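Two properties of this method are worth seeing in action: the channel is closed even when the block raises, and the return value is the session rather than the block's result. The sketch below uses hypothetical stand-ins (TempChannel, TinySession) so it runs without a broker; last_channel exists only so the example can inspect the channel afterwards.

```ruby
# Hypothetical stand-ins; not Bunny classes.
class TempChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

class TinySession
  attr_reader :last_channel # kept only so the sketch can inspect the channel later

  def create_channel(_n = nil)
    @last_channel = TempChannel.new
  end

  # Same shape as the method above: yield, always close, return self.
  def with_channel(n = nil)
    ch = create_channel(n)
    begin
      yield ch
    ensure
      ch.close if ch.open?
    end

    self
  end
end

session = TinySession.new
returned = session.with_channel { |ch| ch.open? }
closed_after_ok = !session.last_channel.open?

begin
  session.with_channel { |_ch| raise "publish failed" }
rescue RuntimeError
  closed_after_error = !session.last_channel.open?
end
```

Because the method returns self, any value computed inside the block has to be captured in a variable from the enclosing scope.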