Class: Egis::Table

Inherits:
Object
Defined in:
lib/egis/table.rb

Overview

Interface for Athena table manipulation.

It is recommended to create table objects using the Database#table method.

Constant Summary

DEFAULT_OPTIONS =
{format: :tsv}.freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(database, name, schema, location, options: {}, client: Egis::Client.new, partitions_generator: Egis::PartitionsGenerator.new, table_ddl_generator: Egis::TableDDLGenerator.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client), output_parser: Egis::OutputParser.new, s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client), table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner)) ⇒ Table

Returns a new instance of Table.



(source lines 19–37)
# File 'lib/egis/table.rb', line 19

# Builds a handle for an Athena table.
#
# Identity (database, name, schema, raw location) and the merged options are
# stored first; the injectable collaborators follow. +client+ and +s3_cleaner+
# are not stored: they only feed the default values of other parameters.
#
# @param database table's database
# @param name [String] Athena table name
# @param schema table's schema object
# @param location raw table location
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
# @return [Table]
def initialize(database, name, schema, location, options: {},
               client: Egis::Client.new,
               partitions_generator: Egis::PartitionsGenerator.new,
               table_ddl_generator: Egis::TableDDLGenerator.new,
               output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
               output_parser: Egis::OutputParser.new,
               s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
               table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
  # Table identity and configuration.
  @database = database
  @name = name
  @schema = schema
  @location = location
  @options = DEFAULT_OPTIONS.merge(options)

  # Collaborators, stored as given so they can be replaced in tests.
  @table_data_wiper = table_data_wiper
  @output_parser = output_parser
  @output_downloader = output_downloader
  @table_ddl_generator = table_ddl_generator
  @partitions_generator = partitions_generator
end

Instance Attribute Details

#database ⇒ Egis::Database (readonly)

Returns:

  • (Egis::Database)



(source lines 16–193)
# File 'lib/egis/table.rb', line 16

##
# Interface for Athena table manipulation.
#
# It is recommended to create table objects using the {Database#table} method.
class Table
  # Options applied unless overridden through the +options:+ constructor argument.
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table belongs to; all table queries run through it
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] object providing the table's +columns+ and +partitions+
  # @param location [String] raw table location, translated by {#location} through +Egis.mode.s3_path+
  # @param options [Hash] table options merged over {DEFAULT_OPTIONS}
  # @param client [Egis::Client] only used to build the default +output_downloader+ and +s3_cleaner+
  # @param partitions_generator builds the SQL run by {#add_partitions} and {#add_partitions!}
  # @param table_ddl_generator builds the DDL run by {#create} and {#create!}
  # @param output_downloader downloads query output for {#download_data}
  # @param output_parser parses downloaded output for {#download_data}
  # @param s3_cleaner only used to build the default +table_data_wiper+
  # @param table_data_wiper removes the table's files for {#wipe_data}
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database the table belongs to
  attr_reader :database

  # @return [String] Athena table name
  attr_reader :name

  # @return [Egis::TableSchema] table's schema object
  attr_reader :schema

  ##
  # Creates table in Athena using permissive DDL generation.
  # See {#create!} for the variant that raises when the table already exists.
  #
  # @return [void]
  def create
    create_table(permissive: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    create_table(permissive: false)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  # See {#add_partitions!} for the variant that raises when a partition already exists.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions(partitions)
    load_partitions(partitions, permissive: true)
  end

  ##
  # The same as {#add_partitions}, but raises an error when a partition already exists.
  #
  # @example
  #   table.add_partitions!(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions!(partitions)
    load_partitions(partitions, permissive: false)
  end

  ##
  # Tells Athena to automatically discover table's partitions by scanning table's S3 location.
  # This operation might take a long time for a large number of partitions. If that's the case, use
  # {#add_partitions} to define partitions manually instead.
  #
  # @return [void]
  def discover_partitions
    execute_system_query("MSCK REPAIR TABLE #{name};")
  end

  ##
  # Insert data into the table. Mostly useful for testing purposes.
  #
  # Passing an empty array is a no-op: no query is sent, because an INSERT with an
  # empty VALUES list is not valid SQL.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values, covering regular columns then partitions
  # @return [void]
  def upload_data(rows)
    return if rows.empty?

    execute_system_query(data_insert_query(rows))
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = execute_system_query("SELECT * FROM #{name};")
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash, nil] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return [Object] table data format; +:tsv+ unless overridden via the +options:+ constructor argument
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  # Shared implementation of {#create} and {#create!}; logs before running the DDL.
  def create_table(permissive:)
    log_table_creation
    execute_system_query(table_ddl_generator.create_table_sql(self, permissive: permissive))
  end

  # Shared implementation of {#add_partitions} and {#add_partitions!}.
  def load_partitions(partitions, permissive:)
    execute_system_query(partitions_generator.to_sql(name, partitions, permissive: permissive))
  end

  # Every query issued by a table runs through the database with the same execution flags.
  def execute_system_query(query)
    database.execute_query(query, async: false, system_execution: true)
  end

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  # Types of every column in row order: regular columns first, then partition columns.
  def column_types
    all_columns.map(&:type)
  end

  def all_columns
    schema.columns + schema.partitions
  end

  # Builds a multi-row INSERT with one parenthesised VALUES tuple per row.
  def data_insert_query(rows)
    row_clause = rows.map { |row| row_values_statement(row_literal_values(row)) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Serializes one row into SQL literals ordered like +all_columns+.
  # Hash rows are read by column name, array rows by position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end

#name ⇒ String (readonly)

Returns Athena table name.

Returns:

  • (String)

    Athena table name



(source lines 16–193)
# File 'lib/egis/table.rb', line 16

##
# Interface for Athena table manipulation.
#
# It is recommended to create table objects using the {Database#table} method.
class Table
  # Options applied unless overridden through the +options:+ constructor argument.
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table belongs to; all table queries run through it
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] object providing the table's +columns+ and +partitions+
  # @param location [String] raw table location, translated by {#location} through +Egis.mode.s3_path+
  # @param options [Hash] table options merged over {DEFAULT_OPTIONS}
  # @param client [Egis::Client] only used to build the default +output_downloader+ and +s3_cleaner+
  # @param partitions_generator builds the SQL run by {#add_partitions} and {#add_partitions!}
  # @param table_ddl_generator builds the DDL run by {#create} and {#create!}
  # @param output_downloader downloads query output for {#download_data}
  # @param output_parser parses downloaded output for {#download_data}
  # @param s3_cleaner only used to build the default +table_data_wiper+
  # @param table_data_wiper removes the table's files for {#wipe_data}
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database the table belongs to
  attr_reader :database

  # @return [String] Athena table name
  attr_reader :name

  # @return [Egis::TableSchema] table's schema object
  attr_reader :schema

  ##
  # Creates table in Athena using permissive DDL generation.
  # See {#create!} for the variant that raises when the table already exists.
  #
  # @return [void]
  def create
    create_table(permissive: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    create_table(permissive: false)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  # See {#add_partitions!} for the variant that raises when a partition already exists.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions(partitions)
    load_partitions(partitions, permissive: true)
  end

  ##
  # The same as {#add_partitions}, but raises an error when a partition already exists.
  #
  # @example
  #   table.add_partitions!(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions!(partitions)
    load_partitions(partitions, permissive: false)
  end

  ##
  # Tells Athena to automatically discover table's partitions by scanning table's S3 location.
  # This operation might take a long time for a large number of partitions. If that's the case, use
  # {#add_partitions} to define partitions manually instead.
  #
  # @return [void]
  def discover_partitions
    execute_system_query("MSCK REPAIR TABLE #{name};")
  end

  ##
  # Insert data into the table. Mostly useful for testing purposes.
  #
  # Passing an empty array is a no-op: no query is sent, because an INSERT with an
  # empty VALUES list is not valid SQL.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values, covering regular columns then partitions
  # @return [void]
  def upload_data(rows)
    return if rows.empty?

    execute_system_query(data_insert_query(rows))
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = execute_system_query("SELECT * FROM #{name};")
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash, nil] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return [Object] table data format; +:tsv+ unless overridden via the +options:+ constructor argument
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  # Shared implementation of {#create} and {#create!}; logs before running the DDL.
  def create_table(permissive:)
    log_table_creation
    execute_system_query(table_ddl_generator.create_table_sql(self, permissive: permissive))
  end

  # Shared implementation of {#add_partitions} and {#add_partitions!}.
  def load_partitions(partitions, permissive:)
    execute_system_query(partitions_generator.to_sql(name, partitions, permissive: permissive))
  end

  # Every query issued by a table runs through the database with the same execution flags.
  def execute_system_query(query)
    database.execute_query(query, async: false, system_execution: true)
  end

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  # Types of every column in row order: regular columns first, then partition columns.
  def column_types
    all_columns.map(&:type)
  end

  def all_columns
    schema.columns + schema.partitions
  end

  # Builds a multi-row INSERT with one parenthesised VALUES tuple per row.
  def data_insert_query(rows)
    row_clause = rows.map { |row| row_values_statement(row_literal_values(row)) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Serializes one row into SQL literals ordered like +all_columns+.
  # Hash rows are read by column name, array rows by position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end

#schema ⇒ Egis::TableSchema (readonly)

Returns table's schema object.

Returns:

  • (Egis::TableSchema)



(source lines 16–193)
# File 'lib/egis/table.rb', line 16

##
# Interface for Athena table manipulation.
#
# It is recommended to create table objects using the {Database#table} method.
class Table
  # Options applied unless overridden through the +options:+ constructor argument.
  DEFAULT_OPTIONS = {format: :tsv}.freeze

  ##
  # @param database [Egis::Database] database the table belongs to; all table queries run through it
  # @param name [String] Athena table name
  # @param schema [Egis::TableSchema] object providing the table's +columns+ and +partitions+
  # @param location [String] raw table location, translated by {#location} through +Egis.mode.s3_path+
  # @param options [Hash] table options merged over {DEFAULT_OPTIONS}
  # @param client [Egis::Client] only used to build the default +output_downloader+ and +s3_cleaner+
  # @param partitions_generator builds the SQL run by {#add_partitions} and {#add_partitions!}
  # @param table_ddl_generator builds the DDL run by {#create} and {#create!}
  # @param output_downloader downloads query output for {#download_data}
  # @param output_parser parses downloaded output for {#download_data}
  # @param s3_cleaner only used to build the default +table_data_wiper+
  # @param table_data_wiper removes the table's files for {#wipe_data}
  def initialize(database, name, schema, location, options: {},
                 client: Egis::Client.new,
                 partitions_generator: Egis::PartitionsGenerator.new,
                 table_ddl_generator: Egis::TableDDLGenerator.new,
                 output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
                 output_parser: Egis::OutputParser.new,
                 s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
                 table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
    @database = database
    @name = name
    @schema = schema
    @location = location
    @options = DEFAULT_OPTIONS.merge(options)
    @partitions_generator = partitions_generator
    @table_ddl_generator = table_ddl_generator
    @output_downloader = output_downloader
    @output_parser = output_parser
    @table_data_wiper = table_data_wiper
  end

  # @return [Egis::Database] database the table belongs to
  attr_reader :database

  # @return [String] Athena table name
  attr_reader :name

  # @return [Egis::TableSchema] table's schema object
  attr_reader :schema

  ##
  # Creates table in Athena using permissive DDL generation.
  # See {#create!} for the variant that raises when the table already exists.
  #
  # @return [void]
  def create
    create_table(permissive: true)
  end

  ##
  # The same as {#create}, but raises an error when a table with the given name already exists.
  #
  # @return [void]
  def create!
    create_table(permissive: false)
  end

  ##
  # Creates partitions with all possible combinations of given partition values.
  # See {#add_partitions!} for the variant that raises when a partition already exists.
  #
  # @example
  #   table.add_partitions(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions(partitions)
    load_partitions(partitions, permissive: true)
  end

  ##
  # The same as {#add_partitions}, but raises an error when a partition already exists.
  #
  # @example
  #   table.add_partitions!(year: [2000, 2001], type: ['user'])
  #
  # @param [Hash] partitions partition values keyed by partition name
  # @return [void]
  def add_partitions!(partitions)
    load_partitions(partitions, permissive: false)
  end

  ##
  # Tells Athena to automatically discover table's partitions by scanning table's S3 location.
  # This operation might take a long time for a large number of partitions. If that's the case, use
  # {#add_partitions} to define partitions manually instead.
  #
  # @return [void]
  def discover_partitions
    execute_system_query("MSCK REPAIR TABLE #{name};")
  end

  ##
  # Insert data into the table. Mostly useful for testing purposes.
  #
  # Passing an empty array is a no-op: no query is sent, because an INSERT with an
  # empty VALUES list is not valid SQL.
  #
  # @example Insert with array of arrays
  #   table.upload_data([
  #       ['hello world', 'mx', 1],
  #       ['hello again', 'us', 2]
  #   ])
  #
  # @example Insert with array of hashes
  #   table.upload_data([
  #       {message: 'hello world', country: 'mx', type: 1},
  #       {message: 'hello again', country: 'us', type: 2}
  #   ])
  #
  # @param [Array] rows Array of arrays or hashes with row values, covering regular columns then partitions
  # @return [void]
  def upload_data(rows)
    return if rows.empty?

    execute_system_query(data_insert_query(rows))
  end

  ##
  # Downloads table contents into memory. Mostly useful for testing purposes.
  #
  # @return [Array] Array of arrays with row values.
  def download_data
    result = execute_system_query("SELECT * FROM #{name};")
    content = output_downloader.download(result.output_location)
    output_parser.parse(content, column_types)
  end

  ##
  # Removes table's content on S3. Optionally, you can limit files removed to specific partitions.
  #
  # @param [Hash, nil] partitions Partitions values to remove. Follows the same argument format as {#add_partitions}.
  # @return [void]
  def wipe_data(partitions: nil)
    table_data_wiper.wipe_table_data(self, partitions)
  end

  ##
  # @return [Object] table data format; +:tsv+ unless overridden via the +options:+ constructor argument
  def format
    options.fetch(:format)
  end

  ##
  # @return [String] table location URL
  def location
    Egis.mode.s3_path(@location)
  end

  private

  attr_reader :options, :partitions_generator, :table_ddl_generator, :output_downloader, :output_parser,
              :table_data_wiper

  # Shared implementation of {#create} and {#create!}; logs before running the DDL.
  def create_table(permissive:)
    log_table_creation
    execute_system_query(table_ddl_generator.create_table_sql(self, permissive: permissive))
  end

  # Shared implementation of {#add_partitions} and {#add_partitions!}.
  def load_partitions(partitions, permissive:)
    execute_system_query(partitions_generator.to_sql(name, partitions, permissive: permissive))
  end

  # Every query issued by a table runs through the database with the same execution flags.
  def execute_system_query(query)
    database.execute_query(query, async: false, system_execution: true)
  end

  def log_table_creation
    Egis.logger.info { "Creating table #{database.name}.#{name} located in #{location}" }
  end

  # Types of every column in row order: regular columns first, then partition columns.
  def column_types
    all_columns.map(&:type)
  end

  def all_columns
    schema.columns + schema.partitions
  end

  # Builds a multi-row INSERT with one parenthesised VALUES tuple per row.
  def data_insert_query(rows)
    row_clause = rows.map { |row| row_values_statement(row_literal_values(row)) }.join(",\n")

    <<~SQL
      INSERT INTO #{name} VALUES
      #{row_clause}
    SQL
  end

  # Serializes one row into SQL literals ordered like +all_columns+.
  # Hash rows are read by column name, array rows by position.
  def row_literal_values(row)
    all_columns.map.with_index do |column, index|
      value = row.is_a?(Hash) ? row[column.name] : row[index]
      Egis::Types.serializer(column.type).literal(value)
    end
  end

  def row_values_statement(row)
    "(#{row.join(', ')})"
  end
end

Instance Method Details

#add_partitions(partitions) ⇒ void

This method returns an undefined value.

Creates partitions with all possible combinations of given partition values.

Examples:

table.add_partitions(year: [2000, 2001], type: ['user'])

Parameters:

  • partitions (Hash)


(source lines 74–77)
# File 'lib/egis/table.rb', line 74

# Creates partitions for every combination of the given partition values.
# The SQL is generated in permissive mode; #add_partitions! is the variant
# that raises when a partition already exists.
#
# @example
#   table.add_partitions(year: [2000, 2001], type: ['user'])
#
# @param [Hash] partitions partition values keyed by partition name
# @return [void]
def add_partitions(partitions)
  database.execute_query(
    partitions_generator.to_sql(name, partitions, permissive: true),
    async: false,
    system_execution: true
  )
end

#add_partitions!(partitions) ⇒ void

This method returns an undefined value.

Creates partitions with all possible combinations of given partition values.

It raises an error when a partition already exists.

Examples:

table.add_partitions!(year: [2000, 2001], type: ['user'])

Parameters:

  • partitions (Hash)


(source lines 83–86)
# File 'lib/egis/table.rb', line 83

# Strict counterpart of #add_partitions: builds the partition SQL in
# non-permissive mode, so an error is raised when a partition already exists.
#
# @example
#   table.add_partitions!(year: [2000, 2001], type: ['user'])
#
# @param [Hash] partitions partition values keyed by partition name
# @return [void]
def add_partitions!(partitions)
  sql = partitions_generator.to_sql(name, partitions, permissive: false)
  database.execute_query(sql, async: false, system_execution: true)
end

#create ⇒ void

This method returns an undefined value.

Creates table in Athena.



(source lines 46–51)
# File 'lib/egis/table.rb', line 46

# Creates the table in Athena. Logs the creation first, then runs the DDL
# produced by the table DDL generator in permissive mode (#create! is the
# variant that raises when the table already exists).
#
# @return [void]
def create
  log_table_creation

  database.execute_query(
    table_ddl_generator.create_table_sql(self, permissive: true),
    async: false,
    system_execution: true
  )
end

#create! ⇒ void

This method returns an undefined value.

The same as #create, but raises an error when a table with the given name already exists.



(source lines 58–63)
# File 'lib/egis/table.rb', line 58

# Strict variant of #create: the DDL is generated non-permissively, so an
# error is raised when a table with the given name already exists.
#
# @return [void]
def create!
  log_table_creation
  ddl = table_ddl_generator.create_table_sql(self, permissive: false)
  database.execute_query(ddl, async: false, system_execution: true)
end

#discover_partitions ⇒ void

This method returns an undefined value.

Tells Athena to automatically discover the table's partitions by scanning the table's S3 location. This operation might take a long time for a large number of partitions. If that's the case, use #add_partitions to define partitions manually instead.



(source lines 95–97)
# File 'lib/egis/table.rb', line 95

# Runs `MSCK REPAIR TABLE` so Athena discovers the table's partitions by
# scanning its S3 location. With many partitions this can be slow; declaring
# them explicitly via #add_partitions avoids the scan.
#
# @return [void]
def discover_partitions
  repair_statement = "MSCK REPAIR TABLE #{name};"
  database.execute_query(repair_statement, async: false, system_execution: true)
end

#download_data ⇒ Array

Downloads table contents into memory. Mostly useful for testing purposes.

Returns:

  • (Array)

    Array of arrays with row values.



(source lines 127–131)
# File 'lib/egis/table.rb', line 127

# Reads the whole table into memory; intended mainly for tests.
#
# Runs `SELECT *` against the table, downloads the file found at the query
# result's output location and hands it to the output parser together with
# +column_types+.
#
# @return [Array] Array of arrays with row values.
def download_data
  output_location = database.execute_query("SELECT * FROM #{name};", async: false, system_execution: true)
                            .output_location
  output_parser.parse(output_downloader.download(output_location), column_types)
end

#format ⇒ Object

Returns the table data format.

Returns:

  • Table data format



(source lines 145–147)
# File 'lib/egis/table.rb', line 145

# Data format the table is stored in, read from the table options.
#
# @return [Object] value of the +:format+ option
# @raise [KeyError] if the options contain no +:format+ entry
def format
  options.fetch(:format)
end

#location ⇒ String

Returns table location URL.

Returns:

  • (String)

    table location URL



(source lines 152–154)
# File 'lib/egis/table.rb', line 152

# Table location URL: the raw location given at construction, translated
# through the current Egis mode's +s3_path+.
#
# @return [String] table location URL
def location
  raw_location = @location
  Egis.mode.s3_path(raw_location)
end

#upload_data(rows) ⇒ void

This method returns an undefined value.

Insert data into the table. Mostly useful for testing purposes.

Examples:

Insert with array of arrays

table.upload_data([
    ['hello world', 'mx', 1],
    ['hello again', 'us', 2]
])

Insert with array of hashes

table.upload_data([
    {message: 'hello world', country: 'mx', type: 1},
    {message: 'hello again', country: 'us', type: 2}
])

Parameters:

  • rows (Array)

    Array of arrays or hashes with row values



(source lines 117–120)
# File 'lib/egis/table.rb', line 117

# Inserts rows into the table. Mostly useful for testing purposes.
#
# An empty +rows+ array is a no-op: no query is sent, because an
# `INSERT INTO ... VALUES` statement with no value tuples is not valid SQL.
#
# @example Insert with array of arrays
#   table.upload_data([
#       ['hello world', 'mx', 1],
#       ['hello again', 'us', 2]
#   ])
#
# @param [Array] rows Array of arrays or hashes with row values
# @return [void]
def upload_data(rows)
  return if rows.empty?

  query = data_insert_query(rows)
  database.execute_query(query, async: false, system_execution: true)
end

#wipe_data(partitions: nil) ⇒ void

This method returns an undefined value.

Removes table's content on S3. Optionally, you can limit files removed to specific partitions.

Parameters:

  • partitions (Hash) (defaults to: nil)

    Partitions values to remove. Follows the same argument format as #add_partitions.



(source lines 139–141)
# File 'lib/egis/table.rb', line 139

# Deletes the table's files from S3, optionally only those of the given
# partitions. The work is delegated to the table data wiper.
#
# @param [Hash, nil] partitions partition values to remove, in the same format
#   accepted by #add_partitions; +nil+ (the default) places no partition limit
# @return [void]
def wipe_data(partitions: nil)
  wiper = table_data_wiper
  wiper.wipe_table_data(self, partitions)
end