class Tengine::Job::Runtime::SshJob
ルートジョブネットを他のジョブネット内に展開するための特殊なテンプレート用Vertex。
Public Instance Methods
ハンドリングするドライバ: ジョブ制御ドライバ スクリプトのプロセスのPIDを取得できたときに実行されます
# File lib/tengine/job/runtime/ssh_job.rb, line 388 def ack(signal) self.executing_pid = (signal.data || {})[:executing_pid] self.phase_key = :running end
ハンドリングするドライバ: ジョブ制御ドライバ
# File lib/tengine/job/runtime/ssh_job.rb, line 337 def activate(signal) case phase_key when :initialized then # 特別ルール「starting直前stop」 # initializedに戻されたジョブに対して、:readyになる際にtransmitで送信されたイベントを受け取って、 # activateしようとすると状態は遷移しないが、後続のエッジを実行する。 # (エッジを実行しようとした際、エッジがclosedならばそのジョブネットのEndに遷移する。) next_edges.first.transmit(signal) when :ready then self.phase_key = :starting self.started_at = signal.event.occurred_at signal.call_later do complete_origin_edge(signal) execution = signal.execution if execution.retry if execution.target_actual_ids.include?(self.id.to_s) execution.ack(signal) elsif execution.target_actuals.map{|t| t.parent.id.to_s if t.parent }.include?(self.parent.id.to_s) # 自身とTengine::Job::Runtime::Execution#target_actual_idsに含まれるジョブ/ジョブネットと親が同じならば、ackしない else parent.ack(signal) end else parent.ack(signal) # 再実行でない場合 end # このコールバックはjob_control_driverでupdate_with_lockの外側から # 再度呼び出してもらうためにcallbackを設定しています signal.call_later do # 実際にSSHでスクリプトを実行 execution = signal.execution execution.signal = signal # ackを呼び返してもらうための苦肉の策 begin run(execution) rescue Tengine::Job::Runtime::SshJob::Error => e Tengine.logger.warn("error on run\nerror: #{e.inspect}\njob: #{self.inspect}\nexecution: #{execution.inspect}") signal.call_later do self.fail(signal, :message => e.message) end end end end when :starting then raise "something wrong! #{self.inspect}" end end
# File lib/tengine/job/runtime/ssh_job.rb, line 314 def add_error_message(msg) self.error_messages ||= [] self.error_messages += [msg] end
def ack(pid)
@acked_pid = pid self.executing_pid = pid self.phase_key = :running self.previous_edges.each{|edge| edge.status_key = :transmitted}
end
# File lib/tengine/job/runtime/ssh_job.rb, line 202 def build_command(execution) result = [] mm_env = build_mm_env(execution).map{|k,v| "#{k}=#{v}"}.join(" ") # Hadoopジョブの場合は環境変数をセットする if is_a?(Tengine::Job::Runtime::Jobnet) && (jobnet_type_key == :hadoop_job_run) mm_env << ' ' << hadoop_job_env end result << "export #{mm_env}" template_root = (parent ? root_or_expansion.template : nil) if template_root template_job = template_root.vertex_by_name_path(self.name_path_until_expansion) unless template_job raise "job not found #{self.name_path_until_expansion.inspect} in #{template_root.inspect}" end key = Tengine::Job::Dsl::Loader.template_block_store_key(template_job, :preparation) preparation_block = Tengine::Job::Dsl::Loader.template_block_store[key] if preparation_block preparation = instance_eval(&preparation_block) unless preparation.blank? result << preparation end end end unless execution.preparation_command.blank? result << execution.preparation_command end # cmdはユーザーが設定したスクリプトを組み立てたもので、 # プロセスの監視/強制停止のためにtengine_job_agent/bin/tengine_job_agent_run # からこれらを実行させるためにはcmdを編集します。 # tengine_job_agent_runは、標準出力に監視対象となる起動したプロセスのPIDを出力します。 runner_path = ENV["MM_RUNNER_PATH"] || executable_command("tengine_job_agent_run") runner_option = "" # 実装するべきか要検討 # runner_option << " --stdout" if execution.keeping_stdout # runner_option << " --stderr" if execution.keeping_stderr # script = "#{runner_path}#{runner_option} -- #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください script = "#{runner_path}#{runner_option} #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください result << script result.join(" && ") end
MMから実行されるシェルスクリプトに渡す環境変数のHashを返します。 MM_ACTUAL_JOB_ID : 実行される末端のジョブのMM上でのID MM_ACTUAL_JOB_ANCESTOR_IDS : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (テンプレートジョブ単位) MM_FULL_ACTUAL_JOB_ANCESTOR_IDS : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (expansionから展開した単位) MM_ACTUAL_JOB_NAME_PATH : 実行される末端のジョブのname_path MM_ACTUAL_JOB_SECURITY_TOKEN : 公開API呼び出しのためのセキュリティ用のワンタイムトークン MM_TEMPLATE_JOB_ID : テンプレートジョブ(=実行される末端のジョブの元となったジョブ)のID MM_TEMPLATE_JOB_ANCESTOR_IDS : テンプレートジョブの祖先のMM上でのIDをセミコロンで繋げたもの MM_SCHEDULE_ID : 実行スケジュールのID MM_SCHEDULE_ESTIMATED_TIME : 実行スケジュールの見積り時間。単位は分。 MM_SCHEDULE_ESTIMATED_END : 実行スケジュールの見積り終了時刻をYYYYMMDDHHMMSS式で。(できればISO 8601など、タイムゾーンも表現できる標準的な形式の方が良い?) MM_MASTER_SCHEDULE_ID : マスタースケジュールがあればそのID。マスタースケジュールがない場合は環境変数は指定されません。
未実装 MM_FAILED_JOB_ID : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブのMM上でのID。 MM_FAILED_JOB_ANCESTOR_IDS : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブの祖先のMM上でのIDをセミコロンで繋げた文字列。
# File lib/tengine/job/runtime/ssh_job.rb, line 267 def build_mm_env(execution) result = { "MM_SERVER_NAME" => actual_server_name, # [Tengineの仕様として追加] ジョブの実行サーバ名を設定 "MM_ROOT_JOBNET_ID" => root.id.to_s, "MM_TARGET_JOBNET_ID" => (parent ? parent.id.to_s : nil), "MM_ACTUAL_JOB_ID" => id.to_s, "MM_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors_until_expansion.map(&:id).map(&:to_s).join(';'), "MM_FULL_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors.map(&:id).map(&:to_s).join(';'), "MM_ACTUAL_JOB_NAME_PATH" => name_path.dump, "MM_ACTUAL_JOB_SECURITY_TOKEN" => "", # TODO トークンの生成 "MM_SCHEDULE_ID" => execution.id.to_s, "MM_SCHEDULE_ESTIMATED_TIME" => execution.estimated_time, } if estimated_end = execution.actual_estimated_end result["MM_SCHEDULE_ESTIMATED_END"] = estimated_end.strftime("%Y%m%d%H%M%S") end if rjt = (parent ? root.template : nil) t = rjt.find_descendant_by_name_path(self.name_path) unless t template_name_parts = self.name_path_until_expansion.split(Tengine::Job::Structure::NamePath::SEPARATOR).select{|s| !s.empty?} root_jobnet_name = template_name_parts.first if rjt = Tengine::Job::Template::RootJobnet.find_by_name(root_jobnet_name, :version => rjt.dsl_version) t = rjt.find_descendant_by_name_path(self.name_path_until_expansion) raise "template job #{name_path.inspect} not found in #{rjt.inspect}" unless t else raise "Tengine::Job::Template::RootJobnet not found #{self.name_path_until_expansion.inspect}" end end result.update({ "MM_TEMPLATE_JOB_ID" => t.id.to_s, "MM_TEMPLATE_JOB_ANCESTOR_IDS" => '"%s"' % t.ancestors.map(&:id).map(&:to_s).join(';'), }) end # if ms = execution.master_schedule # result.update({ # "MM_MASTER_SCHEDULE_ID" => ms.id.to_s, # }) # end result end
# File lib/tengine/job/runtime/ssh_job.rb, line 243 def executable_command(command) if prefix = ENV["MM_CMD_PREFIX"] "#{prefix} #{command}" else command end end
# File lib/tengine/job/runtime/ssh_job.rb, line 139 def execute(cmd, &block) raise "actual_server not found for #{self.name_path.inspect}" unless actual_server Tengine.logger.info("connecting to #{actual_server.hostname_or_ipv4}") port = actual_server.properties["ssh_port"] || 22 keys_only = actual_credential.auth_type_cd == :ssh_public_key Net::SSH.start(actual_server.hostname_or_ipv4, actual_credential, :port => port, :logger => Tengine.logger, :keys_only => keys_only) do |ssh| # see http://net-ssh.github.com/ssh/v2/api/classes/Net/SSH/Connection/Channel.html c = ssh.open_channel do |ch0| ch0.request_pty do |channel, success| raise Error, "failed to request_pty" unless success channel.exec("#{ENV['SHELL']} -l") do |shell_ch, success| raise Error, "failed to \"#{ENV['SHELL']} -l\"" unless success shell_ch.on_extended_data do |ch, type, data| add_error_message(data) raise Error, "Failure to execute #{self.name_path} via SSH: #{data}" end client = ShellClient.new(shell_ch, cmd, block) shell_ch[:client] = client client.setup client.start end end end c.wait end rescue Tengine::Job::Runtime::SshJob::Error raise rescue Mongoid::Errors::DocumentNotFound, SocketError, Net::SSH::AuthenticationFailed => src error = Error.new("[#{src.class.name}] #{src.message}") error.set_backtrace(src.backtrace) raise error rescue Exception # puts "[#{$!.class.name}] #{$!.message}" raise end
ハンドリングするドライバ: ジョブ制御ドライバ
# File lib/tengine/job/runtime/ssh_job.rb, line 418 def fail(signal, options = nil) self.phase_key = :error if msg = signal.event[:message] add_error_message(msg) end if options && (msg = options[:message]) add_error_message(msg) end self.finished_at = signal.event.occurred_at event_options = { :exit_status => self.exit_status, :target_jobnet_id => parent.id, :target_jobnet_name_path => parent.name_path, :target_job_id => self.id, :target_job_name_path => self.name_path, } event_options.update(options) if options signal.fire(self, :"error.job.job.tengine", event_options) end
# File lib/tengine/job/runtime/ssh_job.rb, line 395 def finish(signal) self.exit_status = signal.event[:exit_status] self.finished_at = signal.event.occurred_at (self.exit_status.to_s == '0') ? succeed(signal) : fail(signal) end
# File lib/tengine/job/runtime/ssh_job.rb, line 439 def fire_stop(signal) signal.fire(self, :"stop.job.job.tengine", { :stop_reason => signal.event[:stop_reason], :target_jobnet_id => parent.id, :target_jobnet_name_path => parent.name_path, :target_job_id => self.id, :target_job_name_path => self.name_path, }) end
# File lib/tengine/job/runtime/ssh_job.rb, line 308 def hadoop_job_env s = children.select{|c| c.is_a?(Tengine::Job::Runtime::Jobnet) && (c.jobnet_type_key == :hadoop_job)}. map{|c| "#{c.name}\\t#{c.id.to_s}\\n"}.join "MM_HADOOP_JOBS=\"#{s}\"" end
# File lib/tengine/job/runtime/ssh_job.rb, line 178 def kill(execution) lines = [] if self.executing_pid.blank? Tengine.logger.warn("PID is blank when kill!!\n#{self.inspect}\n " << caller.join("\n ")) end cmd = executable_command("tengine_job_agent_kill %s %d %s" % [ self.executing_pid, self.actual_killing_signal_interval, self.actual_killing_signals.join(","), ]) lines << cmd cmd = lines.join(' && ') execute(cmd) end
# File lib/tengine/job/runtime/ssh_job.rb, line 23 def prepare_server_and_credential if t = template_vertex self.server_name = t.actual_server_name if server_name.blank? self.credential_name = t.actual_credential_name if credential_name.blank? end end
# File lib/tengine/job/runtime/ssh_job.rb, line 479 def reset(signal, &block) self.phase_key = :initialized reset_followings(signal) end
# File lib/tengine/job/runtime/ssh_job.rb, line 30 def run(execution) return ack(@acked_pid) if @acked_pid cmd = build_command(execution) # puts "cmd:\n" << cmd execute(cmd) do |ch, data| pids = data.strip.scan(/^\d+$/) case pids.length when 0 then add_error_message("expected a set of numeric charactors but not found in: " << data.inspect) raise Error, "Failure to execute #{self.name_path} via SSH. expected numeric charactors but got: #{data}" when 1 then pid = pids.first.strip Tengine.logger.info("got pid: #{pid.inspect}") else add_error_message("expected a set of numeric charactors but got #{pids.length} sets of numeric charactoers #{pids.inspect} in #{data.inspect}") raise Error, "Failure to execute #{self.name_path} via SSH. expected numeric charactors but got: #{data}" end if signal = execution.signal signal.call_later do signal.data = {:executing_pid => pid} # このブロック内の処理はupdate_with_lockによって複数回実行されることがあります。 # 1回目と同じリロードされていないオブジェクトを2回目以降に使用すると、1回目の変更が残っているので # そのオブジェクトに対して処理を行うのはNGです。 # self.ack(signal) # これはNG # このブロックが実行されるたびに、rootからselfと同じidのオブジェクトを新たに取得する必要があります。 job = root.vertex(self.id) job.ack(signal) end end end rescue Exception => e Tengine.logger.error("[#{e.class}] #{e.message}\n " << e.backtrace.join("\n ")) raise end
# File lib/tengine/job/runtime/ssh_job.rb, line 450 def stop(signal, &block) case phase_key when :ready then self.phase_key = :initialized self.stopped_at = signal.event.occurred_at self.stop_reason = signal.event[:stop_reason] next_edges.first.transmit(signal) when :starting then job = nil loop do # root = self.root.reload # class.find(self.root.id) # job = root.find_descendant(self.id) job = self.class.find(self.id) break unless job.phase_key == :starting yield if block_given? # テストの為にyieldしています sleep(0.1) end job.stop(signal, &block) when :running then self.phase_key = :dying self.stopped_at = signal.event.occurred_at self.stop_reason = signal.event[:stop_reason] signal.call_later do kill(signal.execution) end end end
ハンドリングするドライバ: ジョブ制御ドライバ
# File lib/tengine/job/runtime/ssh_job.rb, line 404 def succeed(signal) self.phase_key = :success self.finished_at = signal.event.occurred_at signal.fire(self, :"success.job.job.tengine", { :exit_status => self.exit_status, :target_jobnet_id => parent.id, :target_jobnet_name_path => parent.name_path, :target_job_id => self.id, :target_job_name_path => self.name_path, }) end
ハンドリングするドライバ: ジョブネット制御ドライバ
# File lib/tengine/job/runtime/ssh_job.rb, line 323 def transmit(signal) self.phase_key = :ready self.started_at = signal.event.occurred_at signal.fire(self, :"start.job.job.tengine", { :target_jobnet_id => parent.id, :target_jobnet_name_path => parent.name_path, :target_job_id => self.id, :target_job_name_path => self.name_path, }) end