James McMurray's Blog

Rust, Linux and other curiosities

Serverless Data Ingestion with Rust and AWS SES


In this post we will set up a simple, serverless data ingestion pipeline using Rust, AWS Lambda and AWS SES with Workmail.

We will handle multiple types of AWS events with one Lambda function, parse received emails with the mailparse crate, and send email with SES and the lettre crate.

The complete code for this example is available on GitHub here.

Opinions expressed are solely my own and do not express the views or opinions of my employer.

You can follow along using the AWS Free Tier, in this case all resources are within the AWS Free Tier (at least for new users). Note the Workmail free trial is only for 30 days (but will only cost $4 a month for one user after that).

Process Requirements

In this process we would like to send an email to a recipient, prompting them to send us some CSV data as an attachment. When we receive that email we need to parse the attachment, validate its contents, save the parsed content in S3 (if we are able to parse it) and email the recipient with either a confirmation of success, or a list of encountered errors (either in deserialization of the attachment or validation) and their file for reference.

Note that in a previous post I dealt with creating a Lambda based pipeline for parsing Excel files and loading the resulting CSV to Redshift. There are a few differences and things to note in this process:

  • We will use rustls not OpenSSL (so it is easier to statically link with musl) - in this case, we only need to select that feature in Rusoto.
  • The Rusoto API no longer provides the .sync() method, so we must instantiate an async runtime to block on the Future returned.
  • I am still using the crates.io version of the lambda_runtime crate (currently v0.2.1), so we do not have complete async support (the Github master branch version does allow you to just use async functions directly!)
  • Here we will deal with parsing MIME-formatted emails.

Note that the reasoning for using Rust is the same - mainly that we can deploy a statically linked binary with musl, and we can easily share Rust code between different Lambda functions via crates with feature gates, so we can include only what we need - helping to control binary size.

