def call(params)
client = params.gdc_gd_client
domain_name = params.organization || params.domain
fail "Either organisation or domain has to be specified in params" unless domain_name
domain = client.domain(domain_name) if domain_name
project = client.projects(params.gdc_project) || client.projects(params.gdc_project_id)
fail "Either project or project_id has to be specified in params" unless project
data_product = params.data_product
config = params.filters_config
fail 'User filters brick requires configuration how the filter should be setup. For this use the param "filters_config"' if config.blank?
symbolized_config = GoodData::Helpers.deep_dup(config)
symbolized_config = GoodData::Helpers.symbolize_keys(symbolized_config)
symbolized_config[:labels] = symbolized_config[:labels].map { |l| GoodData::Helpers.symbolize_keys(l) }
multiple_projects_column = params.multiple_projects_column
number_of_threads = Integer(params.number_of_threads || '10')
mode = params.sync_mode
unless MODES.include?(mode)
fail "The parameter \"sync_mode\" has to have one of the values #{MODES.map(&:to_s).join(', ')} or has to be empty."
end
user_filters = load_data(params, symbolized_config)
run_params = {
restrict_if_missing_all_values: params.restrict_if_missing_all_values == 'true',
ignore_missing_values: params.ignore_missing_values == 'true',
do_not_touch_filters_that_are_not_mentioned: params.do_not_touch_filters_that_are_not_mentioned == 'true',
domain: domain,
dry_run: params[:dry_run].to_b,
users_brick_input: params.users_brick_users
}
all_clients = domain.clients(:all, data_product).to_a
GoodData.gd_logger.info("Synchronizing in mode=#{mode}, number_of_clients=#{all_clients.size}, data_rows=#{user_filters.size} ,")
GoodData.logger.info("Synchronizing in mode \"#{mode}\"")
results = []
case mode
when 'sync_project', 'sync_one_project_based_on_pid', 'sync_one_project_based_on_custom_id'
if mode == 'sync_one_project_based_on_pid'
filter = project.pid
elsif mode == 'sync_one_project_based_on_custom_id'
filter = UserBricksHelper.resolve_client_id(domain, project, params.data_product)
end
user_filters = user_filters.select { |f| f[:pid] == filter } if filter
GoodData.gd_logger.info("Synchronizing in mode=#{mode}, project_id=#{project.pid}, data_rows=#{user_filters.size} ,")
current_results = sync_user_filters(project, user_filters, run_params, symbolized_config)
results.concat(current_results[:results]) unless current_results.nil? || current_results[:results].empty?
when 'sync_multiple_projects_based_on_pid', 'sync_multiple_projects_based_on_custom_id'
users_by_project = run_params[:users_brick_input].group_by { |u| u[:pid] }
user_filters.group_by { |u| u[:pid] }.flat_map.pmap do |id, new_filters|
users = users_by_project[id]
fail "The #{multiple_projects_column} cannot be empty" if id.blank?
if mode == 'sync_multiple_projects_based_on_custom_id'
c = all_clients.detect { |specific_client| specific_client.id == id }
current_project = c.project
elsif mode == 'sync_multiple_projects_based_on_pid'
current_project = client.projects(id)
end
GoodData.gd_logger.info("Synchronizing in mode=#{mode}, project_id=#{id}, data_rows=#{new_filters.size} ,")
current_results = sync_user_filters(current_project, new_filters, run_params.merge(users_brick_input: users), symbolized_config)
results.concat(current_results[:results]) unless current_results.nil? || current_results[:results].empty?
end
when 'sync_domain_client_workspaces'
domain_clients = all_clients
if params.segments
segment_uris = params.segments.map(&:uri)
domain_clients = domain_clients.select { |c| segment_uris.include?(c.segment_uri) }
end
working_client_ids = []
semaphore = Mutex.new
users_by_project = run_params[:users_brick_input].group_by { |u| u[:pid] }
user_filters.group_by { |u| u[multiple_projects_column] }.flat_map.pmap do |client_id, new_filters|
users = users_by_project[client_id]
fail "Client id cannot be empty" if client_id.blank?
c = all_clients.detect { |specific_client| specific_client.id == client_id }
if c.nil?
params.gdc_logger.warn "Client #{client_id} is not found"
next
end
if params.segments && !segment_uris.include?(c.segment_uri)
params.gdc_logger.warn "Client #{client_id} is outside segments_filter #{params.segments}"
next
end
current_project = c.project
fail "Client #{client_id} does not have project." unless current_project
semaphore.synchronize do
working_client_ids << client_id.to_s
end
GoodData.gd_logger.info("Synchronizing in mode=#{mode}, client_id=#{client_id}, data_rows=#{new_filters.size} ,")
partial_results = sync_user_filters(current_project, new_filters, run_params.merge(users_brick_input: users), symbolized_config)
results.concat(partial_results[:results]) unless partial_results.nil? || partial_results[:results].empty?
end
GoodData.gd_logger.info("Synchronizing in mode=#{mode}, working_client_ids=#{working_client_ids.join(', ')} ,") if working_client_ids.size < 50
unless run_params[:do_not_touch_filters_that_are_not_mentioned]
to_be_deleted_clients = UserBricksHelper.non_working_clients(domain_clients, working_client_ids)
to_be_deleted_clients.peach(number_of_threads) do |c|
begin
current_project = c.project
users = users_by_project[c.client_id]
params.gdc_logger.info "Delete all filters in project #{current_project.pid} of client #{c.client_id}"
GoodData.gd_logger.info("Delete all filters in project_id=#{current_project.pid}, client_id=#{c.client_id} ,")
current_results = sync_user_filters(current_project, [], run_params.merge(users_brick_input: users), symbolized_config)
results.concat(current_results[:results]) unless current_results.nil? || current_results[:results].empty?
rescue StandardError => e
params.gdc_logger.error "Failed to clear filters of #{c.client_id} due to: #{e.inspect}"
end
end
end
end
{
results: results
}
end