class Cumulus::Kinesis::Manager
Public Class Methods
new()
click to toggle source
Calls superclass method
Cumulus::Common::Manager::new
# File lib/kinesis/manager/Manager.rb, line 14
# Sets up the manager: invokes the superclass initializer with no arguments,
# sets @create_asset to true and builds the Kinesis client from the client
# settings exposed by Configuration.instance.
def initialize
  super()

  @create_asset = true
  @client = Aws::Kinesis::Client.new(Configuration.instance.client)
end
Public Instance Methods
added_diff(local)
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 36
# Returns the result of StreamDiff.added for the given local configuration.
#
# local - the local stream configuration handed to StreamDiff.added
def added_diff(local)
  StreamDiff.added(local)
end
aws_resources()
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 28
# Returns Kinesis.named_streams, memoized in @aws_resources so that the
# lookup runs only until it first yields a truthy value.
def aws_resources
  @aws_resources ||= Kinesis.named_streams
end
create(local)
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 67
# Creates the stream described by +local+ and brings its retention period
# and tags in line with the local configuration.
#
# local - the local stream configuration; this method reads its name,
#         shards, retention_period and tags
#
# Returns the add_tags_to_stream response when tags were added, otherwise nil.
def create(local)
  @client.create_stream({
    stream_name: local.name,
    shard_count: local.shards
  })
  @client.wait_until(:stream_exists, { stream_name: local.name })

  # Describe the newly created stream to learn the retention period it was
  # created with.
  created_stream = Kinesis::describe_stream(local.name)

  # If the stream retention period is different, then update it
  retention_changed = false
  if created_stream.retention_period_hours > local.retention_period
    @client.decrease_stream_retention_period({
      stream_name: local.name,
      retention_period_hours: local.retention_period
    })
    retention_changed = true
  elsif created_stream.retention_period_hours < local.retention_period
    @client.increase_stream_retention_period({
      stream_name: local.name,
      retention_period_hours: local.retention_period
    })
    retention_changed = true
  end

  # Same wait that update performs after a retention change, so that the
  # tagging call below is not issued while the stream is still being modified.
  @client.wait_until(:stream_exists, { stream_name: local.name }) if retention_changed

  # If the local configuration has tags, add them
  unless local.tags.empty?
    @client.add_tags_to_stream({
      stream_name: local.name,
      tags: local.tags
    })
  end
end
diff_resource(local, aws)
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 40
# Asks the local configuration to diff itself against the AWS resource and
# returns whatever local.diff produces.
#
# local - the local stream configuration (must respond to diff)
# aws   - the AWS-side resource passed to local.diff
def diff_resource(local, aws)
  local.diff(aws)
end
local_resources()
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 24
# Returns a Hash of the streams provided by Loader.streams, keyed by each
# stream's name. The result is memoized in @local_resources.
def local_resources
  @local_resources ||= Loader.streams.each_with_object({}) do |stream, by_name|
    by_name[stream.name] = stream
  end
end
migrate()
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 44
# Writes one pretty-printed JSON file per stream returned by
# Kinesis.named_streams into "<@migration_root>/kinesis", creating the
# migration root and the kinesis directory when they are missing.
#
# Returns the collection yielded by Kinesis.named_streams.
def migrate
  puts Colors.blue("Migrating Kinesis Streams...")

  # Create the directories. Dir.exist? is used because the old Dir.exists?
  # alias is deprecated and no longer available on Ruby 3.2+.
  streams_dir = "#{@migration_root}/kinesis"
  [@migration_root, streams_dir].each do |dir|
    Dir.mkdir(dir) unless Dir.exist?(dir)
  end

  Kinesis::named_streams.each do |name, stream|
    puts "Migrating stream #{name}"

    cumulus_stream = StreamConfig.new(name).populate!(stream)
    json = JSON.pretty_generate(cumulus_stream.to_hash)

    File.write("#{streams_dir}/#{name}.json", json)
  end
end
resource_name()
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 20
# Returns the display name of the resource type handled by this manager.
def resource_name
  "Kinesis Stream"
end
unmanaged_diff(aws)
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 32
# Returns the result of StreamDiff.unmanaged for the given AWS resource.
#
# aws - the AWS-side stream handed to StreamDiff.unmanaged
def unmanaged_diff(aws)
  StreamDiff.unmanaged(aws)