Overall, we can break this process down in to the following steps:

  • Parsing a received email (we will set up Workmail to save all emails directly to an S3 bucket, and trigger our Lambda function with the S3 CreateObject event).
  • Deserializing and validating the attached CSV file.
  • Reading and writing a file to an S3 bucket.
  • Sending an email with an attachment (this is harder than it sounds, since we have to use SES's SendRawEmail API)

Implementation

We will follow the order of the steps above (i.e. handling the receipt of emails first, and then sending emails). First we need to set up our AWS resources.

AWS resources

Create an AWS Workmail organisation (note this will charge $4 a month after 30 days) in the Workmail console. We only need the default user and domain. Now in the SES console in Email Receiving > Rule Sets, edit the INBOUND_MAIL Rule Set, and edit the existing rule to write to an S3 bucket (create a bucket if you don't have one). You can also specify specific recipients for which to apply the rule.

Note the same effect can be achieved by setting a rule in the Workmail organisation to trigger a Lambda directly, we would then need to use the GetRawMessageContent call in the Workmail Message Flow API with the message ID in the event. Since it's useful to have an archive of all received emails on S3, we use the S3 trigger approach here.

MIME format

You can now test sending an email to the domain you created (and a specific recipient if you set the rule to be specific). You should see a file created like the following, containing the MIME-formatted message:

MIME-Version: 1.0
From: SENDER NAME <SENDER_EMAIL@gmail.com>
Date: Sat, 8 Aug 2020 22:54:58 +0200
Message-ID: <CAMJE-1uq35qGwMeMvryzpLXuN4htOHTDHt4kiLD-7K97XwFE7A@mail.gmail.com>
Subject: testmail
To: test@testses.awsapps.com
Content-Type: multipart/alternative; boundary="000000000000d2fb5805ac63efb8"

--000000000000d2fb5805ac63efb8
Content-Type: text/plain; charset="UTF-8"

testbody

--000000000000d2fb5805ac63efb8
Content-Type: text/html; charset="UTF-8"

<div dir="ltr">testbody</div>

--000000000000d2fb5805ac63efb8--

In this case testses was the Workmail domain, and the email was sent via Gmail (in practice, there are a load of DKIM headers, etc. prior to this part - but we will not use those).

The MIME format consists of many headers (containing some of the most relevant information such as sender address, message ID, subject and recipient), and a body split up in to parts. Each part is terminated with a boundary separator: --000000000000d2fb5805ac63efb8 in the above case (note this is specified where the multipart begins).

In the above case we can see the email contains both a plaintext and HTML version. The body is multipart/alternative so that the client knows it should choose between the two parts. If it were multipart/mixed then the client would show both parts, in this case effectively duplicating the body for the reader. You can read about the MIME specification in detail but we won't need much more than this for our purposes.

The most important thing to note is that multiparts can be nested (as different multiparts will have different boundary strings). If we send an email with a simple plaintext attachment we see the following:

Content-Type: multipart/mixed; boundary="00000000000025ab0605ac643145"

--00000000000025ab0605ac643145
Content-Type: multipart/alternative; boundary="00000000000025ab0305ac643143"

--00000000000025ab0305ac643143
Content-Type: text/plain; charset="UTF-8"

attachtest body

--00000000000025ab0305ac643143
Content-Type: text/html; charset="UTF-8"

<div dir="ltr">attachtest body</div>

--00000000000025ab0305ac643143--
--00000000000025ab0605ac643145
Content-Type: text/plain; charset="US-ASCII"; name="test_attachment.txt"
Content-Disposition: attachment; filename="test_attachment.txt"
Content-Transfer-Encoding: base64
Content-ID: <f_kdm5n1pg0>
X-Attachment-Id: f_kdm5n1pg0

cGxhaW50ZXh0Cgo=
--00000000000025ab0605ac643145--

We have the first multipart which is mixed and contains:

  • A nested multipart for the actual body, this is an alternative multipart so the client chooses between showing the HTML email or the plaintext email.
  • A single part which is the plaintext attachment, encoded in base64.

So the client will choose to display either the HTML or plaintext body, and always include the attachment.

Already we can start to imagine how we could parse this. Fortunately there are already crates to do this! In this post we will use the mailparse crate.

Reading received emails with mailparse

We use the parse_mail function to parse the MIME message (from bytes) in to a usable struct, and extract the attachment body and attachment filename for the first attachment:

 let cap = parse_mail(&file)?;
    let attachment = cap
        .subparts
        .into_iter()
        .find(|x| x.get_content_disposition().disposition == mailparse::DispositionType::Attachment)
        .expect("No attachment");

    let attachment_name = attachment
        .get_content_disposition()
        .params
        .get("filename")
        .expect("No filename")
        .to_string();

    let attachment = attachment.get_body()?;

This gives us the first attachment as a String. Note if we were not dealing with a plaintext format, i.e. we had a zipped file, etc., we would use get_body_raw() to get a Vec<u8> of bytes. This already handles the base64 decoding for us in both cases!

Deserializing and validating the CSV

In our case we expect one attachment, which will be a CSV file. If we have any errors reading the file we want to return these to the sender. For this simple example, we have a CSV with three fields:

  • A positive ID number (i.e. u32)
  • A start date
  • An end date

We will use the chrono crate to deserialize the dates, and the csv crate with Serde to do the overall deserialization.

We set up for deserialization (and serialization for the dates for later, so we maintain the same format) as follows:

#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct Entry {
    #[serde(alias = "ID")]
    pub id: u32,
    #[serde(deserialize_with = "de_datetime", serialize_with = "se_datetime")]
    pub start_date: NaiveDateTime,
    #[serde(deserialize_with = "de_datetime", serialize_with = "se_datetime")]
    pub end_date: NaiveDateTime,
}

fn de_datetime<'de, D>(deserializer: D) -> Result<NaiveDateTime, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let s = String::deserialize(deserializer)?;
    match NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") {
        Ok(x) => Ok(x),
        Err(x) => Err(serde::de::Error::custom(x)),
    }
}

fn se_datetime<S>(dt: &NaiveDateTime, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let t = dt.format("%Y-%m-%d %H:%M:%S").to_string();
    serializer.collect_str(&t)
}

We specify the deserialization and serialization functions for the dates so that we can specify the format to match the original input. Note the alias attribute means we can deserialize from either "ID" or "id", see the Serde field attributes for more details.

Then to deserialize we use:

pub fn deserialize_csv(attachment: &str) -> (Vec<Entry>, Vec<csv::Error>) {
    let mut rdr = csv::Reader::from_reader(Cursor::new(attachment.trim()));
    let mut records: Vec<Entry> = Vec::with_capacity(16);
    let mut de_errors: Vec<csv::Error> = Vec::new();
    for result in rdr.deserialize() {
        match result {
            Ok(record) => {
                records.push(record);
            }
            Err(error) => de_errors.push(error),
        }
    }

    (records, de_errors)
}

