Automating Athena Queries with Python

Over the last few weeks I’ve been using Amazon Athena quite heavily. For those of you who haven’t encountered it, Athena basically lets you query data stored in various formats on S3 using SQL (under the hood it’s a managed Presto/Hive cluster). Pricing for Athena is pretty nice as well: you pay only for the amount of data you process, and at $5 per TB that’s relatively cheap when you consider the effort it takes to set up EMR clusters for one-time or very infrequent queries and transformations.

In this post I’m going to share some code I’m using to automate queries in Athena.

Using Athena


If you use Athena interactively, it is very simple - you have your schemas and tables on the left, your editor on the right and a big beautiful Run query button. Once you run your query, Athena shows a pretty loading animation while you wait, and afterwards you get your data, which you can then download as CSV.

Athena Gui

Inside of your code

Using Athena inside of your code is a little more annoying, at least when you’re using Lambda and/or trying to keep things serverless. Starting an Athena query from the SDK is pretty straightforward, though.

If you were to do it using boto3 it would look something like this:

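import boto3

# The Athena client was missing from the snippet
client = boto3.client("athena")

query = "Your query"
database = "database_name"
athena_result_bucket = "s3://my-bucket/"

response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': database
    },
    ResultConfiguration={
        'OutputLocation': athena_result_bucket,
    }
)

query_execution_id = response["QueryExecutionId"]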

Running queries is all fine and dandy, but you usually care about the result of your queries as well, or would at least like to know whether they succeeded.
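The most direct way to find out is to poll the query state until it reaches a terminal value. The following is a minimal sketch, assuming a boto3 Athena client is passed in. The function name wait_for_query and its parameters poll_seconds and timeout_seconds are made up for this illustration.

```python
import time

# States after which Athena no longer changes the query status
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, query_execution_id, poll_seconds=2.0, timeout_seconds=60.0):
    """Poll Athena until the query finishes or the timeout is exceeded.

    client is expected to be boto3.client("athena"); wait_for_query and its
    parameters are hypothetical names used for this sketch.
    """
    deadline = time.monotonic() + timeout_seconds

    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response["QueryExecution"]["Status"]["State"]

        if state in TERMINAL_STATES:
            return state

        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Query {} is still {} after {} seconds".format(
                    query_execution_id, state, timeout_seconds
                )
            )

        time.sleep(poll_seconds)
```

With the snippet from above, you would call it as wait_for_query(client, query_execution_id). Note that this blocks the caller for as long as the query runs.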

Then you run into a problem: query runtime in Athena is not measured in milliseconds, but rather in seconds and minutes - up to a limit of 30 minutes. That matters because the Lambda execution limit is currently 15 minutes, and long running Lambdas aren’t cool anyway.

(If you only have short running queries, let’s say up to 5 minutes, and you know that beforehand, you can skip ahead to the section on short running queries.)

Enter: Step Functions. Yes I know, having to use yet another service isn’t ideal, but there are two limitations with Athena:

  1. There is no Lambda trigger for when the query terminates
  2. There is no other integration like SNS or SQS for queries that finish

You could summarize it as: Athena lacks integration for the result of queries (if I have overlooked something, please let me know!).

If you haven’t yet encountered Step Functions: Step Functions help you automate workflows that include several AWS services - you define your workflow as a state machine and AWS takes care of orchestrating your resources in the order and with the constraints you specified.

I usually use this pattern in my step functions:

Step Function Workflow

A Lambda function starts the long running Athena query, then we enter a kind of loop. First, a wait step pauses the execution, then another Lambda function fetches the state of the query execution. A choice step checks whether the query has succeeded - if yes, we continue. If it’s still running, we move back to the wait step (adding error handling is trivial here).
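To make the loop more concrete, here is a rough sketch of such a workflow as an Amazon States Language definition, built as a Python dict. The state names and the Lambda ARNs are placeholders invented for this example, and this is not the definition from the sample project. The state values are the ones reported by the Athena API.

```python
import json


def build_state_machine(start_arn, status_arn, result_arn, wait_seconds=10):
    """Return an Amazon States Language definition for the polling loop.

    The state names and the three Lambda ARNs are placeholders chosen for
    this sketch, not the definition used in the sample project.
    """
    def state_is(value):
        # One comparison rule against the query state in the execution input
        return {"Variable": "$.WaitTask.QueryState", "StringEquals": value}

    return {
        "StartAt": "start_query",
        "States": {
            "start_query": {"Type": "Task", "Resource": start_arn, "Next": "wait_for_query"},
            "wait_for_query": {"Type": "Wait", "Seconds": wait_seconds, "Next": "get_query_status"},
            "get_query_status": {"Type": "Task", "Resource": status_arn, "Next": "check_query_status"},
            "check_query_status": {
                "Type": "Choice",
                "Choices": [
                    {"Or": [state_is("FAILED"), state_is("CANCELLED")], "Next": "query_failed"},
                    {"Or": [state_is("QUEUED"), state_is("RUNNING")], "Next": "wait_for_query"},
                    {**state_is("SUCCEEDED"), "Next": "get_result"},
                ],
                # Anything unexpected is treated as a failure
                "Default": "query_failed",
            },
            "get_result": {"Type": "Task", "Resource": result_arn, "End": True},
            "query_failed": {"Type": "Fail", "Error": "QueryFailed", "Cause": "Athena query did not succeed"},
        },
    }


definition = build_state_machine(
    "arn:aws:lambda:REGION:ACCOUNT_ID:function:start-query",
    "arn:aws:lambda:REGION:ACCOUNT_ID:function:get-status",
    "arn:aws:lambda:REGION:ACCOUNT_ID:function:get-result",
)
print(json.dumps(definition, indent=2))
```

The wait state is what keeps this cheap: while the state machine is waiting, no Lambda function is running.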

You can find a sample project with the code for all of the functions on Github. To make life easier for myself I wrote the athena_helper mini-library, which wraps some of the annoying parts of the API.

First I’m going to show you the code for long running queries, and afterwards a simplified version for short queries.

Automating Athena

Long running queries

As mentioned above, there are 3 Lambda functions involved in this. We’re going to start with the function that executes the query:

import boto3
from athena_helper import AthenaQuery

BOTO_SESSION = boto3.Session()

def start_long_running_query(event, context):

    # This is the default table
    query = "select * from elb_logs limit 1"
    database_name = "sampledb"

    # Build the name of the default Athena bucket
    account_id = BOTO_SESSION.client('sts').get_caller_identity().get('Account')
    region = BOTO_SESSION.region_name
    result_bucket = "s3://aws-athena-query-results-{}-{}/".format(account_id, region)

    my_query = AthenaQuery(query, database_name, result_bucket)

    query_execution_id = my_query.execute()

    # This will be processed by our waiting-step
    event = {
        "MyQueryExecutionId": query_execution_id,
        "WaitTask": {
            "QueryExecutionId": query_execution_id,
            "ResultPrefix": "Sample"
        }
    }

    return event

This function sets up the relevant parameters for the query:

  • query - the query itself
  • database_name - the name of the schema the query is executed in
  • result_bucket - this builds the name of the result bucket that gets created by default

The actual code for executing the query is just two lines: we build the AthenaQuery object, call execute() on it and receive the execution id of the query:

my_query = AthenaQuery(query, database_name, result_bucket)
query_execution_id = my_query.execute()

Afterwards we build the object that gets passed to the next step. Passing down the Query Execution Id would be sufficient, but I like stats.

The next Lambda function is considerably simpler. It takes the QueryExecutionId out of the input event, builds an AthenaQuery object from it and retrieves the current status of the query.

def get_long_running_query_status(event, context):

    query_execution_id = event["WaitTask"]["QueryExecutionId"]
    aq = AthenaQuery.from_execution_id(query_execution_id)

    status_information = aq.get_status_information()
    event["WaitTask"]["QueryState"] = status_information["QueryState"]

    status_key = "{}StatusInformation".format(event["WaitTask"]["ResultPrefix"])
    event[status_key] = status_information

    return event
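The internals of get_status_information() are not shown here. As a rough idea of where such status information can come from, the sketch below condenses a raw boto3 get_query_execution response. The name summarize_status and most keys of the returned dict are assumptions made for this example, not the helper's actual output.

```python
def summarize_status(response):
    """Condense a boto3 get_query_execution response into a small dict.

    summarize_status and most keys of the returned dict are hypothetical;
    only "QueryState" mirrors the key used in the Lambda function above.
    """
    execution = response["QueryExecution"]
    status = execution["Status"]
    # Statistics may be missing or incomplete while the query is queued
    statistics = execution.get("Statistics", {})

    return {
        "QueryState": status["State"],
        "StateChangeReason": status.get("StateChangeReason"),
        "EngineExecutionTimeInMillis": statistics.get("EngineExecutionTimeInMillis"),
        "DataScannedInBytes": statistics.get("DataScannedInBytes"),
    }
```

The sketch only keeps strings and numbers on purpose: whatever a Lambda function returns to the state machine has to be JSON serializable, and the timestamps in the raw response are not.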

A choice-step in the step function processes this - you can find the full definition in the serverless.yml of the project, but here is an excerpt of it:

    Type: Choice
    Choices:
      - Or:
          - Variable: "$.WaitTask.QueryState"
            StringEquals: FAILED
          - Variable: "$.WaitTask.QueryState"
            StringEquals: CANCELLED
        Next: query_failed

This basically tells the state machine to go to the error state query_failed when the query is in status FAILED or CANCELLED.

We only get to the next step if the query has succeeded. This Lambda again builds the AthenaQuery object from the QueryExecutionId and retrieves the result:

def get_long_running_result(event, context):
    query_execution_id = event["MyQueryExecutionId"]

    # Build the query object from the execution id
    aq = AthenaQuery.from_execution_id(query_execution_id)

    # Fetch the result
    result_data = aq.get_result()

    # Do whatever you want with the result

    event["GotResult"] = True
    return event

Inside of this function you can process the results the way you want. This is how you can deal with long running Athena-queries in Lambda.
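What the result looks like depends on the helper. If you work with the raw API instead, get_query_results returns rows of cells, and for a SELECT query the first row holds the column names. Here is a minimal sketch for turning that into records, assuming pages as they come out of boto3's get_query_results paginator. rows_to_dicts is a hypothetical name.

```python
def rows_to_dicts(pages):
    """Turn get_query_results pages into a list of {column: value} dicts.

    rows_to_dicts is a hypothetical helper. It assumes the result of a
    SELECT query, where the first row of the first page is the header.
    """
    header = None
    records = []

    for page in pages:
        for row in page["ResultSet"]["Rows"]:
            # NULL cells have no "VarCharValue" key, so fall back to None
            values = [cell.get("VarCharValue") for cell in row["Data"]]

            if header is None:
                header = values
            else:
                records.append(dict(zip(header, values)))

    return records
```

You could feed it with boto3.client("athena").get_paginator("get_query_results").paginate(QueryExecutionId=query_execution_id). All values arrive as strings, so numeric columns still need to be converted.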

Let’s have a look at the much simpler case now, short running queries.

Short running queries

I’d recommend this for queries that run for up to 5 minutes - otherwise it’s probably worth setting up the state machine as described above.

The code for this one relies on the athena_helper mini-library as well:

import boto3
from athena_helper import AthenaQuery

BOTO_SESSION = boto3.Session()

def short_running_query(event, context):

    # Build the name of the default Athena bucket
    account_id = BOTO_SESSION.client('sts').get_caller_identity().get('Account')
    region = BOTO_SESSION.region_name
    result_bucket = "s3://aws-athena-query-results-{}-{}/".format(account_id, region)

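    my_query = AthenaQuery(
        "select elb_name from elb_logs limit 1",
        "sampledb",
        result_bucket
    )

    # Start the query, then block until the result is available
    my_query.execute()
    result_data = my_query.get_result()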

    # Process the result

    return result_data

This uses the same functions that have been described above, only without the waiting step in between - the get_result() function will actually wait for the query to finish - up to a timeout that’s by default set to 60 seconds.


In this post I’ve shown you how to use the athena_helper mini-library to work with long running and short running Athena queries in Python.

If you have any questions, feedback or suggestions, feel free to reach out to me on Twitter (@Maurice_Brg)

Photo by Hitesh Choudhary on Unsplash
