Class: Egis::Table
- Inherits: Object
- Inheritance chain: Object → Egis::Table
- Defined in:
- lib/egis/table.rb
Overview
Interface for Athena table manipulation.
Table objects should preferably be created with the Database#table method.
Constant Summary
- DEFAULT_OPTIONS =
{format: :tsv}.freeze
Instance Attribute Summary
- #database ⇒ Egis::Database readonly
Database used to execute the table's queries.
-
#name ⇒ String
readonly
Athena table name.
-
#schema ⇒ Egis::TableSchema
readonly
Table's schema object.
Instance Method Summary
-
#add_partitions(partitions) ⇒ void
Creates partitions with all possible combinations of given partition values.
-
#add_partitions!(partitions) ⇒ void
Creates partitions with all possible combinations of the given partition values; raises an error if a partition already exists.
-
#create ⇒ void
Creates table in Athena.
-
#create! ⇒ void
The same as #create, but raises an error when a table with the given name already exists.
-
#discover_partitions ⇒ void
Tells Athena to automatically discover table's partitions by scanning table's S3 location.
-
#download_data ⇒ Array
Downloads table contents into memory.
-
#format ⇒ Object
Table data format.
-
#initialize(database, name, schema, location, options: {}, client: Egis::Client.new, partitions_generator: Egis::PartitionsGenerator.new, table_ddl_generator: Egis::TableDDLGenerator.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client), output_parser: Egis::OutputParser.new, s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client), table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner)) ⇒ Table
constructor
A new instance of Table.
-
#location ⇒ String
Table location URL.
-
#upload_data(rows) ⇒ void
Inserts data into the table.
-
#wipe_data(partitions: nil) ⇒ void
Removes table's content on S3.
Constructor Details
#initialize(database, name, schema, location, options: {}, client: Egis::Client.new, partitions_generator: Egis::PartitionsGenerator.new, table_ddl_generator: Egis::TableDDLGenerator.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client), output_parser: Egis::OutputParser.new, s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client), table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner)) ⇒ Table
Returns a new instance of Table.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/egis/table.rb', line 19
##
# Builds a table handle.
#
# @param database [Egis::Database] database the table's queries are executed against
# @param name [String] Athena table name
# @param schema [Egis::TableSchema] table's schema object
# @param location [String] table location (resolved through Egis.mode by #location)
# @param options [Hash] table options merged over DEFAULT_OPTIONS, e.g. +format:+
#
# The remaining keyword arguments are collaborator objects; their defaults are
# built from +client+ (and +s3_cleaner+ for the data wiper).
def initialize(database, name, schema, location, options: {},
               client: Egis::Client.new,
               partitions_generator: Egis::PartitionsGenerator.new,
               table_ddl_generator: Egis::TableDDLGenerator.new,
               output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
               output_parser: Egis::OutputParser.new,
               s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
               table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
  @database = database
  @name = name
  @schema = schema
  @location = location
  # Caller-supplied options must override the defaults; merging nothing would
  # silently discard e.g. a non-TSV +format:+.
  @options = DEFAULT_OPTIONS.merge(options)
  @partitions_generator = partitions_generator
  @table_ddl_generator = table_ddl_generator
  @output_downloader = output_downloader
  @output_parser = output_parser
  @table_data_wiper = table_data_wiper
end
Instance Attribute Details
#database ⇒ Egis::Database (readonly)
Returns the database used to execute the table's queries.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/egis/table.rb', line 16
##
# Interface for Athena table manipulation.
# Table objects should preferably be created with the Database#table method.
class Table
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table's queries are executed against
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] table's schema object
  # @param location [String] table location (resolved through Egis.mode by #location)
  # @param options [Hash] table options merged over DEFAULT_OPTIONS, e.g. +format:+
  #
  # The remaining keyword arguments are collaborator objects; their defaults are
  # built from +client+ (and +s3_cleaner+ for the data wiper).
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    # Caller-supplied options must override the defaults; merging nothing would
    # silently discard e.g. a non-TSV +format:+.
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database used to execute the table's queries
  # @return [String] Athena table name
  # @return [Egis::TableSchema] table's schema object
  attr_reader :database, :name, :schema

  ##
  # Creates table in Athena.
  #
  # @return [void]
  def create
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: true)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: false)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions
  # @return [void]
  def add_partitions(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: true)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # (see add_partitions)
  # Raises an error if a partition already exists.
  def add_partitions!(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: false)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # Tells Athena to automatically discover the table's partitions by scanning the table's S3 location.
  # This operation may take a long time when there are many partitions. In that case, use
  # {#add_partitions} instead to define partitions manually.
  #
  # @return [void]
  def discover_partitions
    database.execute_query("MSCK REPAIR TABLE #{name};", async: false, system_execution: true)
  end

  ##
  # Inserts data into the table. Mostly useful for testing purposes.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values
  # @return [void]
  def upload_data(rows)
    query = data_insert_query(rows)
    database.execute_query(query, async: false, system_execution: true)
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = database.execute_query("SELECT * FROM #{name};", async: false, system_execution: true)
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return Table data format (+:tsv+ unless overridden through +options:+)
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  def column_types
    all_columns.map(&:type)
  end

  # Regular columns followed by partition columns. This order is used both to read
  # positional rows in #row_literal_values and to type parsed output in #download_data.
  def all_columns
    schema.columns + schema.partitions
  end

  def data_insert_query(rows)
    insert_values = rows.map { |row| row_literal_values(row) }
    row_clause = insert_values.map { |row| row_values_statement(row) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Hash rows are looked up by column name, array rows by column position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end
#name ⇒ String (readonly)
Returns Athena table name.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/egis/table.rb', line 16
##
# Interface for Athena table manipulation.
# Table objects should preferably be created with the Database#table method.
class Table
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table's queries are executed against
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] table's schema object
  # @param location [String] table location (resolved through Egis.mode by #location)
  # @param options [Hash] table options merged over DEFAULT_OPTIONS, e.g. +format:+
  #
  # The remaining keyword arguments are collaborator objects; their defaults are
  # built from +client+ (and +s3_cleaner+ for the data wiper).
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    # Caller-supplied options must override the defaults; merging nothing would
    # silently discard e.g. a non-TSV +format:+.
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database used to execute the table's queries
  # @return [String] Athena table name
  # @return [Egis::TableSchema] table's schema object
  attr_reader :database, :name, :schema

  ##
  # Creates table in Athena.
  #
  # @return [void]
  def create
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: true)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: false)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions
  # @return [void]
  def add_partitions(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: true)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # (see add_partitions)
  # Raises an error if a partition already exists.
  def add_partitions!(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: false)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # Tells Athena to automatically discover the table's partitions by scanning the table's S3 location.
  # This operation may take a long time when there are many partitions. In that case, use
  # {#add_partitions} instead to define partitions manually.
  #
  # @return [void]
  def discover_partitions
    database.execute_query("MSCK REPAIR TABLE #{name};", async: false, system_execution: true)
  end

  ##
  # Inserts data into the table. Mostly useful for testing purposes.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values
  # @return [void]
  def upload_data(rows)
    query = data_insert_query(rows)
    database.execute_query(query, async: false, system_execution: true)
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = database.execute_query("SELECT * FROM #{name};", async: false, system_execution: true)
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return Table data format (+:tsv+ unless overridden through +options:+)
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  def column_types
    all_columns.map(&:type)
  end

  # Regular columns followed by partition columns. This order is used both to read
  # positional rows in #row_literal_values and to type parsed output in #download_data.
  def all_columns
    schema.columns + schema.partitions
  end

  def data_insert_query(rows)
    insert_values = rows.map { |row| row_literal_values(row) }
    row_clause = insert_values.map { |row| row_values_statement(row) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Hash rows are looked up by column name, array rows by column position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end
#schema ⇒ Egis::TableSchema (readonly)
Returns table's schema object.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/egis/table.rb', line 16
##
# Interface for Athena table manipulation.
# Table objects should preferably be created with the Database#table method.
class Table
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table's queries are executed against
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] table's schema object
  # @param location [String] table location (resolved through Egis.mode by #location)
  # @param options [Hash] table options merged over DEFAULT_OPTIONS, e.g. +format:+
  #
  # The remaining keyword arguments are collaborator objects; their defaults are
  # built from +client+ (and +s3_cleaner+ for the data wiper).
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    # Caller-supplied options must override the defaults; merging nothing would
    # silently discard e.g. a non-TSV +format:+.
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database used to execute the table's queries
  # @return [String] Athena table name
  # @return [Egis::TableSchema] table's schema object
  attr_reader :database, :name, :schema

  ##
  # Creates table in Athena.
  #
  # @return [void]
  def create
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: true)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    log_table_creation
    create_table_sql = table_ddl_generator.create_table_sql(self, permissive: false)
    database.execute_query(create_table_sql, async: false, system_execution: true)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions
  # @return [void]
  def add_partitions(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: true)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # (see add_partitions)
  # Raises an error if a partition already exists.
  def add_partitions!(partitions)
    load_partitions_query = partitions_generator.to_sql(name, partitions, permissive: false)
    database.execute_query(load_partitions_query, async: false, system_execution: true)
  end

  ##
  # Tells Athena to automatically discover the table's partitions by scanning the table's S3 location.
  # This operation may take a long time when there are many partitions. In that case, use
  # {#add_partitions} instead to define partitions manually.
  #
  # @return [void]
  def discover_partitions
    database.execute_query("MSCK REPAIR TABLE #{name};", async: false, system_execution: true)
  end

  ##
  # Inserts data into the table. Mostly useful for testing purposes.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values
  # @return [void]
  def upload_data(rows)
    query = data_insert_query(rows)
    database.execute_query(query, async: false, system_execution: true)
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = database.execute_query("SELECT * FROM #{name};", async: false, system_execution: true)
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return Table data format (+:tsv+ unless overridden through +options:+)
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  def column_types
    all_columns.map(&:type)
  end

  # Regular columns followed by partition columns. This order is used both to read
  # positional rows in #row_literal_values and to type parsed output in #download_data.
  def all_columns
    schema.columns + schema.partitions
  end

  def data_insert_query(rows)
    insert_values = rows.map { |row| row_literal_values(row) }
    row_clause = insert_values.map { |row| row_values_statement(row) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Hash rows are looked up by column name, array rows by column position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end
Instance Method Details
#add_partitions(partitions) ⇒ void
This method returns an undefined value.
Creates partitions with all possible combinations of given partition values.
74 75 76 77 |
# File 'lib/egis/table.rb', line 74
##
# Creates partitions for every combination of the given partition values.
# The partition SQL is generated with +permissive: true+; see #add_partitions!
# for the strict variant that raises when a partition already exists.
#
# @example
#   table.add_partitions(year: [2000, 2001], type: ['user'])
#
# @param [Hash] partitions partition column => list of values
# @return [void]
def add_partitions(partitions)
  execution_flags = {async: false, system_execution: true}
  partition_sql = partitions_generator.to_sql(name, partitions, permissive: true)
  database.execute_query(partition_sql, **execution_flags)
end
#add_partitions!(partitions) ⇒ void
This method returns an undefined value.
Creates partitions with all possible combinations of given partition values.
Raises an error if a partition already exists.
83 84 85 86 |
# File 'lib/egis/table.rb', line 83
##
# Strict variant of #add_partitions: the partition SQL is generated with
# +permissive: false+, so an error is raised when a partition already exists.
#
# @param [Hash] partitions partition values, same format as #add_partitions
# @return [void]
def add_partitions!(partitions)
  execution_flags = {async: false, system_execution: true}
  strict_sql = partitions_generator.to_sql(name, partitions, permissive: false)
  database.execute_query(strict_sql, **execution_flags)
end
#create ⇒ void
This method returns an undefined value.
Creates table in Athena.
46 47 48 49 50 51 |
# File 'lib/egis/table.rb', line 46
##
# Creates the table in Athena, logging the operation first.
# The DDL is generated with +permissive: true+; see #create! for the variant
# that raises when the table already exists.
#
# @return [void]
def create
  log_table_creation
  ddl_statement = table_ddl_generator.create_table_sql(self, permissive: true)
  run_flags = {async: false, system_execution: true}
  database.execute_query(ddl_statement, **run_flags)
end
#create! ⇒ void
This method returns an undefined value.
The same as #create, but raises an error when a table with the given name already exists.
58 59 60 61 62 63 |
# File 'lib/egis/table.rb', line 58
##
# Same as #create, but the DDL is generated with +permissive: false+, so an
# error is raised when a table with the given name already exists.
#
# @return [void]
def create!
  log_table_creation
  ddl_statement = table_ddl_generator.create_table_sql(self, permissive: false)
  run_flags = {async: false, system_execution: true}
  database.execute_query(ddl_statement, **run_flags)
end
#discover_partitions ⇒ void
This method returns an undefined value.
Tells Athena to automatically discover the table's partitions by scanning the table's S3 location. This operation may take a long time when there are many partitions; in that case, use #add_partitions instead to define partitions manually.
95 96 97 |
# File 'lib/egis/table.rb', line 95
##
# Runs +MSCK REPAIR TABLE+ so Athena discovers partitions by scanning the
# table's S3 location. This can be slow when there are many partitions; prefer
# #add_partitions to define them explicitly in that case.
#
# @return [void]
def discover_partitions
  repair_statement = "MSCK REPAIR TABLE #{name};"
  database.execute_query(repair_statement, async: false, system_execution: true)
end
#download_data ⇒ Array
Downloads table contents into memory. Mostly useful for testing purposes.
127 128 129 130 131 |
# File 'lib/egis/table.rb', line 127
##
# Reads the whole table into memory: runs +SELECT *+, downloads the query
# output from its output location, and parses it using the table's column
# types. Mostly useful for testing purposes.
#
# @return [Array] Array of arrays with row values.
def download_data
  select_statement = "SELECT * FROM #{name};"
  query_result = database.execute_query(select_statement, async: false, system_execution: true)
  raw_output = output_downloader.download(query_result.output_location)
  output_parser.parse(raw_output, column_types)
end
#format ⇒ Object
Returns the table data format (:tsv by default).
145 146 147 |
# File 'lib/egis/table.rb', line 145
##
# Table data format, read from the table's private +options+ hash
# (DEFAULT_OPTIONS sets it to +:tsv+).
#
# @return [Object] table data format
def format
  # The receiver was missing here (`.fetch(:format)`), which does not parse.
  options.fetch(:format)
end
#location ⇒ String
Returns table location URL.
152 153 154 |
# File 'lib/egis/table.rb', line 152
##
# Table location URL, obtained by passing the stored location through the
# current Egis mode's +s3_path+.
#
# @return [String] table location URL
def location
  current_mode = Egis.mode
  current_mode.s3_path(@location)
end
#upload_data(rows) ⇒ void
This method returns an undefined value.
Inserts data into the table. Mostly useful for testing purposes.
117 118 119 120 |
# File 'lib/egis/table.rb', line 117
##
# Inserts the given rows into the table with a single INSERT query.
# Mostly useful for testing purposes.
#
# @param [Array] rows Array of arrays or hashes with row values
# @return [void]
def upload_data(rows)
  insert_statement = data_insert_query(rows)
  run_flags = {async: false, system_execution: true}
  database.execute_query(insert_statement, **run_flags)
end
#wipe_data(partitions: nil) ⇒ void
This method returns an undefined value.
Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
139 140 141 |
# File 'lib/egis/table.rb', line 139
##
# Removes the table's content on S3 by delegating to the table data wiper.
# Passing +partitions+ limits removal to those partitions.
#
# @param [Hash, nil] partitions partition values to remove (same format as #add_partitions)
# @return [void]
def wipe_data(partitions: nil)
  wiper = table_data_wiper
  wiper.wipe_table_data(self, partitions)
end