class Pf_Lab_Interface

Constants

BARCODE
BARCODES
BASE_URL
CATEGORIES
CODE
DEFAULT_LOOK_BACK_IN_SECONDS

will look back 12 hours if no previous request is found.

DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS

time to keep old orders in memory 48 hours, expressed as seconds.

FAILED_UPDATES
FROM_EPOCH
ID
ITEMS
LAST_REQUEST
LIS_CODE
ORDERS
ORDERS_KEY
ORDERS_SORTED_SET
ORDERS_TO_UPDATE_PER_CYCLE
PATIENTS_REDIS_LIST
POLL_ENDPOINT

the last request that was made and what it said.

PREV_REQUEST_COMPLETED
PROCESSING_REDIS_LIST
PUT_ENDPOINT
REPORTS
REQUIREMENTS
RESULT_RAW
SIZE
SKIP
TESTS
TO_EPOCH
UPDATE_QUEUE
USE_CATEGORY_FOR_LIS

Attributes

lis_security_key[RW]
retry_count[RW]
server_url_with_port[RW]

should include the www.xyz.com:3000 defaults to localhost:3000

Public Class Methods

new(mpg=nil,lis_security_key,server_url_with_port,organization_id,private_key_hash) click to toggle source

METHODS OVERRIDDEN FROM THE BASIC POLLER.

@param mpg : path to mappings file. Defaults to nil. @param lis_security_key : the security key for the LIS organization, to be dowloaded from the organizations/show/id, endpoint in the website.

Calls superclass method Poller::new
# File lib/publisher/pf_lab_interface.rb, line 370
def initialize(mpg=nil,lis_security_key,server_url_with_port,organization_id,private_key_hash)
    super(mpg)
    self.private_key_hash = private_key_hash
    self.event_source = "organizations/" + organization_id
    self.on_message_handler_function = "evented_poll_LIS_for_requisition"
    self.lis_security_key = lis_security_key
    
    self.server_url_with_port = (server_url_with_port || BASE_URL)
    self.retry_count = 0
    ## called from stream module
    setup_connection
    AstmServer.log("Initialized Lab Interface")
end

Public Instance Methods

_start() click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 806
def _start
        evented_poll_LIS_for_requisition({"trigger_lis_poll" => {"epoch" => Time.now.to_i.to_s}})
        reattempt_failed_updates
        update_LIS
end
add_barcode(code,order_id) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 270
def add_barcode(code,order_id)
        $redis.hset(BARCODES,code,order_id)
end
add_order(order) click to toggle source

@param order : order object, as a hash.

# File lib/publisher/pf_lab_interface.rb, line 198
def add_order(order)
        at_least_one_item_exists = false
        order[REPORTS].each do |report|
                test_machine_codes = report[TESTS].map{|c|
                        $inverted_mappings[c[LIS_CODE]]
                }.compact.uniq
                report[REQUIREMENTS].each do |req|
                        get_priority_category(req)[ITEMS].each do |item|
                                if !item[BARCODE].blank?
                                        at_least_one_item_exists = true
                                        add_barcode(item[BARCODE],JSON.generate(
                                                {
                                                        :order_id => order[ID],
                                                        :machine_codes => test_machine_codes
                                                }
                                        ))
                                elsif !item[CODE].blank?
                                        at_least_one_item_exists = true
                                        add_barcode(item[CODE],JSON.generate({
                                                        :order_id => order[ID],
                                                        :machine_codes => test_machine_codes
                                                }))
                                end
                        end
                end
        end
        
        unless at_least_one_item_exists.blank?
                $redis.hset(ORDERS,order[ID],JSON.generate(order))
                $redis.zadd(ORDERS_SORTED_SET,Time.now.to_i,order[ID])
        end
end
add_test_result(order,res,lis_code) click to toggle source

@param order : the existing order @param res : the result from the machine, pertaining to this order. @return @working : updates the results from res, into the order at the relevant tests inside the order. $MAPPINGS -> [MACHINE_CODE => LIS_CODE] $INVERTED_MAPPINGS -> [LIS_CODE => MACHINE_CODE]

# File lib/publisher/pf_lab_interface.rb, line 243
def add_test_result(order,res,lis_code)
        #puts "res is:"
        #puts res.to_s

        order[REPORTS.to_sym].each do |report|
                #puts "doing report"
                report[TESTS.to_sym].each_with_index{|t,k|
                        #puts "doing test"
                        #puts t.to_s
                        
                        puts "teh test lis code to sym is:"
                        puts t[LIS_CODE.to_sym]
                        puts "lis code is: #{lis_code.to_s}"
                        if t[LIS_CODE.to_sym] == lis_code.to_s
                                puts "got equality"
                                t[RESULT_RAW.to_sym] = res[:value]
                                puts "set value"
                        end
                }
        end
