class Bricolage::Job

Attributes

global_variables[R]
id[R]
params[R]
script[R]
variables[R]

Public Class Methods

instantiate(id, class_id, ctx) click to toggle source

For a standalone job (command-line mode).

# File lib/bricolage/job.rb, line 33
# Builds a job for +id+ using the job class looked up via
# JobClass.get(class_id), then initializes its global variables.
#
# id       :: job identifier passed to Job.new
# class_id :: identifier passed to JobClass.get
# ctx      :: context object passed to Job.new
#
# Returns the newly created job.
def Job.instantiate(id, class_id, ctx)
  job = new(id, JobClass.get(class_id), ctx)
  job.init_global_variables
  job
end
load_file(path, ctx) click to toggle source

For a standalone job (.job file mode).

# File lib/bricolage/job.rb, line 22
# Loads the job file at +path+ and builds the job it describes.
#
# The job is created through Job.instantiate, the values of the job file
# are bound as parameters, and every global variable declared in the job
# file is copied into the job's global variables under its string name.
#
# path :: location of the job file, handed to JobFile.load
# ctx  :: context object used both for loading and for the job itself
#
# Returns the configured job.
def Job.load_file(path, ctx)
  job_file = JobFile.load(ctx, path)
  job = instantiate(job_file.job_id, job_file.class_id, ctx)
  job.bind_parameters(job_file.values)
  job_file.global_variables.each do |key, val|
    job.global_variables[key.to_s] = val
  end
  job
end
load_ref(ref, jobnet_context) click to toggle source

Used by JobNetRunner.

# File lib/bricolage/job.rb, line 15
# Loads the job that +ref+ points to.
#
# The subsystem context is obtained from +jobnet_context+ using
# ref.subsystem; that context resolves the job file path from ref.name
# and is also the context the job is loaded with.
#
# Returns the job built by Job.load_file.
def Job.load_ref(ref, jobnet_context)
  subsystem_ctx = jobnet_context.subsystem(ref.subsystem)
  load_file(subsystem_ctx.job_file(ref.name), subsystem_ctx)
end
new(id, job_class, context) click to toggle source
# File lib/bricolage/job.rb, line 39
# Creates a job that has not been compiled yet.
#
# id        :: job identifier (stored as-is; other methods treat nil as "no id")
# job_class :: object providing #get_parameters (and, later, the script and
#              declarations for this job)
# context   :: context object used when resolving parameters and running
#
# Every piece of state that is filled in later (by #init_global_variables,
# #bind_parameters, #parsing_options, #compile and #declarations) is set to
# nil here, so readers such as #script, #explain and #script_source never
# touch an undefined instance variable before #compile has run.
def initialize(id, job_class, context)
  @id = id
  @job_class = job_class
  @context = context
  @global_variables = nil
  @param_decls = @job_class.get_parameters
  @param_vals = nil      # Parameters::IntermediateValues by *.job
  @param_vals_opt = nil  # Parameters::IntermediateValues by options
  @params = nil
  @variables = nil
  @declarations = nil    # memoized by #declarations
  @script = nil          # set by #compile
end

Public Instance Methods

bind_parameters(values) click to toggle source

For job file

# File lib/bricolage/job.rb, line 75
# Parses +values+ (the values taken from a job file) with the parameter
# declarations and remembers the result as the job-file parameter values.
#
# Returns the parsed values.
def bind_parameters(values)
  parsed = @param_decls.parse_direct_values(values)
  @param_vals = parsed
end
class_id() click to toggle source
# File lib/bricolage/job.rb, line 53
# Returns the id of this job's job class.
def class_id
  job_class = @job_class
  job_class.id
