module DbSucker::Application::Container::SSH
Constants
- ChannelOpenFailedError
- CommandExecutionError
Public Instance Methods
blocking_channel(ssh = nil, result = nil, &block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 108
#
# Opens a channel on +ssh+ (or, when +ssh+ is nil, on whatever +ssh_start+
# returns) and waits for that channel before returning it.
#
# The caller's block is run from the open-channel callback, but only after
# the open-failure handler and the channel's :wait_monitor / :wait_condition
# entries have been installed; a Queue is used as the gate for that.
#
# @param ssh [Object, nil] connection to open the channel on
# @param result [Object, nil] receives +close!+ (via +try+) if the channel fails to open
# @yieldparam ch the channel handed to the open-channel callback
# @return the channel returned by +open_channel+
# @raise [ChannelOpenFailedError] raised inside the open-failure callback
def blocking_channel(ssh = nil, result = nil, &block)
  gate = Queue.new
  connection = ssh || ssh_start

  channel = connection.open_channel do |opened|
    # Blocks until the setup below has pushed onto the gate.
    gate.pop
    block.call(opened)
  end

  # catch open_fail errors
  channel.on_open_failed do |failed, code, desc|
    result.try(:close!)
    failed[:open_failed] = true
    raise ChannelOpenFailedError, "#{code}: #{desc}"
  end

  monitor = Monitor.new
  condition = monitor.new_cond
  channel[:wait_monitor] = monitor
  channel[:wait_condition] = condition

  st = app.sklaventreiber
  gate << true

  if !ssh && st && st.sync { st.try(:poll) }
    # No explicit connection was given and the sklaventreiber reports +poll+:
    # sleep on the condition in 0.1s slices until the channel goes inactive.
    monitor.synchronize do
      condition.wait(0.1) while channel.active?
    end
  else
    channel.wait
  end

  channel
end
blocking_channel_result(cmd, opts = {})
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 156
#
# Executes +cmd+ on a channel and collects what it writes into a new
# EventedResultset (stdout chunks tagged :stdout, stderr chunks :stderr).
#
# Recognised +opts+ (defaults in parentheses):
# ssh (nil)::           connection handed to the channel opener
# blocking (true)::     open via +blocking_channel+, otherwise +nonblocking_channel+
# channel (false)::     return +[channel, result]+ instead of just +result+
# request_pty (false):: request a pty before executing
# use_sh (false)::      wrap +cmd+ in +/bin/sh -c+ and capture the first stdout
#                       chunk as channel[:pid] (Integer, or +false+ when that
#                       chunk does not start with a non-zero number)
#
# NOTE(review): with +use_sh+ the command is embedded between single quotes;
# a +cmd+ that itself contains a single quote would end that quoting early.
# Confirm callers never pass one.
#
# @param cmd [String] command line to execute
# @param opts [Hash] see above
# @return the resultset, or +[channel, resultset]+ when +opts[:channel]+ is set
# @raise [CommandExecutionError] from the exec / pty callbacks when they report failure
def blocking_channel_result(cmd, opts = {})
  opts = opts.reverse_merge(ssh: nil, blocking: true, channel: false, request_pty: false, use_sh: false)

  if opts[:use_sh]
    cmd = %{/bin/sh -c 'echo $(ps -o pgid= $$ | grep -o [0-9]*) && #{cmd}'}
    pid_monitor = Monitor.new
    pid_signal = pid_monitor.new_cond
  end

  result = EventedResultset.new

  # Every debug line is prefixed with the thread that handled the event.
  origin = -> { Thread.current == Thread.main ? :main : Thread.current[:itype] }
  trace = ->(message) { Thread.main[:app].debug "#{origin.call}-#{Time.current.to_f}: #{message}".chomp }

  run_command = lambda do |target|
    target.exec(cmd) do |ch, success|
      Thread.main[:app].debug "START: #{origin.call}-#{cmd} #{success}"
      raise CommandExecutionError, "could not execute command" unless success

      # "on_data" is called when the process writes something to stdout
      ch.on_data do |_c, data|
        trace.call("STDOUT: #{data}")
        if opts[:use_sh] && ch[:pid].nil?
          # Only the very first stdout chunk is the wrapper's PID line. Keying
          # this on the (still unset) pid instead of on the resultset being
          # empty keeps later chunks from being mistaken for it, because the
          # PID line itself is never enqueued.
          pid = data.to_i
          ch[:pid] = pid.zero? ? false : pid
          pid_monitor.synchronize { pid_signal.broadcast }
        else
          result.enq(data, :stdout)
        end
      end

      # "on_extended_data" is called when the process writes something to stderr
      ch.on_extended_data do |_c, _type, data|
        trace.call("STDERR: #{data}")
        result.enq(data, :stderr)
      end

      ch.on_request "exit-status" do |_ch, data|
        trace.call("EXIT: #{data.read_long} #{cmd}")
      end

      ch.on_close do |closed|
        # Wake up anyone sleeping on the channel's wait condition.
        closed[:wait_monitor].synchronize { closed[:wait_condition].broadcast }
        trace.call("CLOSED: #{cmd}")
      end

      ch.on_eof do
        trace.call("EOF: #{cmd}")
        result.close!
        ch[:handler].try(:signal)
      end
    end
  end

  opener = opts[:blocking] ? :blocking_channel : :nonblocking_channel
  chan = send(opener, opts[:ssh], result) do |channel|
    if opts[:request_pty]
      channel.request_pty do |pty_channel, success|
        raise CommandExecutionError, "could not obtain pty" unless success
        pty_channel[:pty] = true
        run_command.call(pty_channel)
      end
    else
      run_command.call(channel)
    end
  end

  if opts[:use_sh]
    # Give the PID line up to one second to arrive, but only while it is
    # still unknown (nil); +false+ means it was seen and could not be parsed.
    pid_monitor.synchronize { pid_signal.wait(1) if chan[:pid].nil? }
  end

  opts[:channel] ? [chan, result] : result
end
kill_remote_process(pid, sig = :INT)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 150
#
# Runs +kill -<sig> -<pid>+ through +blocking_channel_result+ on the
# connection yielded by +ssh_start(true)+.
#
# @param pid [#to_s] id placed after the dash in the kill command
# @param sig [#to_s] signal name or number (defaults to :INT)
# @return whatever +ssh_start+ returns for the block form
def kill_remote_process(pid, sig = :INT)
  ssh_start(true) do |connection|
    command = "kill -#{sig} -#{pid}"
    blocking_channel_result(command, ssh: connection)
  end
end
loop_ssh(*args, &block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 103
#
# Forwards all arguments and the block to +@ssh.loop+.
#
# @return [false] when no connection is stored in +@ssh+
# @return otherwise whatever +@ssh.loop+ returns
def loop_ssh(*args, &block)
  @ssh ? @ssh.loop(*args, &block) : false
end
nonblocking_channel(ssh = nil, result = nil, &block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 135
#
# Opens a channel on +ssh+ (or on whatever +ssh_start+ returns when +ssh+ is
# nil) and returns it right away, without waiting on the channel.
#
# Inside the open-channel callback the channel gets its own :wait_monitor
# and :wait_condition before the caller's block is run with it.
#
# @param ssh [Object, nil] connection to open the channel on
# @param result [Object, nil] receives +close!+ (via +try+) if the channel fails to open
# @yieldparam ch the channel handed to the open-channel callback
# @return the channel returned by +open_channel+
# @raise [ChannelOpenFailedError] raised inside the open-failure callback
def nonblocking_channel(ssh = nil, result = nil, &block)
  connection = ssh || ssh_start

  channel = connection.open_channel do |opened|
    monitor = Monitor.new
    opened[:wait_monitor] = monitor
    opened[:wait_condition] = monitor.new_cond
    block.call(opened)
  end

  # catch open_fail errors
  channel.on_open_failed do |failed, code, desc|
    result.try(:close!)
    failed[:open_failed] = true
    raise ChannelOpenFailedError, "#{code}: #{desc}"
  end

  channel
end
nonblocking_channel_result(cmd, opts = {})
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 218
#
# Same as +blocking_channel_result+, but always passes +blocking: false+
# (overriding any :blocking entry in +opts+).
#
# @param cmd [String] command line to execute
# @param opts [Hash] options passed through to +blocking_channel_result+
# @return whatever +blocking_channel_result+ returns
def nonblocking_channel_result(cmd, opts = {})
  async_opts = opts.merge(blocking: false)
  blocking_channel_result(cmd, async_opts)
end
sftp_begin() { |sftp| ... }
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 56
#
# Stores the connection returned by +sftp_start+ in +@sftp+. When a block is
# given it is yielded that connection and +sftp_end+ runs afterwards, even
# if the block raises.
#
# @yieldparam sftp the connection stored in +@sftp+
# @return the block's value, or nil when no block is given
def sftp_begin
  debug "Opening SFTP connection for identifier `#{name}'"
  @sftp = sftp_start
  return unless block_given?

  begin
    yield(@sftp)
  ensure
    sftp_end
  end
end
sftp_end()
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 66
#
# Closes the connection stored in +@sftp+ (best effort) and clears it.
# Does nothing when no connection is stored.
#
# Net::SFTP::Session#close expects a remote file handle, so calling it
# without arguments raises and never closes anything. The connection is
# therefore shut down through the session's underlying SSH session instead.
# A failure while closing is logged through +debug+ rather than raised.
#
# @return [nil]
def sftp_end
  return unless @sftp

  debug "Closing SFTP connection for identifier `#{name}'"
  begin
    @sftp.session.close
    debug "CLOSED SFTP connection for identifier `#{name}'"
  rescue StandardError => e
    debug "Failed to close SFTP connection for identifier `#{name}': #{e.class}: #{e.message}"
  ensure
    # Drop the reference either way so a later start opens a fresh connection.
    @sftp = nil
  end
  nil
end
sftp_start(new_connection = false, &block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 78
#
# Hands out an SFTP connection.
#
# When +@sftp+ is set and +new_connection+ is false, the stored connection
# is reused inside +sftp_sync+: it is passed to the block (returning the
# block's value) or returned directly. Otherwise a new connection is opened
# with Net::SFTP.start, using the "ssh" section of +source+ ("hostname",
# plus "username", "password" and "port" when present) and +ssh_key_files+
# when there are any.
#
# @param new_connection [Boolean] force a new connection even if one is stored
# @yieldparam sftp the reused or newly opened connection
# @return the block's value or +@sftp+ when reusing; otherwise whatever
#   Net::SFTP.start returns
def sftp_start(new_connection = false, &block)
  if @sftp && !new_connection
    sftp_sync do
      debug "Reusing SFTP connection in start for identifier `#{name}'"
      return @sftp unless block
      return block.call(@sftp)
    end
  end

  debug "Opening new SFTP connection in start for identifier `#{name}'"
  settings = source["ssh"]
  opt = {}
  opt[:user] = settings["username"] if settings["username"].present?
  opt[:password] = settings["password"] if settings["password"].present?
  opt[:keys] = ssh_key_files if ssh_key_files.any?
  opt[:port] = settings["port"] if settings["port"].present?

  # A nil block means no block is passed along.
  Net::SFTP.start(settings["hostname"], nil, opt, &block)
end
sftp_sync(&block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 74
#
# Runs the block while holding +@sftp_mutex+.
#
# @return the block's value
def sftp_sync(&block)
  @sftp_mutex.synchronize(&block)
end
ssh_begin() { |ssh| ... }
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 9
#
# Stores the connection returned by +ssh_start+ in +@ssh+. When a block is
# given it is yielded that connection and +ssh_end+ runs afterwards, even if
# the block raises.
#
# @yieldparam ssh the connection stored in +@ssh+
# @return the block's value, or nil when no block is given
def ssh_begin
  debug "Opening SSH connection for identifier `#{name}'"
  @ssh = ssh_start
  return unless block_given?

  begin
    yield(@ssh)
  ensure
    ssh_end
  end
end
ssh_end()
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 19
#
# Closes the connection stored in +@ssh+ (best effort) and clears it.
# Does nothing when no connection is stored.
#
# A failure while closing is logged through +debug+ instead of being
# silently discarded, and the "CLOSED" message is only emitted when the
# close call actually returned.
#
# @return [nil]
def ssh_end
  return unless @ssh

  debug "Closing SSH connection for identifier `#{name}'"
  begin
    @ssh.close
    debug "CLOSED SSH connection for identifier `#{name}'"
  rescue StandardError => e
    debug "Failed to close SSH connection for identifier `#{name}': #{e.class}: #{e.message}"
  ensure
    # Drop the reference either way so a later start opens a fresh connection.
    @ssh = nil
  end
  nil
end
ssh_start(new_connection = false, &block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 31
#
# Hands out an SSH connection.
#
# When +@ssh+ is set and +new_connection+ is false, the stored connection is
# reused inside +ssh_sync+: it is passed to the block (returning the block's
# value) or returned directly. Otherwise a new connection is opened with
# Net::SSH.start, using the "ssh" section of +source+ ("hostname", plus
# "username", "password" and "port" when present) and +ssh_key_files+ when
# there are any.
#
# @param new_connection [Boolean] force a new connection even if one is stored
# @yieldparam ssh the reused or newly opened connection
# @return the block's value or +@ssh+ when reusing; otherwise whatever
#   Net::SSH.start returns
def ssh_start(new_connection = false, &block)
  if @ssh && !new_connection
    ssh_sync do
      debug "Reusing SSH connection in start for identifier `#{name}'"
      return @ssh unless block
      return block.call(@ssh)
    end
  end

  debug "Opening new SSH connection in start for identifier `#{name}'"
  settings = source["ssh"]
  opt = {}
  opt[:user] = settings["username"] if settings["username"].present?
  opt[:password] = settings["password"] if settings["password"].present?
  opt[:keys] = ssh_key_files if ssh_key_files.any?
  opt[:port] = settings["port"] if settings["port"].present?

  # A nil block means no block is passed along.
  Net::SSH.start(settings["hostname"], nil, opt, &block)
end
ssh_sync(&block)
click to toggle source
# File lib/db_sucker/application/container/ssh.rb, line 27
#
# Runs the block while holding +@ssh_mutex+.
#
# @return the block's value
def ssh_sync(&block)
  @ssh_mutex.synchronize(&block)
end