end
all_hits_downloaded?(last_request) click to toggle source

def delete_last_request

$redis.del(LAST_REQUEST)

end

# File lib/publisher/pf_lab_interface.rb, line 283
def all_hits_downloaded?(last_request)
        last_request[PREV_REQUEST_COMPLETED].to_s == "true"
end
build_request(from=nil,to=nil) click to toggle source

so build request should have a from and a to what are the defaults ? @param from : default (nil) @param to : default(nil)

# File lib/publisher/pf_lab_interface.rb, line 302
def build_request(from=nil,to=nil)
        puts "entering build request with from: #{from} and to:#{to}"
        to ||= Time.now
        from ||= to - 1.day
        last_request = get_last_request
        params = nil
        if last_request.blank?
                AstmServer.log("no last request, making fresh request")
                params = fresh_request_params(from,to)
        else
                if all_hits_downloaded?(last_request)
                        AstmServer.log("last request all hits have been downloaded, going for next request.")
                        if last_request[TO_EPOCH].to_i == to.to_i
                                return nil
                        else
                                params = fresh_request_params(last_request[TO_EPOCH],to)
                        end
                else
                        AstmServer.log("last request all hits not downloaded.")
                        params = last_request
                end 
        end
        params.merge!(lis_security_key: self.lis_security_key)
        AstmServer.log("reuqest params become: #{params}")
        AstmServer.log("sleeping")
        #sleep(10000)
        Typhoeus::Request.new(self.get_poll_url_path,params: params)
end
commit_request_params_to_redis(response_hash) click to toggle source

commits the request params to redis. the response hash is expected to have whatever parameters were sent into it in the request. so it must always return: a -> how many it was told to skip (SKIP) b -> from_epoch : from which epoch it was queried. c -> to_epoch : to which epoch it was queried.

# File lib/publisher/pf_lab_interface.rb, line 337
def commit_request_params_to_redis(response_hash)
        $redis.hset(LAST_REQUEST,SKIP,response_hash[SKIP].to_i + response_hash[ORDERS].size.to_i)
        $redis.hset(LAST_REQUEST,SIZE,response_hash[SIZE].to_i)
        $redis.hset(LAST_REQUEST,FROM_EPOCH,response_hash[FROM_EPOCH].to_i)
        $redis.hset(LAST_REQUEST,TO_EPOCH,response_hash[TO_EPOCH].to_i)
        $redis.hset(LAST_REQUEST,PREV_REQUEST_COMPLETED,request_size_completed?(response_hash).to_s)
end
delete_completed_order(order_id) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 415
def delete_completed_order(order_id)
        remove_order(order_id)
end
evented_poll_LIS_for_requisition(data) click to toggle source

this is triggered by whatever firebase sends you put this in the callback, and let me block and see what happens. we cannot watch two different endpoints ? or we can ? on the same endpoint -> will so it becomes a merged document. and both events will fire. and get triggered.

# File lib/publisher/pf_lab_interface.rb, line 392
def evented_poll_LIS_for_requisition(data)
        unless data.blank?
                
                data = data["data"].blank? ? data : data["data"]

                unless data["delete_order"].blank?
                        puts "delete order is not blank"
                        unless data["delete_order"]["order_id"].blank?
                                puts "order id is not blank"
                                puts "going to delete the completed order --------------->"
                                delete_completed_order(data["delete_order"]["order_id"])
                        end
                end
                unless data["trigger_lis_poll"].blank?
                        unless data["trigger_lis_poll"]["epoch"].blank?
                                new_poll_LIS_for_requisition(data["trigger_lis_poll"]["epoch"].to_i)
                        end
                end
        else

        end
end
fresh_request_params(from,to) click to toggle source

@param from : time object @param to : time object

# File lib/publisher/pf_lab_interface.rb, line 289
def fresh_request_params(from,to)
        #puts "came to make fresh request params, with from epoch: #{from_epoch}"
        params = {}
        params[TO_EPOCH] = to.to_i
        params[FROM_EPOCH] = from.to_i
        params[SKIP] = 0
        params
end
get_barcode(barcode) click to toggle source

@return the entry at the barcode, or nil. key (order_id) value (array of tests registered on that barcode, the names of the tests are the machine codes, and not the lis_codes) this key is generated originally in add_barcode