end
compile() click to toggle source
# File lib/bricolage/job.rb, line 84
# Resolves this job's parameters and variables and builds its script.
#
# Assigns @params, @variables and @script, in that order.  Statement order
# matters here: the job class' parameters filter runs before @param_vals is
# read, and #declarations (which memoizes using @params) is first called
# only after @params has been assigned.
#
# Returns whatever @script.bind returns.
def compile
  # Default parameter values are parsed from the 'defaults' entry of the
  # global variables.
  param_vals_default = @param_decls.parse_default_values(@global_variables.get_force('defaults'))
  # The job class receives this job itself.
  # NOTE(review): presumably this is where #provide_default /
  # #provide_sql_file_by_job_id get called -- confirm in the job classes.
  @job_class.invoke_parameters_filter(self)

  # Fall back to an empty variable set when no job-file values or no
  # option values were supplied.
  job_file_rest_vars = @param_vals ? @param_vals.variables : Variables.new
  job_v_opt_vars = @param_vals_opt ? @param_vals_opt.variables : Variables.new

  # We use different variable sets for parameter expansion and
  # SQL variable expansion.  Parameter expansion uses global
  # variables and "-v" option variables (both of global and job).
  base_vars = Variables.union(
    #          ^ Low precedence
    @global_variables,
    job_v_opt_vars
    #          v High precedence
  )
  # Merge defaults, job-file values and option values, skipping any source
  # that is nil.
  pvals = @param_decls.union_intermediate_values(*[param_vals_default, @param_vals, @param_vals_opt].compact)
  @params = pvals.resolve(@context, base_vars.resolve)

  # Then, expand SQL variables and check them against the declarations.
  vars = Variables.union(
    #          ^ Low precedence
    declarations.default_variables,
    @global_variables,
    @params.variables,   # Like $dest_table
    job_file_rest_vars,
    job_v_opt_vars
    #          v High precedence
  )
  @variables = vars.resolve
  @variables.bind_declarations declarations

  # Finally build the script from the resolved parameters and bind it to
  # the context and the resolved variables.
  @script = @job_class.get_script(@params)
  @script.bind @context, @variables
end
declarations() click to toggle source
# File lib/bricolage/job.rb, line 129
# Returns the declarations obtained from the job class for the current
# @params.  The first truthy result is cached in @declarations and reused
# by later calls, even if @params changes afterwards.
def declarations
  return @declarations if @declarations

  @declarations = @job_class.get_declarations(@params)
end
execute(log_locator: LogLocator.empty) click to toggle source
# File lib/bricolage/job.rb, line 143
# Runs the job in the current process while +log_locator+ redirects the
# standard output streams.
#
# log_locator :: object responding to #redirect_stdouts; defaults to
#                LogLocator.empty
#
# Returns whatever log_locator.redirect_stdouts returns for the block that
# runs #do_execute.
def execute(log_locator: LogLocator.empty)
  log_locator.redirect_stdouts { do_execute }
end
execute_in_process(log_locator:) click to toggle source
# File lib/bricolage/job.rb, line 149
# Runs the job through #isolate_process, with the standard output streams
# redirected by +log_locator+ inside the isolated block.
#
# The status file path handed to #isolate_process is "<log path>.status"
# when log_locator.path is set and nil otherwise.
#
# Returns the value of #isolate_process.
def execute_in_process(log_locator:)
  # FIXME (carried over from the original author): status_path should be
  # independent from log_path, and should be defined regardless of log_path.
  status_path = "#{log_locator.path}.status" if log_locator.path
  isolate_process(status_path) do
    log_locator.redirect_stdouts { do_execute }
  end
end
explain() click to toggle source
# File lib/bricolage/job.rb, line 138
# Delegates to the script's #run_explain.
#
# Raises RuntimeError when no script has been set yet (i.e. #compile has
# not been called).
def explain
  compiled_script = @script
  raise 'Job#explain called before #compile' unless compiled_script

  compiled_script.run_explain
end
init_global_variables() click to toggle source
# File lib/bricolage/job.rb, line 61
# Fetches the global variables from the context, stores them on this job
# and adds two entries: 'bricolage_cwd' (the current working directory)
# and 'bricolage_job_dir' (the context's job directory as a string).
#
# Returns the job directory string (the value of the last assignment).
def init_global_variables
  # Original author's note: Context#global_variables loads the file on each
  # call, so updating the returned object is multi-thread safe.
  gvars = @context.global_variables
  @global_variables = gvars
  gvars['bricolage_cwd'] = Dir.pwd
  gvars['bricolage_job_dir'] = @context.job_dir.to_s
end
parsing_options(&block) click to toggle source

For command line options

# File lib/bricolage/job.rb, line 80
# Forwards +block+ to the parameter declarations' #parsing_options and
# remembers the result as the option-supplied parameter values.
#
# Returns that result.
def parsing_options(&block)
  from_options = @param_decls.parsing_options(&block)
  @param_vals_opt = from_options
end
provide_default(name, value) click to toggle source
# File lib/bricolage/job.rb, line 120
# Sets the job-file parameter +name+ to +value+ unless it already has a
# truthy value.  Does nothing (and returns nil) when no job-file parameter
# values have been bound.
#
# Returns the value stored under +name+ afterwards, or nil when nothing
# was bound.
def provide_default(name, value)
  return unless @param_vals

  @param_vals[name] ||= value