Note we put any errors in to the de_errors Vec, so if there are any, we can later parse them to return to the sender.

We validate the records with the following functions, passing in a reference to the Vec<Entry> from above (since we can choose our error type here, we use anyhow::Error for simplicity):

fn validate_record(r: &Entry) -> Result<()> {
    if r.start_date > r.end_date {
        Err(anyhow!(
            "Start date after end date for entry: {}, {}, {}",
            r.id,
            r.start_date,
            r.end_date
        ))
    } else {
        Ok(())
    }
}

pub fn validate_all_records(records: &[Entry]) -> Vec<anyhow::Error> {
    records
        .iter()
        .map(|x| validate_record(x))
        .filter_map(|x| x.err())
        .collect()
}

We can use the above Vectors of errors to generate an error email if there are any errors present. But first we need to deal with sending an email with SES.

Sending an email

To send an email with SES, there are two options:

  • SendEmail for sending emails without attachments. This is easy-to-use but unfortunately we need to send attachments.
  • SendRawEmail for sending a raw MIME email, including attachments.

So we will use SendRawEmailRequest in Rusoto to send MIME emails. But first of all we need to construct our MIME email. To do this, we will use the lettre crate, specifically the MessageBuilder in the current alpha version (note we use the 0.10 alpha in order to use rustls).

By default SES will be in Sandbox mode. This means you must verify the email addresses that you want to send to (and from) in the SES console.

The full code is available in the Github repo. Here is an excerpt showing how to handle adding the attachment part if necessary:

    // Set headers from arguments
    let email = Message::builder()
        .to(recipient.parse().unwrap())
        .from(crate::FROM.parse().unwrap())
        .subject(subject);

    // Deal with text and HTML body - omitted here
    // let mpart =  ...

    // Add attachment if present
    let mpart = if let Some(attachment) = attachment {
        mpart.singlepart(
            SinglePart::base64()
                .header(header::ContentType(attachment.mime.parse().unwrap()))
                .header(lettre::message::header::ContentDisposition {
                    disposition: DispositionType::Attachment,
                    parameters: vec![DispositionParam::Filename(
                        Charset::Us_Ascii,
                        None,
                        attachment.name.as_bytes().to_vec(), // the bytes of the filename
                    )],
                })
                .body(attachment.attachment),
        )
    } else {
        mpart
    };

    let email = email.multipart(mpart)?;
    let msg_string = email.formatted();

Note that we don't need to base64 encode the attachment ourselves, lettre will do this for us.

Then we make the request with Rusoto, note that we must base64 encode our message in the request in the RawMessage struct! Also note that we need to block on the Future returned from send_raw_email():

    let raw_message = rusoto_ses::RawMessage {
        data: bytes::Bytes::from(base64::encode(msg_string)),
    };
    let request = rusoto_ses::SendRawEmailRequest {
        configuration_set_name: None,
        destinations: None,
        from_arn: None,
        raw_message,
        return_path_arn: None,
        source: None,
        source_arn: None,
        tags: None,
    };

    let fut = ses_client.send_raw_email(request);
    let response = rt.block_on(fut)?;
    info!("Email sent: {:?}", response);

Now we can receive and send emails, and parse and validate the CSV attachments, we only need to put it together for the Lambda function.

S3

First we need to be able to read and write objects to S3, in order to write the CSV we receive after validation, and read it when we send the prompt email triggered with a Cloudwatch event.

This is simple to achieve with Rusoto, but note we need to block on the returned futures just as in the SES case:

pub fn get_file_from_s3(
    bucket: &str,
    key: &str,
    s3_client: &S3Client,
    rt: &mut tokio::runtime::Runtime,
) -> Result<Vec<u8>> {
    info!("Reading bucket: {}, key: {}", bucket, key);
    let s3file_fut = s3_client.get_object(GetObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        ..Default::default()
    });

    let s3file = rt.block_on(s3file_fut)?;

    let mut buffer: Vec<u8> = Vec::new();
    let _file = s3file
        .body
        .unwrap()
        .into_blocking_read()
        .read_to_end(&mut buffer)?;
    Ok(buffer)
}