# File lib/publisher/pf_lab_interface.rb, line 167
def get_barcode(barcode)       
        if barcode_hash = $redis.hget(BARCODES,barcode)
                JSON.parse(barcode_hash).deep_symbolize_keys
        else
                nil
        end
end
get_last_request() click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 274
def get_last_request
        $redis.hgetall(LAST_REQUEST)
end
get_order(order_id) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 175
def get_order(order_id)
        if order_string = $redis.hget(ORDERS,order_id)
                JSON.parse(order_string).deep_symbolize_keys
        else
                nil
        end
end
get_poll_url_path() click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 797
def get_poll_url_path
        self.server_url_with_port + POLL_ENDPOINT
end
get_priority_category(req) click to toggle source

@param req : the requirement hash. @return priority_category : the category which has been chosen as the top priority for the requirement.

# File lib/publisher/pf_lab_interface.rb, line 185
def get_priority_category(req)
        priority_category = req[CATEGORIES].select{|c| 
                c[USE_CATEGORY_FOR_LIS] == 1
        }
        if priority_category.blank?
                priority_category = req[CATEGORIES][0]
        else
                priority_category = priority_category[0]
        end
        priority_category
end
get_put_url_path() click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 801
def get_put_url_path
        self.server_url_with_port + PUT_ENDPOINT
end
new_poll_LIS_for_requisition(to_epoch=nil) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 431
def new_poll_LIS_for_requisition(to_epoch=nil)
        AstmServer.log(to_epoch.to_s)
        while true
                orders = []
                begin
                        Retriable.retriable(on: PfDownloadException) do 
                                self.retry_count+=1
                                AstmServer.log("retrying----->")
                                request = build_request(nil,to_epoch)
                                break if request.blank?
                                request.run
                                response = request.response
                                if response.success?
                                        code = response.code
                                        time = response.total_time
                                        headers = response.headers
                                        #AstmServer.log("successfully polled server")
                                        response_hash = JSON.parse(response.body)
                                    #AstmServer.log("Pathofast LIS poll response --->")
                                    #AstmServer.log(response_hash.to_s)
                                    orders = response_hash[ORDERS]
                                    orders.each do |order|
                                           add_order(order) 
                                    end
                                    commit_request_params_to_redis(response_hash)
                                    #puts "are the orders blank: #{orders.blank?}"
                                    #break if orders.blank?
                                elsif response.timed_out?
                                        #AstmServer.log("Error polling server with code: #{code}")
                                        raise PfDownloadException.new("timeout")
                                elsif response.code == 0
                                        #AstmServer.log("Error polling server with code: #{code}")
                                        raise PfDownloadException.new("didnt get any http response")
                                else
                                        #AstmServer.log("Error polling server with code: #{code}")
                                        raise PfDownloadException.new("non 200 response")
                                end
                        end
                rescue => e
                        puts e.to_s
                        puts "raised exception-----------> breaking."
                        ## retryable has raised the errors again.
                        break
                else
                        ## break only if the orders are blank.
                        break if orders.blank?
                end
        end
end
poll() click to toggle source

this method is redundant, and no longer used the whole thing is now purely evented.

# File lib/publisher/pf_lab_interface.rb, line 817
def poll
end
poll_LIS_for_requisition(to_epoch=nil) click to toggle source

how it deletes records so this is called in the form of a while loop. how it handles the update response.

# File lib/publisher/pf_lab_interface.rb, line 488
def poll_LIS_for_requisition(to_epoch=nil)
        AstmServer.log(to_epoch.to_s)
        while true
                #puts "came back to true"
                request = build_request(nil,to_epoch)
                break if request.blank?
                request.run
                response = request.response
                code = response.code
                time = response.total_time
                headers = response.headers
                if code.to_s != "200"
                        AstmServer.log("Error polling server with code: #{code}")
                        break
                else
                        AstmServer.log("successfully polled server")
                        response_hash = JSON.parse(response.body)
                    AstmServer.log("Pathofast LIS poll response --->")
                    #AstmServer.log(response_hash.to_s)
                    orders = response_hash[ORDERS]
                    orders.each do |order|
                     add_order(order) 
                    end
                    commit_request_params_to_redis(response_hash)
                    puts "are the orders blank: #{orders.blank?}"
                    break if orders.blank?
                end
        end
end
process_update_queue() click to toggle source