end
provide_sql_file_by_job_id() click to toggle source

Called from jobclasses (parameters_filter)

# File lib/bricolage/job.rb, line 125
# Offers the job id as the default for the 'sql-file' parameter.
# Does nothing (and returns nil) when the job has no id.
def provide_sql_file_by_job_id
  return unless @id

  provide_default('sql-file', @id)
end
script_source() click to toggle source
# File lib/bricolage/job.rb, line 133
# Returns the script's #source.
#
# Raises RuntimeError when no script has been set yet (i.e. #compile has
# not been called).
def script_source
  compiled_script = @script
  raise 'Job#script_source called before #compile' unless compiled_script

  compiled_script.source
end
subsystem() click to toggle source
# File lib/bricolage/job.rb, line 57
# Returns the subsystem name reported by this job's context.
def subsystem
  context = @context
  context.subsystem_name
end

Private Instance Methods

do_execute() click to toggle source
# File lib/bricolage/job.rb, line 162
# Runs the compiled script and reports the outcome through the context's
# logger.
#
# Before running, the current process id is exported in the
# BRICOLAGE_PID environment variable and the environment name is logged.
#
# Returns the script's result on success, JobResult.failure(ex) when a
# JobFailure is raised, and JobResult.error(ex) for any other exception.
def do_execute
  ENV['BRICOLAGE_PID'] = Process.pid.to_s
  log = @context.logger
  log.info "#{@context.environment} environment"
  outcome = log.with_elapsed_time { script.run }
  log.info outcome.status_string
  outcome
rescue JobFailure => failure
  # The failure message is logged twice: once bare, once prefixed.
  log.error failure.message
  log.error "failure: #{failure.message}"
  JobResult.failure(failure)
rescue Exception => error
  # NOTE(review): this also catches non-StandardError exceptions (e.g.
  # Interrupt, SystemExit) and turns them into an error result -- confirm
  # that callers rely on always getting a JobResult back.
  log.exception error
  log.error "error: #{error.class}: #{error.message}"
  JobResult.error(error)
end
isolate_process(status_path) { || ... } click to toggle source
# File lib/bricolage/job.rb, line 181
# Runs the given block in a forked child process and waits for it.
#
# In the child: the process title is set to "bricolage [<id>]", the block
# is evaluated, its result is handed to #save_result together with
# +status_path+, and the child exits with result.status.
#
# In the parent: waits for the child and returns
# restore_result(<child process status>, status_path).
def isolate_process(status_path)
  child_pid = Process.fork do
    Process.setproctitle "bricolage [#{@id}]"
    child_result = yield
    save_result child_result, status_path
    exit child_result.status
  end
  _pid, child_status = Process.waitpid2(child_pid)
  restore_result(child_status, status_path)
end
read_if_exist(path) click to toggle source
# File lib/bricolage/job.rb, line 217
# Returns the contents of the file at +path+, or nil when reading raises
# any StandardError (for example because the file does not exist).
def read_if_exist(path)
  File.read(path)
rescue StandardError
  # Best effort: a file that cannot be read is treated as absent.
  nil
end
restore_message(status_path) click to toggle source
# File lib/bricolage/job.rb, line 207
# Reads the message stored in the status file and removes the file.
#
# Returns nil when +status_path+ is nil/false or when the file could not
# be read; otherwise the file contents with surrounding whitespace
# stripped.  The file is removed (FileUtils.rm_f) whenever a path was
# given, whether or not reading succeeded.
def restore_message(status_path)
  if status_path
    begin
      content = read_if_exist(status_path)
      content.strip if content
    ensure
      FileUtils.rm_f(status_path)
    end
  end
end
restore_result(st, status_path) click to toggle source
# File lib/bricolage/job.rb, line 203
# Builds a JobResult from the process status +st+ and the message
# recovered from the status file at +status_path+.
def restore_result(st, status_path)
  message = restore_message(status_path)
  JobResult.for_process_status(st, message)
end
save_result(result, status_path) click to toggle source
# File lib/bricolage/job.rb, line 192
# Writes the message of an unsuccessful +result+ to the status file.
#
# Nothing is written when the result is successful or when +status_path+
# is nil/false.  Any StandardError raised while writing is ignored.
#
# Always returns nil.
def save_result(result, status_path)
  return if result.success? || !status_path

  begin
    File.open(status_path, 'w') { |io| io.puts result.message }
  rescue StandardError
    # Best effort: failing to record the message must not fail the job.
    nil
  end
end