Class: Egis::Client

Inherits:
Object
Defined in:
lib/egis/client.rb

Overview

The most fundamental Egis class. Provides an interface for executing Athena queries.

For configuration instructions, see configure.

Examples:

Create client and execute asynchronous query

client = Egis::Client.new
status = client.execute_query('SELECT * FROM my_table;')

while status.in_progress?
  # do something useful
  # ...
  status = client.query_status(status.id)
end

status.output_location.url # s3://my-bucket/result/path

Execute synchronous query and fetch results

status = client.execute_query('SELECT MAX(time), MIN(id) FROM my_table;', async: false)
status.fetch_result(schema: [:timestamp, :int]) # [[2020-05-04 11:19:03 +0200, 7]]
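
The polling loop in the first example runs until the query leaves the in-progress state. A minimal sketch of a bounded variant follows, assuming only that the client responds to query_status and that the status responds to in_progress?. SketchStatus, SketchClient, poll_until_done, and the state symbols are hypothetical stand-ins so the snippet runs without AWS access; they are not part of Egis.

```ruby
# Hypothetical status object; it only mimics the in_progress? predicate used above.
SketchStatus = Struct.new(:id, :state) do
  def in_progress?
    %i[queued running].include?(state)
  end
end

# Hypothetical client stub that replays a fixed sequence of states.
class SketchClient
  attr_reader :calls

  def initialize(states)
    @states = states.dup
    @calls = 0
  end

  def query_status(query_id)
    @calls += 1
    SketchStatus.new(query_id, @states.shift || :finished)
  end
end

# Polls until the query leaves the in-progress state or the attempt budget runs out.
def poll_until_done(client, query_id, interval: 0.01, max_attempts: 10)
  max_attempts.times do
    status = client.query_status(query_id)
    return status unless status.in_progress?

    sleep(interval)
  end
  raise "query #{query_id} still in progress after #{max_attempts} checks"
end

client = SketchClient.new(%i[queued running finished])
final = poll_until_done(client, 'abc-123')
puts final.state
```

Capping the number of attempts keeps a stuck query from blocking the caller indefinitely; a real caller would likely pick a longer interval than the one used here.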

Yield Parameters:

  • config (Egis::Configuration)

    Egis configuration block. If omitted, Egis uses the global configuration provided by configure.

Constructor Details

#initialize(aws_client_provider: Egis::AwsClientProvider.new, s3_location_parser: Egis::S3LocationParser.new, &block) ⇒ Client

Returns a new instance of Client.



# File 'lib/egis/client.rb', line 43

def initialize(aws_client_provider: Egis::AwsClientProvider.new,
               s3_location_parser: Egis::S3LocationParser.new,
               &block)
  @configuration = block_given? ? Egis.configuration.dup.configure(&block) : Egis.configuration
  @aws_athena_client = aws_client_provider.athena_client(configuration)
  @aws_s3_client = aws_client_provider.s3_client(configuration)
  @s3_location_parser = s3_location_parser
end
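
The first line of the constructor copies the global configuration before applying the block, so per-client overrides do not leak into the global settings. The sketch below reproduces that pattern with a stand-in class. SketchConfiguration, its attributes, GLOBAL_CONFIGURATION, and resolve_configuration are hypothetical and are not Egis API; the sketch also assumes that configure returns the configuration object, which the chained call in the constructor suggests.

```ruby
# Hypothetical stand-in for a configuration object; attribute names are illustrative only.
class SketchConfiguration
  attr_accessor :work_group, :region

  def initialize
    @work_group = 'primary'
    @region = 'us-east-1'
  end

  # Yields self for mutation and returns self so the call can be chained after dup.
  def configure
    yield(self)
    self
  end
end

GLOBAL_CONFIGURATION = SketchConfiguration.new

# Mirrors the constructor's choice: a modified copy when a block is given, the shared object otherwise.
def resolve_configuration(&block)
  block_given? ? GLOBAL_CONFIGURATION.dup.configure(&block) : GLOBAL_CONFIGURATION
end

per_client = resolve_configuration { |config| config.work_group = 'analytics' }
puts per_client.work_group           # override applies to the copy
puts GLOBAL_CONFIGURATION.work_group # global value is unchanged
```

Because dup makes a shallow copy, this isolation holds for reassigned attributes; mutating a shared nested object in place would still affect both copies.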

Instance Attribute Details

#aws_s3_client ⇒ Object (readonly)

Returns the value of attribute aws_s3_client.



# File 'lib/egis/client.rb', line 41

def aws_s3_client
  @aws_s3_client
end

Instance Method Details

#database(database_name) ⇒ Egis::Database

Creates a Database object with the given name. Calling this method does not create the Athena database itself.

Parameters:

  • database_name (String)

Returns:

  • (Egis::Database)

# File 'lib/egis/client.rb', line 58

def database(database_name)
  Database.new(database_name, client: self)
end

#execute_query(query, work_group: nil, database: nil, output_location: nil, async: true, system_execution: false) ⇒ Egis::QueryStatus

Executes an Athena query. By default, queries are executed asynchronously.

Parameters:

  • query (String)

    SQL query to execute

  • async (Boolean) (defaults to: true)

    Whether to run the query asynchronously or block until it finishes.

  • work_group (String) (defaults to: nil)

    Athena work group in which the query will be executed.

  • database (String) (defaults to: nil)

    Run the query in the context of a specific database (implicit table references are expected to be in the given database).

  • output_location (String) (defaults to: nil)

    S3 URL of the desired output location. By default, Athena uses the location defined by the work group.

Returns:

  • (Egis::QueryStatus)

Raises:

  • (Egis::Errors::QueryExecutionError)

    When a synchronous query does not finish successfully

# File 'lib/egis/client.rb', line 74

def execute_query(query, work_group: nil, database: nil, output_location: nil, async: true, system_execution: false)
  query_id = aws_athena_client.start_query_execution(
    query_execution_params(query, work_group, database, output_location)
  ).query_execution_id

  log_query_execution(query, query_id, system_execution)

  return query_status(query_id) if Egis.mode.async(async)

  query_status = wait_for_query_to_finish(query_id)

  raise Egis::Errors::QueryExecutionError, query_status.message unless query_status.finished?

  query_status
end
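
The private query_execution_params helper is not shown on this page. As a rough sketch of what such a helper might produce, the example below builds a request hash from the same four arguments and leaves out any that are nil. The key names (query_string, work_group, query_execution_context, result_configuration) follow the request shape of the AWS SDK's start_query_execution; sketch_query_params itself is hypothetical and may differ from the gem's implementation.

```ruby
# Hypothetical helper; not the gem's private query_execution_params.
def sketch_query_params(query, work_group: nil, database: nil, output_location: nil)
  params = { query_string: query }
  # Optional settings are added only when provided, so Athena falls back to its defaults.
  params[:work_group] = work_group if work_group
  params[:query_execution_context] = { database: database } if database
  params[:result_configuration] = { output_location: output_location } if output_location
  params
end

minimal = sketch_query_params('SELECT 1;')
full = sketch_query_params(
  'SELECT * FROM my_table;',
  work_group: 'analytics',
  database: 'my_db',
  output_location: 's3://my-bucket/result/path'
)

puts minimal.keys.inspect
puts full.keys.inspect
```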

#query_status(query_id) ⇒ Egis::QueryStatus

Checks the status of an asynchronous query execution.

Parameters:

  • query_id (String)

    ID of the query execution, as returned in the status from execute_query

Returns:

  • (Egis::QueryStatus)

# File 'lib/egis/client.rb', line 96

def query_status(query_id)
  resp = aws_athena_client.get_query_execution(query_execution_id: query_id)

  query_execution = resp.query_execution
  query_status = query_execution.status.state

  Egis.logger.debug { "Checking query status (#{query_id}): #{query_status}" }

  Egis::QueryStatus.new(
    query_execution.query_execution_id,
    QUERY_STATUS_MAPPING.fetch(query_status),
    query_execution.status.state_change_reason,
    parse_output_location(query_execution),
    client: self
  )
end
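
The method translates the raw Athena state through QUERY_STATUS_MAPPING.fetch, which raises KeyError for a state that has no entry instead of silently returning nil. The constant's contents are not shown on this page, so the mapping below is an assumption: the keys are the states Athena reports, while the symbol values, SKETCH_STATUS_MAPPING, and map_state are hypothetical.

```ruby
# Assumed mapping for illustration; the real QUERY_STATUS_MAPPING may use different values.
SKETCH_STATUS_MAPPING = {
  'QUEUED' => :queued,
  'RUNNING' => :running,
  'SUCCEEDED' => :finished,
  'FAILED' => :failed,
  'CANCELLED' => :cancelled
}.freeze

# Hash#fetch without a default raises KeyError when the key is missing.
def map_state(athena_state)
  SKETCH_STATUS_MAPPING.fetch(athena_state)
end

puts map_state('SUCCEEDED')

begin
  map_state('PAUSED')
rescue KeyError => e
  puts "unmapped state rejected: #{e.message}"
end
```

Failing loudly on an unmapped state makes a new or unexpected Athena state visible immediately rather than producing a status object with a nil state.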