data = [

  {
    :id => "ARUBA",
    :results => [
      {
        :name => "TLCparam",
        :value => 10
      },
      {
        :name => "Nparam",
        :value => 23
      },
      {
        :name => "ANCparam",
        :value => 25
      },
      {
        :name => "Lparam",
        :value => 10
      },
      {
        :name => "ALCparam",
        :value => 44
      },
      {
        :name => "Mparam",
        :value => 55
      },
      {
        :name => "AMCparam",
        :value => 22
      },
      {
        :name => "Eparam",
        :value => 222
      },
      {
        :name => "AECparam",
        :value => 21
      },
      {
        :name => "BASOparam",
        :value => 222
      },
      {
        :name => "ABCparam",
        :value => 300
      },
      {
        :name => "RBCparam",
        :value => 2.22
      },
      {
        :name => "HBparam",
        :value => 19
      },
      {
        :name => "HCTparam",
        :value => 22
      },
      {
        :name => "MCVparam",
        :value => 222
      },
      {
        :name => "MCHparam",
        :value => 21
      },
      {
        :name => "MCHCparam",
        :value => 10
      },
      {
        :name => "MCVparam",
        :value => 222
      },
      {
        :name => "RDWCVparam",
        :value => 12
      },
      {
        :name => "PCparam",
        :value => 1.22322
      }
    ]
  }
]
# File lib/publisher/pf_lab_interface.rb, line 608
def process_update_queue
        #puts "came to process update queue."
        order_ids = []
        #puts $redis.lrange UPDATE_QUEUE, 0, -1
                
        ## first push that to patient.
        ## first create that order and add that barcode.
        ## for citrate.
        ## then let that get downloaded.
        ## so keep on test going for that.
        ##
        ## why complicate this so much.
        ## just do a brpop?
        ##
        ORDERS_TO_UPDATE_PER_CYCLE.times do |n|
                order_ids << $redis.rpop(UPDATE_QUEUE)
        end
        #puts "order ids popped"
        #puts order_ids.to_s
        order_ids.compact!
        order_ids.uniq!
        orders = order_ids.map{|c|
                get_order(c)
        }.compact
        #puts orders[0].to_s
        
        #puts "orders are:"
        #puts orders.size
        #exit(1)

        req = Typhoeus::Request.new(self.get_put_url_path, method: :put, body: {orders: orders}.to_json, params: {lis_security_key: self.lis_security_key}, headers: {Accept: 'application/json', "Content-Type".to_sym => 'application/json'})


        req.on_complete do |response|
                if response.success?
                    response_body = response.body
                    orders = JSON.parse(response.body)["orders"]
                    #puts orders.to_s
                    orders.values.each do |order|
                     #puts order.to_s
                     if order["errors"].blank?
                     else
                             puts "got an error for the order."
                             ## how many total error attempts to manage.
                     end
                    end
                    ## here we have to raise.
                elsif response.timed_out?
                    AstmServer.log("got a time out")
                    raise PfUpdateException.new("update order timed out")
                elsif response.code == 0
                    AstmServer.log(response.return_message)
                    raise PfUpdateException.new("update order response code 0")
                else
                    AstmServer.log("HTTP request failed: " + response.code.to_s)
                    raise PfUpdateException.new("update order response code non success: #{response.code}")
                end
        end

        req.run

end
put_delete_order_event(order_id) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 419
def put_delete_order_event(order_id)
        puts self.connection.put(self.event_source,:order_id => order_id)
end
queue_order_for_update(order) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 265
def queue_order_for_update(order)
        update_order(order)
        $redis.lpush(UPDATE_QUEUE,order[ID.to_sym])
end
reattempt_failed_updates() click to toggle source

this is only done on startup okay so what do we

# File lib/publisher/pf_lab_interface.rb, line 690
def reattempt_failed_updates
        $redis.scard(FAILED_UPDATES).times do 
                if patient_results = $redis.spop(FAILED_UPDATES)
                        patient_results = JSON.parse(patient_results)
                        begin
                                Retriable.retriable(on: PfUpdateException) do 
                                        unless update(patient_results)
                                                raise PfUpdateException.new("didnt get any http response")
                                        end
                                end
                        rescue => e
                                AstmServer.log("reattempted and failed")
                        ensure

                        end
                end
        end
end
remove_barcode(barcode) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 158
def remove_barcode(barcode)
        return if barcode.blank?
        $redis.hdel(BARCODES,barcode)
end
remove_old_orders() click to toggle source

removes any orders that start from now - 4 days ago now - 2 days ago

# File lib/publisher/pf_lab_interface.rb, line 674
def remove_old_orders
        stale_order_ids = $redis.zrangebyscore(ORDERS_SORTED_SET,(Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS*2).to_s, (Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS))
        $redis.pipelined do 
                stale_order_ids.each do |order_id|
                        remove_order(order_id)
                end
        end
end
remove_order(order_id) click to toggle source