end
update(local, diffs)
click to toggle source
# File lib/kinesis/manager/Manager.rb, line 103
# Applies each diff to the stream named by +local+, dispatching on the diff
# type: shard count, retention period or tags. Diffs of any other type are
# ignored.
#
# local - the local stream configuration (name, shards, retention_period)
# diffs - the diffs to apply; each responds to type plus the fields used by
#         its handler
#
# Returns diffs.
def update(local, diffs)
  diffs.each do |diff|
    case diff.type
    when StreamChange::SHARDS
      resize_shards(local, diff)
    when StreamChange::RETENTION
      change_retention_period(local, diff)
    when StreamChange::TAGS
      change_tags(local, diff)
    end
  end
end

# Splits or merges shards depending on whether AWS has fewer or more shards
# than the local configuration asks for.
def resize_shards(local, diff)
  if diff.aws < diff.local
    # Float division so that an odd target (e.g. 3) is not truncated into a
    # false "exactly double" match.
    if diff.aws != diff.local / 2.0
      puts Colors.red("Can only increase the number of shards by a factor of 2")
    else
      split_all_shards(local)
    end
  elsif diff.aws > diff.local
    merge_shard_pairs(local)
  end
end

# Splits every shard of the stream at the midpoint of its hash key range.
def split_all_shards(local)
  aws_stream = Kinesis::named_streams[local.name]

  # Split the shards 1 at a time
  aws_stream.sorted_shards.each do |shard|
    puts Colors.blue("Splitting shard #{shard.shard_id}")

    # The splitting point is halfway between the hash start and end
    hash_start = shard.hash_key_range.starting_hash_key.to_i
    hash_end = shard.hash_key_range.ending_hash_key.to_i
    hash_split = hash_start + ((hash_end - hash_start) / 2)

    @client.split_shard({
      stream_name: local.name,
      shard_to_split: shard.shard_id,
      new_starting_hash_key: hash_split.to_s
    })

    # After every split we have to wait until the stream is ready
    wait_for_stream(local)
  end
end

# Merges the stream's sorted shards two at a time, provided AWS currently has
# exactly twice as many shards as the local configuration wants.
def merge_shard_pairs(local)
  aws_stream = Kinesis::named_streams[local.name]

  if aws_stream.sorted_shards.length != local.shards * 2.0
    puts Colors.red("Can only decrease the number of shards by a factor of 2")
    return
  end

  # Merge the sorted shards in groups of 2
  aws_stream.sorted_shards.each_slice(2) do |first_shard, second_shard|
    puts Colors.blue("Merging shards #{first_shard.shard_id} and #{second_shard.shard_id}")

    @client.merge_shards({
      stream_name: local.name,
      shard_to_merge: first_shard.shard_id,
      adjacent_shard_to_merge: second_shard.shard_id
    })

    # After every merge we have to wait until the stream is ready
    wait_for_stream(local)
  end
end

# Raises or lowers the retention period to the locally configured value.
def change_retention_period(local, diff)
  puts Colors.blue("Updating retention period...")

  if diff.aws > local.retention_period
    @client.decrease_stream_retention_period({
      stream_name: local.name,
      retention_period_hours: local.retention_period
    })
  elsif diff.aws < local.retention_period
    @client.increase_stream_retention_period({
      stream_name: local.name,
      retention_period_hours: local.retention_period
    })
  end

  # Wait for the stream to be in an active state or shard updates will fail
  wait_for_stream(local)
end

# Removes the tags listed in diff.tags_to_remove, then adds diff.tags_to_add.
def change_tags(local, diff)
  puts Colors.blue("Updating tags...")

  unless diff.tags_to_remove.empty?
    @client.remove_tags_from_stream({
      stream_name: local.name,
      tag_keys: diff.tags_to_remove.keys
    })
  end

  unless diff.tags_to_add.empty?
    @client.add_tags_to_stream({
      stream_name: local.name,
      tags: diff.tags_to_add
    })
  end
end

# Blocks on the client's :stream_exists waiter for the stream named by local.
def wait_for_stream(local)
  @client.wait_until(:stream_exists, { stream_name: local.name })
end

private :resize_shards, :split_all_shards, :merge_shard_pairs,
        :change_retention_period, :change_tags, :wait_for_stream