class Pf_Lab_Interface
Constants
- BARCODE
- BARCODES
- BASE_URL
- CATEGORIES
- CODE
- DEFAULT_LOOK_BACK_IN_SECONDS
Looks back 12 hours if no previous request is found.
- DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS
Time to keep old orders in memory: 48 hours, expressed in seconds.
- FAILED_UPDATES
- FROM_EPOCH
- ID
- ITEMS
- LAST_REQUEST
- LIS_CODE
- ORDERS
- ORDERS_KEY
- ORDERS_SORTED_SET
- ORDERS_TO_UPDATE_PER_CYCLE
- PATIENTS_REDIS_LIST
- POLL_ENDPOINT
the last request that was made and what it said.
- PREV_REQUEST_COMPLETED
- PROCESSING_REDIS_LIST
- PUT_ENDPOINT
- REPORTS
- REQUIREMENTS
- RESULT_RAW
- SIZE
- SKIP
- TESTS
- TO_EPOCH
- UPDATE_QUEUE
- USE_CATEGORY_FOR_LIS
Attributes
Should include the host and port, e.g. www.xyz.com:3000. Defaults to localhost:3000.
Public Class Methods
METHODS OVERRIDDEN FROM THE BASIC POLLER.
@param mpg : path to mappings file. Defaults to nil.
@param lis_security_key : the security key for the LIS organization, to be downloaded from the organizations/show/id endpoint on the website.
Poller::new
# File lib/publisher/pf_lab_interface.rb, line 370
def initialize(mpg=nil,lis_security_key,server_url_with_port,organization_id,private_key_hash)
  super(mpg)
  self.private_key_hash = private_key_hash
  self.event_source = "organizations/" + organization_id
  self.on_message_handler_function = "evented_poll_LIS_for_requisition"
  self.lis_security_key = lis_security_key
  self.server_url_with_port = (server_url_with_port || BASE_URL)
  self.retry_count = 0
  ## called from stream module
  setup_connection
  AstmServer.log("Initialized Lab Interface")
end
Public Instance Methods
# File lib/publisher/pf_lab_interface.rb, line 806
def _start
  evented_poll_LIS_for_requisition({"trigger_lis_poll" => {"epoch" => Time.now.to_i.to_s}})
  reattempt_failed_updates
  update_LIS
end

# File lib/publisher/pf_lab_interface.rb, line 270
def add_barcode(code,order_id)
  $redis.hset(BARCODES,code,order_id)
end
@param order : order object, as a hash.
# File lib/publisher/pf_lab_interface.rb, line 198
def add_order(order)
  at_least_one_item_exists = false
  order[REPORTS].each do |report|
    test_machine_codes = report[TESTS].map{|c| $inverted_mappings[c[LIS_CODE]] }.compact.uniq
    report[REQUIREMENTS].each do |req|
      get_priority_category(req)[ITEMS].each do |item|
        if !item[BARCODE].blank?
          at_least_one_item_exists = true
          add_barcode(item[BARCODE],JSON.generate(
            {
              :order_id => order[ID],
              :machine_codes => test_machine_codes
            }
          ))
        elsif !item[CODE].blank?
          at_least_one_item_exists = true
          add_barcode(item[CODE],JSON.generate({
            :order_id => order[ID],
            :machine_codes => test_machine_codes
          }))
        end
      end
    end
  end
  unless at_least_one_item_exists.blank?
    $redis.hset(ORDERS,order[ID],JSON.generate(order))
    $redis.zadd(ORDERS_SORTED_SET,Time.now.to_i,order[ID])
  end
end
@param order : the existing order
@param res : the result from the machine, pertaining to this order.
@return @working : updates the results from res, into the order at the relevant tests inside the order.
$MAPPINGS -> [MACHINE_CODE => LIS_CODE]
$INVERTED_MAPPINGS -> [LIS_CODE => MACHINE_CODE]
# File lib/publisher/pf_lab_interface.rb, line 243
def add_test_result(order,res,lis_code)
  #puts "res is:"
  #puts res.to_s
  order[REPORTS.to_sym].each do |report|
    #puts "doing report"
    report[TESTS.to_sym].each_with_index{|t,k|
      #puts "doing test"
      #puts t.to_s
      puts "teh test lis code to sym is:"
      puts t[LIS_CODE.to_sym]
      puts "lis code is: #{lis_code.to_s}"
      if t[LIS_CODE.to_sym] == lis_code.to_s
        puts "got equality"
        t[RESULT_RAW.to_sym] = res[:value]
        puts "set value"
      end
    }
  end
end
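The matching rule in add_test_result can be tried on a small in-memory order. This is a minimal sketch, not the library's code: the module name ResultSketch is hypothetical, and the symbol keys :lis_code and :result_raw are assumed stand-ins for LIS_CODE.to_sym and RESULT_RAW.to_sym, whose real values are not shown on this page (:reports and :tests do appear in remove_order).

```ruby
# Hypothetical sketch of the matching rule used by add_test_result.
# :lis_code and :result_raw are assumed key names, not confirmed by the source.
module ResultSketch
  # Writes res[:value] into every test whose LIS code equals lis_code.
  def self.add_test_result(order, res, lis_code)
    order[:reports].each do |report|
      report[:tests].each do |test|
        # lis_code may arrive as a symbol, so compare on its string form
        test[:result_raw] = res[:value] if test[:lis_code] == lis_code.to_s
      end
    end
    order
  end
end

order = {
  reports: [
    { tests: [{ lis_code: "HB" }, { lis_code: "RBC" }] }
  ]
}
ResultSketch.add_test_result(order, { value: 19 }, :HB)
puts order[:reports][0][:tests].inspect
```

Only the test whose code matches is touched; the other tests keep whatever result they had.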
def delete_last_request
$redis.del(LAST_REQUEST)
end
# File lib/publisher/pf_lab_interface.rb, line 283
def all_hits_downloaded?(last_request)
  last_request[PREV_REQUEST_COMPLETED].to_s == "true"
end
Builds the poll request for the window between from and to.
@param from : default (nil); when nil, falls back to one day before to.
@param to : default (nil); when nil, falls back to the current time.
# File lib/publisher/pf_lab_interface.rb, line 302
def build_request(from=nil,to=nil)
  puts "entering build request with from: #{from} and to:#{to}"
  to ||= Time.now
  from ||= to - 1.day
  last_request = get_last_request
  params = nil
  if last_request.blank?
    AstmServer.log("no last request, making fresh request")
    params = fresh_request_params(from,to)
  else
    if all_hits_downloaded?(last_request)
      AstmServer.log("last request all hits have been downloaded, going for next request.")
      if last_request[TO_EPOCH].to_i == to.to_i
        return nil
      else
        params = fresh_request_params(last_request[TO_EPOCH],to)
      end
    else
      AstmServer.log("last request all hits not downloaded.")
      params = last_request
    end
  end
  params.merge!(lis_security_key: self.lis_security_key)
  AstmServer.log("reuqest params become: #{params}")
  AstmServer.log("sleeping")
  #sleep(10000)
  Typhoeus::Request.new(self.get_poll_url_path,params: params)
end
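The branching in build_request can be followed more easily with Redis and Typhoeus taken out. The sketch below is an illustration under stated assumptions: the module name PollSketch and the method next_request_params are hypothetical, and the string values given to SKIP, FROM_EPOCH, TO_EPOCH and PREV_REQUEST_COMPLETED are guesses, since the real constant values are not listed on this page.

```ruby
# Hypothetical, Redis-free sketch of how build_request chooses its params.
module PollSketch
  # Assumed stand-ins for the class constants (real values not shown here).
  SKIP = "skip"
  FROM_EPOCH = "from_epoch"
  TO_EPOCH = "to_epoch"
  PREV_REQUEST_COMPLETED = "prev_request_completed"

  def self.fresh_request_params(from, to)
    { TO_EPOCH => to.to_i, FROM_EPOCH => from.to_i, SKIP => 0 }
  end

  # Returns the params for the next poll, or nil when nothing is left to fetch.
  def self.next_request_params(last_request, from, to)
    # No stored request: start a fresh window.
    return fresh_request_params(from, to) if last_request.nil? || last_request.empty?

    if last_request[PREV_REQUEST_COMPLETED].to_s == "true"
      # The previous window is fully downloaded and already ends at `to`.
      return nil if last_request[TO_EPOCH].to_i == to.to_i
      # Otherwise continue from where the previous window ended.
      fresh_request_params(last_request[TO_EPOCH], to)
    else
      # Previous window is incomplete: repeat it with its stored skip.
      last_request
    end
  end
end

puts PollSketch.next_request_params({}, 100, 200).inspect
```

Values read back from a Redis hash are strings, which is why the comparisons go through to_s and to_i.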
Commits the request params to redis. The response hash is expected to have whatever parameters were sent into it in the request, so it must always return:
a -> how many it was told to skip (SKIP)
b -> from_epoch : from which epoch it was queried.
c -> to_epoch : to which epoch it was queried.
# File lib/publisher/pf_lab_interface.rb, line 337
def commit_request_params_to_redis(response_hash)
  $redis.hset(LAST_REQUEST,SKIP,response_hash[SKIP].to_i + response_hash[ORDERS].size.to_i)
  $redis.hset(LAST_REQUEST,SIZE,response_hash[SIZE].to_i)
  $redis.hset(LAST_REQUEST,FROM_EPOCH,response_hash[FROM_EPOCH].to_i)
  $redis.hset(LAST_REQUEST,TO_EPOCH,response_hash[TO_EPOCH].to_i)
  $redis.hset(LAST_REQUEST,PREV_REQUEST_COMPLETED,request_size_completed?(response_hash).to_s)
end

# File lib/publisher/pf_lab_interface.rb, line 415
def delete_completed_order(order_id)
  remove_order(order_id)
end
This is triggered by whatever Firebase sends; it is registered as the callback. Open question: can two different endpoints be watched? When both events are written to the same endpoint, the payload becomes a merged document and both events fire.
# File lib/publisher/pf_lab_interface.rb, line 392
def evented_poll_LIS_for_requisition(data)
  unless data.blank?
    data = data["data"].blank? ? data : data["data"]
    unless data["delete_order"].blank?
      puts "delete order is not blank"
      unless data["delete_order"]["order_id"].blank?
        puts "order id is not blank"
        puts "going to delete the completed order --------------->"
        delete_completed_order(data["delete_order"]["order_id"])
      end
    end
    unless data["trigger_lis_poll"].blank?
      unless data["trigger_lis_poll"]["epoch"].blank?
        new_poll_LIS_for_requisition(data["trigger_lis_poll"]["epoch"].to_i)
      end
    end
  else
  end
end
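The two payload shapes this handler reacts to can be shown with a small classifier. This is a sketch only: the module EventSketch and its classify method are hypothetical, and instead of calling delete_completed_order or new_poll_LIS_for_requisition it just returns the actions that would be taken. The payload keys ("data", "delete_order", "order_id", "trigger_lis_poll", "epoch") are taken from the method above.

```ruby
# Hypothetical sketch: reports which actions a payload would trigger.
module EventSketch
  def self.classify(data)
    return [] if data.nil? || data.empty?

    # Payloads may arrive wrapped in a "data" envelope.
    inner = data["data"]
    data = inner if inner.is_a?(Hash) && !inner.empty?

    actions = []
    delete = data["delete_order"]
    if delete.is_a?(Hash) && !delete["order_id"].to_s.empty?
      actions << [:delete_order, delete["order_id"]]
    end

    trigger = data["trigger_lis_poll"]
    if trigger.is_a?(Hash) && !trigger["epoch"].to_s.empty?
      actions << [:poll, trigger["epoch"].to_i]
    end
    actions
  end
end

# A merged document carries both events, so both actions are reported.
merged = {
  "data" => {
    "delete_order" => { "order_id" => "o1" },
    "trigger_lis_poll" => { "epoch" => "1600000000" }
  }
}
puts EventSketch.classify(merged).inspect
```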
@param from : time object
@param to : time object
# File lib/publisher/pf_lab_interface.rb, line 289
def fresh_request_params(from,to)
  #puts "came to make fresh request params, with from epoch: #{from_epoch}"
  params = {}
  params[TO_EPOCH] = to.to_i
  params[FROM_EPOCH] = from.to_i
  params[SKIP] = 0
  params
end
@return the entry at the barcode, or nil. The entry holds the order_id and the array of tests registered on that barcode; the test names are the machine codes, not the lis_codes. The entry is originally generated in add_barcode.
# File lib/publisher/pf_lab_interface.rb, line 167
def get_barcode(barcode)
  if barcode_hash = $redis.hget(BARCODES,barcode)
    JSON.parse(barcode_hash).deep_symbolize_keys
  else
    nil
  end
end
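The round trip between add_order (which writes the barcode entry as JSON) and get_barcode (which reads it back with symbol keys) can be sketched without Redis. The module BarcodeSketch and its STORE hash are hypothetical stand-ins for the BARCODES Redis hash, and JSON.parse with symbolize_names: true is swapped in for ActiveSupport's deep_symbolize_keys so the example needs only the standard library.

```ruby
require "json"

# Hypothetical in-memory stand-in for the BARCODES Redis hash.
module BarcodeSketch
  STORE = {}

  # Stores the entry as a JSON string, as add_order does via add_barcode.
  def self.add_barcode(code, order_id, machine_codes)
    STORE[code] = JSON.generate({ order_id: order_id, machine_codes: machine_codes })
  end

  # Returns the entry with symbol keys, or nil when the barcode is unknown.
  def self.get_barcode(code)
    raw = STORE[code]
    raw.nil? ? nil : JSON.parse(raw, symbolize_names: true)
  end
end

BarcodeSketch.add_barcode("ARUBA", "order-1", ["HBparam", "RBCparam"])
entry = BarcodeSketch.get_barcode("ARUBA")
puts entry[:order_id]
puts entry[:machine_codes].inspect
```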
# File lib/publisher/pf_lab_interface.rb, line 274
def get_last_request
  $redis.hgetall(LAST_REQUEST)
end

# File lib/publisher/pf_lab_interface.rb, line 175
def get_order(order_id)
  if order_string = $redis.hget(ORDERS,order_id)
    JSON.parse(order_string).deep_symbolize_keys
  else
    nil
  end
end

# File lib/publisher/pf_lab_interface.rb, line 797
def get_poll_url_path
  self.server_url_with_port + POLL_ENDPOINT
end
@param req : the requirement hash.
@return priority_category : the category which has been chosen as the top priority for the requirement.
# File lib/publisher/pf_lab_interface.rb, line 185
def get_priority_category(req)
  priority_category = req[CATEGORIES].select{|c|
    c[USE_CATEGORY_FOR_LIS] == 1
  }
  if priority_category.blank?
    priority_category = req[CATEGORIES][0]
  else
    priority_category = priority_category[0]
  end
  priority_category
end
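The selection rule above amounts to "first flagged category, otherwise the first category". A compact sketch follows; the module CategorySketch is hypothetical, and the string values used for CATEGORIES and USE_CATEGORY_FOR_LIS are assumptions because the real constant values are not listed here.

```ruby
# Hypothetical sketch of the priority-category rule.
module CategorySketch
  # Assumed stand-ins for the class constants.
  CATEGORIES = "categories"
  USE_CATEGORY_FOR_LIS = "use_category_for_lis"

  # First category flagged with 1, falling back to the first category.
  def self.priority_category(req)
    req[CATEGORIES].find { |c| c[USE_CATEGORY_FOR_LIS] == 1 } || req[CATEGORIES][0]
  end
end

req = {
  "categories" => [
    { "name" => "plain tube" },
    { "name" => "citrate tube", "use_category_for_lis" => 1 }
  ]
}
puts CategorySketch.priority_category(req)["name"]
```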
# File lib/publisher/pf_lab_interface.rb, line 801
def get_put_url_path
  self.server_url_with_port + PUT_ENDPOINT
end
# File lib/publisher/pf_lab_interface.rb, line 431
def new_poll_LIS_for_requisition(to_epoch=nil)
  AstmServer.log(to_epoch.to_s)
  while true
    orders = []
    begin
      Retriable.retriable(on: PfDownloadException) do
        self.retry_count+=1
        AstmServer.log("retrying----->")
        request = build_request(nil,to_epoch)
        break if request.blank?
        request.run
        response = request.response
        if response.success?
          code = response.code
          time = response.total_time
          headers = response.headers
          #AstmServer.log("successfully polled server")
          response_hash = JSON.parse(response.body)
          #AstmServer.log("Pathofast LIS poll response --->")
          #AstmServer.log(response_hash.to_s)
          orders = response_hash[ORDERS]
          orders.each do |order|
            add_order(order)
          end
          commit_request_params_to_redis(response_hash)
          #puts "are the orders blank: #{orders.blank?}"
          #break if orders.blank?
        elsif response.timed_out?
          #AstmServer.log("Error polling server with code: #{code}")
          raise PfDownloadException.new("timeout")
        elsif response.code == 0
          #AstmServer.log("Error polling server with code: #{code}")
          raise PfDownloadException.new("didnt get any http response")
        else
          #AstmServer.log("Error polling server with code: #{code}")
          raise PfDownloadException.new("non 200 response")
        end
      end
    rescue => e
      puts e.to_s
      puts "raised exception-----------> breaking."
      ## retryable has raised the errors again.
      break
    else
      ## break only if the orders are blank.
      break if orders.blank?
    end
  end
end
This method is redundant and no longer used; the whole thing is now purely evented.
# File lib/publisher/pf_lab_interface.rb, line 817
def poll
end
Notes: how it deletes records; this is called in the form of a while loop; how it handles the update response.
# File lib/publisher/pf_lab_interface.rb, line 488
def poll_LIS_for_requisition(to_epoch=nil)
  AstmServer.log(to_epoch.to_s)
  while true
    #puts "came back to true"
    request = build_request(nil,to_epoch)
    break if request.blank?
    request.run
    response = request.response
    code = response.code
    time = response.total_time
    headers = response.headers
    if code.to_s != "200"
      AstmServer.log("Error polling server with code: #{code}")
      break
    else
      AstmServer.log("successfully polled server")
      response_hash = JSON.parse(response.body)
      AstmServer.log("Pathofast LIS poll response --->")
      #AstmServer.log(response_hash.to_s)
      orders = response_hash[ORDERS]
      orders.each do |order|
        add_order(order)
      end
      commit_request_params_to_redis(response_hash)
      puts "are the orders blank: #{orders.blank?}"
      break if orders.blank?
    end
  end
end
data = [
  {
    :id => "ARUBA",
    :results => [
      { :name => "TLCparam", :value => 10 },
      { :name => "Nparam", :value => 23 },
      { :name => "ANCparam", :value => 25 },
      { :name => "Lparam", :value => 10 },
      { :name => "ALCparam", :value => 44 },
      { :name => "Mparam", :value => 55 },
      { :name => "AMCparam", :value => 22 },
      { :name => "Eparam", :value => 222 },
      { :name => "AECparam", :value => 21 },
      { :name => "BASOparam", :value => 222 },
      { :name => "ABCparam", :value => 300 },
      { :name => "RBCparam", :value => 2.22 },
      { :name => "HBparam", :value => 19 },
      { :name => "HCTparam", :value => 22 },
      { :name => "MCVparam", :value => 222 },
      { :name => "MCHparam", :value => 21 },
      { :name => "MCHCparam", :value => 10 },
      { :name => "MCVparam", :value => 222 },
      { :name => "RDWCVparam", :value => 12 },
      { :name => "PCparam", :value => 1.22322 }
    ]
  }
]
# File lib/publisher/pf_lab_interface.rb, line 608
def process_update_queue
  #puts "came to process update queue."
  order_ids = []
  #puts $redis.lrange UPDATE_QUEUE, 0, -1
  ## first push that to patient.
  ## first create that order and add that barcode.
  ## for citrate.
  ## then let that get downloaded.
  ## so keep on test going for that.
  ##
  ## why complicate this so much.
  ## just do a brpop?
  ##
  ORDERS_TO_UPDATE_PER_CYCLE.times do |n|
    order_ids << $redis.rpop(UPDATE_QUEUE)
  end
  #puts "order ids popped"
  #puts order_ids.to_s
  order_ids.compact!
  order_ids.uniq!
  orders = order_ids.map{|c|
    get_order(c)
  }.compact
  #puts orders[0].to_s
  #puts "orders are:"
  #puts orders.size
  #exit(1)

  req = Typhoeus::Request.new(self.get_put_url_path,
    method: :put,
    body: {orders: orders}.to_json,
    params: {lis_security_key: self.lis_security_key},
    headers: {Accept: 'application/json', "Content-Type".to_sym => 'application/json'})

  req.on_complete do |response|
    if response.success?
      response_body = response.body
      orders = JSON.parse(response.body)["orders"]
      #puts orders.to_s
      orders.values.each do |order|
        #puts order.to_s
        if order["errors"].blank?
        else
          puts "got an error for the order."
          ## how many total error attempts to manage.
        end
      end
    ## here we have to raise.
    elsif response.timed_out?
      AstmServer.log("got a time out")
      raise PfUpdateException.new("update order timed out")
    elsif response.code == 0
      AstmServer.log(response.return_message)
      raise PfUpdateException.new("update order response code 0")
    else
      AstmServer.log("HTTP request failed: " + response.code.to_s)
      raise PfUpdateException.new("update order response code non success: #{response.code}")
    end
  end

  req.run
end
# File lib/publisher/pf_lab_interface.rb, line 419
def put_delete_order_event(order_id)
  puts self.connection.put(self.event_source,:order_id => order_id)
end

# File lib/publisher/pf_lab_interface.rb, line 265
def queue_order_for_update(order)
  update_order(order)
  $redis.lpush(UPDATE_QUEUE,order[ID.to_sym])
end
This is only done on startup.
# File lib/publisher/pf_lab_interface.rb, line 690
def reattempt_failed_updates
  $redis.scard(FAILED_UPDATES).times do
    if patient_results = $redis.spop(FAILED_UPDATES)
      patient_results = JSON.parse(patient_results)
      begin
        Retriable.retriable(on: PfUpdateException) do
          unless update(patient_results)
            raise PfUpdateException.new("didnt get any http response")
          end
        end
      rescue => e
        AstmServer.log("reattempted and failed")
      ensure
      end
    end
  end
end

# File lib/publisher/pf_lab_interface.rb, line 158
def remove_barcode(barcode)
  return if barcode.blank?
  $redis.hdel(BARCODES,barcode)
end
Removes any orders whose stored timestamp falls between (now - 4 days) and (now - 2 days).
# File lib/publisher/pf_lab_interface.rb, line 674
def remove_old_orders
  stale_order_ids = $redis.zrangebyscore(ORDERS_SORTED_SET,(Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS*2).to_s, (Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS))
  $redis.pipelined do
    stale_order_ids.each do |order_id|
      remove_order(order_id)
    end
  end
end
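The score window queried by remove_old_orders can be illustrated with a plain hash in place of the Redis sorted set. This is a sketch under assumptions: the module StaleWindowSketch is hypothetical, the 48-hour value comes from the constant's description above, and the inclusive bounds mirror the default behaviour of ZRANGEBYSCORE.

```ruby
# Hypothetical sketch of the stale-order window, without Redis.
module StaleWindowSketch
  # 48 hours in seconds, per the constant's description on this page.
  DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS = 48 * 3600

  # Returns [min_score, max_score] for the given current time.
  def self.stale_window(now)
    keep = DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS
    [now.to_i - keep * 2, now.to_i - keep]
  end

  # scored maps order_id => epoch at which the order was added.
  def self.stale_ids(scored, now)
    min, max = stale_window(now)
    scored.select { |_id, score| score >= min && score <= max }.keys
  end
end

now = 1_000_000
scored = { "recent" => 900_000, "stale" => 700_000, "ancient" => 600_000 }
puts StaleWindowSketch.stale_ids(scored, now).inspect
```

Because the window has a lower bound, an order older than twice the storage time falls outside it and would not be selected by this query.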
UTILITY METHODS FOR THE ORDER AND BARCODE HASHES: ADD AND REMOVE
# File lib/publisher/pf_lab_interface.rb, line 140
def remove_order(order_id)
  order = get_order(order_id)
  puts "order id is:#{order_id} is"
  unless order.blank?
    puts "order not blank."
    order[:reports].each do |report|
      report[:tests].each do |test|
        remove_barcode(test[:barcode])
        remove_barcode(test[:code])
      end
    end
    $redis.hdel(ORDERS,order_id)
    $redis.zrem(ORDERS_SORTED_SET,order_id)
  else
    puts "order is blank."
  end
end
Since we request only a certain number of orders per request, we need to know whether the earlier request has been completed or whether we still need to request the same time frame again.
# File lib/publisher/pf_lab_interface.rb, line 348
def request_size_completed?(response_hash)
  #puts response_hash.to_s
  response_hash[SKIP].to_i + response_hash[ORDERS].size >= response_hash[SIZE].to_i
end
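The completion check and the skip value that commit_request_params_to_redis stores can be walked through with two sample responses. This is a sketch only: the module PagingSketch is hypothetical, and the string values for SKIP, ORDERS and SIZE are assumed, as the real constant values are not shown on this page.

```ruby
# Hypothetical sketch of the paging arithmetic.
module PagingSketch
  # Assumed stand-ins for the class constants.
  SKIP = "skip"
  ORDERS = "orders"
  SIZE = "size"

  # The skip value stored for the next request over the same window.
  def self.next_skip(response_hash)
    response_hash[SKIP].to_i + response_hash[ORDERS].size
  end

  # True once everything in the window has been downloaded.
  def self.request_size_completed?(response_hash)
    next_skip(response_hash) >= response_hash[SIZE].to_i
  end
end

first_page = { "skip" => 0, "orders" => [{}, {}], "size" => 5 }
last_page  = { "skip" => 4, "orders" => [{}], "size" => 5 }
puts PagingSketch.request_size_completed?(first_page)
puts PagingSketch.request_size_completed?(last_page)
```

In the first response 2 of 5 orders have arrived, so the same window is requested again with a skip of 2; in the second, 4 + 1 reaches the total and the window is marked complete.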
# File lib/publisher/pf_lab_interface.rb, line 427
def test_trigger_delete_order(order_id)
  puts self.connection.put(self.event_source + "/delete_order", :order_id => order_id)
end

# File lib/publisher/pf_lab_interface.rb, line 423
def test_trigger_lis_poll(epoch=nil)
  puts self.connection.put(self.event_source + "/trigger_lis_poll", :epoch => epoch)
end
# File lib/publisher/pf_lab_interface.rb, line 746
def update(data)
  puts "data is:"
  puts JSON.pretty_generate(data)
  data[ORDERS_KEY].each do |order|
    puts "order is "
    puts order
    barcode = order["id"]
    results = order["results"]
    puts "barcode is: #{barcode}, results are : #{results}"
    results.deep_symbolize_keys!
    if barcode_hash = get_barcode(barcode)
      puts "barcode hash is: #{barcode_hash}"
      if order = get_order(barcode_hash[:order_id])
        puts "got order"
        ## update the test results, and add the order to the final update hash.
        #puts "order got from barcode is:"
        #puts order
        machine_codes = barcode_hash[:machine_codes]
        puts "machine codes: #{machine_codes}"
        results.keys.each do |lis_code|
          res = results[lis_code]
          add_test_result(order,res,lis_code)
        end
=begin
        results.values.each do |res|
          if machine_codes.include? res[:name]
            ## so we need to update to the requisite test inside the order.
            puts "came to add test result"
            puts res
            add_test_result(order,res,nil)
            ## commit to redis
            ## and then
          end
        end
=end
        #puts "came to queue order for update"
        queue_order_for_update(order)
      end
    else
      AstmServer.log("the barcode:#{barcode}, does not exist in the barcodes hash")
      ## does not exist.
    end
  end
  process_update_queue
end
args can be used to modulate exit behaviours; the method reads :exit_on_success and :exit_on_failure.
@param args : hash of arguments
# File lib/publisher/pf_lab_interface.rb, line 712
def update_LIS(args={})
  prepare_redis
  exit_requested = false
  Kernel.trap( "INT" ) { exit_requested = true }
  while !exit_requested
    puts "exit not requested."
    if patient_results = $redis.brpoplpush(PATIENTS_REDIS_LIST,PROCESSING_REDIS_LIST,0)
      puts "got patient results."
      patient_results = JSON.parse(patient_results)
      puts "patient results are:"
      puts JSON.pretty_generate(patient_results)
      begin
        Retriable.retriable(on: PfUpdateException) do
          unless update(patient_results)
            raise PfUpdateException.new("didnt get any http response")
          end
        end
        exit_requested = !args[:exit_on_success].blank?
        #puts "exit requested becomes: #{exit_requested}"
      rescue => e
        $redis.sadd(FAILED_UPDATES,JSON.generate(patient_results))
        exit_requested = !args[:exit_on_failure].blank?
        puts "came to eventual rescue, exit requested is: #{exit_requested}"
      ensure
        $redis.lpop("processing")
      end
    else
      puts "no patient results"
    end
  end
end
start work on simple.
# File lib/publisher/pf_lab_interface.rb, line 233
def update_order(order)
  $redis.hset(ORDERS,order[ID.to_sym],JSON.generate(order))
end