UTILITY METHOD FOR THE ORDER AND BARCODE HASHES ADD AND REMOVE

# File lib/publisher/pf_lab_interface.rb, line 140
def remove_order(order_id)
        order = get_order(order_id)
        puts "order id is:#{order_id} is"
        unless order.blank?
                puts "order not blank."
                order[:reports].each do |report|
                        report[:tests].each do |test|
                                remove_barcode(test[:barcode])
                                remove_barcode(test[:code])
                        end                 
                end
                $redis.hdel(ORDERS,order_id)
                $redis.zrem(ORDERS_SORTED_SET,order_id)
        else
                puts "order is blank."
        end
end
request_size_completed?(response_hash) click to toggle source

since we request only a certain set of orders per request we need to know if the earlier request has been completed or we still need to rerequest the same time frame again.

# File lib/publisher/pf_lab_interface.rb, line 348
def request_size_completed?(response_hash)
        #puts response_hash.to_s
        response_hash[SKIP].to_i + response_hash[ORDERS].size >= response_hash[SIZE].to_i
end
test_trigger_delete_order(order_id) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 427
def test_trigger_delete_order(order_id)
        puts self.connection.put(self.event_source + "/delete_order", :order_id => order_id)
end
test_trigger_lis_poll(epoch=nil) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 423
def test_trigger_lis_poll(epoch=nil)
        puts self.connection.put(self.event_source + "/trigger_lis_poll", :epoch => epoch)
end
update(data) click to toggle source
# File lib/publisher/pf_lab_interface.rb, line 746
        def update(data)
                puts "data is:"
                puts JSON.pretty_generate(data)
                data[ORDERS_KEY].each do |order|
                        puts "order is "
                        puts order
                        barcode = order["id"]
                        results = order["results"]
                        puts "barcode is: #{barcode}, results are : #{results}"
                        results.deep_symbolize_keys!
                        if barcode_hash = get_barcode(barcode)
                                puts "barcode hash is: #{barcode_hash}"
                                if order = get_order(barcode_hash[:order_id])
                                        puts "got order"
                                        ## update the test results, and add the order to the final update hash.
                                        #puts "order got from barcode is:"
                                        #puts order
                                        machine_codes = barcode_hash[:machine_codes]
                                        puts "machine codes: #{machine_codes}"

                                        results.keys.each do |lis_code|
                                                res = results[lis_code]
                                                add_test_result(order,res,lis_code)
                                        end

=begin
                                        results.values.each do |res|
                                                if machine_codes.include? res[:name]
                                                        ## so we need to update to the requisite test inside the order.
                                                        puts "came to add test result"
                                                        puts res
                                                        add_test_result(order,res,nil)   
                                                        ## commit to redis
                                                        ## and then 
                                                end
                                        end
=end
                                        #puts "came to queue order for update"
                                        queue_order_for_update(order)
                                end
                        else
                                AstmServer.log("the barcode:#{barcode}, does not exist in the barcodes hash")
                                ## does not exist.
                        end
                end

                process_update_queue
                
        
        end
update_LIS(args={}) click to toggle source

we can do this. args can be used to modulate exit behaviours @param args : hash of arguments

# File lib/publisher/pf_lab_interface.rb, line 712
def update_LIS(args={})
        prepare_redis
        exit_requested = false
        Kernel.trap( "INT" ) { exit_requested = true }
        while !exit_requested
                puts "exit not requested."
                if patient_results = $redis.brpoplpush(PATIENTS_REDIS_LIST,PROCESSING_REDIS_LIST,0)
                        puts "got patient results."
                        patient_results = JSON.parse(patient_results)
                        puts "patient results are:"
                        puts JSON.pretty_generate(patient_results)
                        begin
                                Retriable.retriable(on: PfUpdateException) do 
                                        unless update(patient_results)
                                                raise PfUpdateException.new("didnt get any http response")
                                        end
                                end
                                exit_requested = !args[:exit_on_success].blank?
                                #puts "exit requested becomes: #{exit_requested}"
                        rescue => e
                                $redis.sadd(FAILED_UPDATES,JSON.generate(patient_results))
                                exit_requested = !args[:exit_on_failure].blank?
                                puts "came to eventual rescue, exit requested is: #{exit_requested}"
                        ensure
                                $redis.lpop("processing")
                        end
                else
                        puts "no patient results"
                end
        end
end
update_order(order) click to toggle source

start work on simple.

# File lib/publisher/pf_lab_interface.rb, line 233
def update_order(order)
        $redis.hset(ORDERS,order[ID.to_sym],JSON.generate(order))
end