pub fn write_file_to_s3(
    file: Vec<u8>,
    bucket: &str,
    key: &str,
    s3_client: &S3Client,
    rt: &mut tokio::runtime::Runtime,
) -> Result<()> {
    let fut = s3_client.put_object(PutObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        body: Some(file.into()),
        ..Default::default()
    });
    let _response = rt.block_on(fut)?;
    Ok(())
}

Now we can actually operate on the S3 event the Lambda function will receive as a trigger for received emails (by extracting the S3 key from the event and reading it).

AWS events and Lambda

We will use the aws_lambda_events crate for deserializing the events we receive. In this case we want to handle two different events - the Cloudwatch event from a fixed time trigger to send a prompt email, and the S3 event from received emails (being written to S3 via the SES/Workmail inbound mail rule).

We can do this by using the untagged Serde attribute and our own enum, acting as a wrapper over the possibilities:

#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
enum EventEnum {
    S3Event(aws_lambda_events::event::s3::S3Event),
    CloudWatchEvent(aws_lambda_events::event::cloudwatch_events::CloudWatchEvent),
}

See the Serde enum representations docs for more details and handling other possibilities.

Then we can deserialize to EventEnum in our handler function:

fn main() -> Result<()> {
    let mut builder = pretty_env_logger::formatted_timed_builder();
    builder.filter_level(log::LevelFilter::Debug);
    builder.init();
    lambda_runtime::lambda!(my_handler);

    Ok(())
}

fn my_handler(e: EventEnum, _c: lambda_runtime::Context) -> Result<(), HandlerError> {
    // Create clients here once since we will use them in all cases
    let s3_client = S3Client::new(Region::EuWest1);
    let ses_client = SesClient::new(Region::EuWest1);
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    match e {
    // Match on different events here...
    }
}

Deployment

Now we are ready to deploy our Lambda function. In order to use Rustls and the Tokio's block_on method we should set the following features in our dependencies in Cargo.toml:

rusoto_core = {version = "0.44", default_features = false, features = ["rustls"]}
rusoto_s3 = {version = "0.44", default_features = false, features = ["rustls"]}
rusoto_ses = {version = "0.44", default_features = false, features = ["rustls"]}
lettre = { version = "0.10.0-alpha.0", default_features = false, features = ["rustls-tls", "builder"] }
tokio = {version = "0.2", default_features = false, features = ["blocking", "rt-threaded"]}

Now we can compile with the musl target (here I assume a GNU/Linux platform, for OS X you will need to install musl-cross).

$ cargo build --release --target=x86_64-unknown-linux-musl

And then zip up the resulting binary, to upload it in the AWS Lambda console:

$ zip bootstrap.zip ./target/x86_64-unknown-linux-musl/release/bootstrap

Upload this zip file to the Lambda console and add the S3 trigger for the bucket set in the Inbound rule, then add a Cloudwatch trigger on a schedule for sending the prompt email.

Summary

In this post we have seen how to build a simple serverless data pipeline in Rust, based around ingesting and sending data via email.

Note that we use lambda_runtime v0.2.1 in the above code. Soon v0.3 will release which will allow us to use async functions directly. I think even in v0.2.1 there is a way of passing the Tokio runtime in the handler Context so we wouldn't need to pass it manually, but I didn't investigate it much since the new changes will be stable very soon (see this PR for example).

We used the alpha version of the new lettre release so that we could use rustls everywhere. In the future the new API will change and hopefully become more ergonomic like v0.9.3.

Possible Improvements

If we were to extend this example, a simple task would be to handle zip file attachments with the zip crate. We could also send responses as actual email replies using the In-Reply-To and References headers, this would help to ensure the sent emails are appropriately viewable in the recipient's client (i.e. otherwise Gmail might thread our new messages with older ones since they share the same Subject).

We should also better handle the case where we receive no attachment, or multiple attachments. This could be handled elegantly by moving the error email sending outside of the handle_email() function and adding custom error types for these different cases. So in the case of any error we could send an appropriate notification email.

It would also be good to deploy the Lambda function with the AWS Cloud Development Kit (CDK) so we can create all resources necessary, and keep the necessary infrastructure steps with the code. Unfortunately jsii does not yet have Rust bindings so the actual CDK code would need to be written in a supported language such as TypeScript.


Many thanks to all of the contributors to the crates used in this example, and especially to the maintainers of lambda_runtime and Rusoto whose efforts have made Rust a viable choice in this space.