
Send newsletter signup messages

You'll learn
  • Preparing to send newsletter confirmation emails after signup.
  • Using AWS SQS as a job queue.

Now that you're able to receive and store the email addresses of your newsletter subscribers-to-be, it's time to prepare to send them some emails. There are two important emails to send: a confirmation email and a welcome email.

The first email is going to be the one that asks a subscriber to confirm his or her email address. You've probably clicked one of these a thousand times yourself, but have you ever thought about why we have it? I'll give you the short answer: it's to verify, via some kind of secret that only the email recipient has, that the person behind that email address actually wants to receive the newsletter.

Without it, it would be easy for malicious people or bots to add your email address to a million newsletters, flooding your inbox. At least with this common confirmation step, you would only be flooded with confirmation emails.

And importantly, it's also a legal requirement in many places around the world.

The second email will be an email welcoming your new subscriber to the newsletter list, and it will be sent out immediately after the email address has been confirmed.

Application flow

The basic flow for the first email will look like this:

  1. Someone enters an email address and hits submit on the signup page.
  2. Your signup handler saves the email address, and sends a message to a message queue. The message includes the type of email to be sent, the email address, and a secret token.
  3. A different component gets the message from the message queue, and sends the confirmation email.
  4. The user is redirected to a page and asked to check his or her email inbox.

Similarly, the flow for the second email is this:

  1. A subscriber clicks a link in the confirmation email, which includes a secret token and the email address as URL query parameters.
  2. The HTTP handler behind that link verifies the email address and secret token, and sends another message to the queue if the email address and token match what we have in the database.
  3. A different component receives the message from the queue and sends out a welcome email.
  4. The user is redirected to a page confirming that everything went well.

Excellent, let's build that. Except we haven't covered message queues before. What are they?

Message queues

A message queue is a resource that accepts messages with arbitrary content from one process and queues them up until another process reads and deletes them. It's a core building block of cloud services, just like a database.

Message queues are a nice way to decouple components in your code that need to pass information to one another, but shouldn't necessarily talk directly to each other through, for example, a function call or an HTTP request. Why do we need that?

There are two reasons. One is about user experience and performance. When someone signs up with their email address or clicks a link in an email, it should be fast. No-one should be forced to wait long for machines to talk to one another, so we can make it fast by running things that could be slow—like sending an email through an external system, or more generally performing longer-running processing—outside of the code that sends the HTTP response. Sending a message to a message queue is fast.

The second reason is about failure and error handling. You usually have to design systems with failures in mind, because they happen all the time in a distributed system. Hard drives crash, network cables get accidentally pulled out in the datacenter, network packets get lost, programs crash, electronics fail. You get the picture. Without a message queue, we would have to write error handling code ourselves. With it, we get a lot of that functionality for free. This built-in failure handling is a big reason why we don't just use a goroutine that runs parallel to the HTTP handler code.

When we use a message queue in this way, it's often called a job queue, because the messages represent jobs to be performed by a separate process.

Unfortunately, AWS Lightsail doesn't offer a queue directly, but we can use AWS SQS (short for Simple Queue Service) from the main line of AWS products. More on that later.

Bonus: Why all the talk of failures?

Sending a message from the handler

Follow along with the code, or see the final result with:

$ git fetch && git checkout --track golangdk/newsletter-queue

See the diff on GitHub.

The first thing we will do is write the code in the handler that sends a message to the queue. We'll define a small interface with just one function for sending, and assume that whatever component the handler gets passed later satisfies that interface. Open up handlers/newsletter.go and add (changes highlighted):

handlers/newsletter.go
package handlers

import (
	"context"
	"net/http"

	"github.com/go-chi/chi/v5"

	"canvas/model"
	"canvas/views"
)

type signupper interface {
	SignupForNewsletter(ctx context.Context, email model.Email) (string, error)
}

type sender interface {
	Send(ctx context.Context, m model.Message) error
}

func NewsletterSignup(mux chi.Router, s signupper, q sender) {
	mux.Post("/newsletter/signup", func(w http.ResponseWriter, r *http.Request) {
		email := model.Email(r.FormValue("email"))
		if !email.IsValid() {
			http.Error(w, "email is invalid", http.StatusBadRequest)
			return
		}

		token, err := s.SignupForNewsletter(r.Context(), email)
		if err != nil {
			http.Error(w, "error signing up, refresh to try again", http.StatusBadGateway)
			return
		}

		err = q.Send(r.Context(), model.Message{
			"job":   "confirmation_email",
			"email": email.String(),
			"token": token,
		})
		if err != nil {
			http.Error(w, "error signing up, refresh to try again", http.StatusBadGateway)
			return
		}

		http.Redirect(w, r, "/newsletter/thanks", http.StatusFound)
	})
}

// …

Notice the sender interface, which is used as a new parameter q in the NewsletterSignup function. With it, we can Send a model.Message to the queue. Looks pretty easy, right?

For this to work, we need the message model. Put this in model/model.go:

model/model.go
package model

// Message for communication through a queue.
type Message = map[string]string

As you can see, the Message is actually just an alias for a string map.

We need to adjust the tests as well, to use a mock that satisfies the sender interface and checks that the correct message is being sent:

handlers/newsletter_test.go
package handlers_test

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"github.com/go-chi/chi/v5"
	"github.com/matryer/is"

	"canvas/handlers"
	"canvas/model"
)

type signupperMock struct {
	email model.Email
}

func (s *signupperMock) SignupForNewsletter(ctx context.Context, email model.Email) (string, error) {
	s.email = email
	return "123", nil
}

type senderMock struct {
	m model.Message
}

func (s *senderMock) Send(ctx context.Context, m model.Message) error {
	s.m = m
	return nil
}

func TestNewsletterSignup(t *testing.T) {
	mux := chi.NewMux()
	s := &signupperMock{}
	q := &senderMock{}
	handlers.NewsletterSignup(mux, s, q)

	t.Run("signs up a valid email address and sends a message", func(t *testing.T) {
		is := is.New(t)

		code, _, _ := makePostRequest(mux, "/newsletter/signup", createFormHeader(), strings.NewReader("email=me%40example.com"))
		is.Equal(http.StatusFound, code)
		is.Equal(model.Email("me@example.com"), s.email)
		is.Equal(q.m, model.Message{
			"job":   "confirmation_email",
			"email": "me@example.com",
			"token": "123",
		})
	})

	// …

See how defining the interface at the consumer side makes testing really easy again? We don't care about other methods the component might have for interacting with a queue, we only care about Send.

Using the queue

Now that we have the handler code, we need the component that satisfies the sender interface. This component will talk to an actual queue resource, much like storage.Database talks to the database. Let's create the Queue in a new file messaging/queue.go:

messaging/queue.go
// Package messaging is for components that enable messaging to other systems.
package messaging

import (
	"context"
	"encoding/json"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"go.uber.org/zap"

	"canvas/model"
)

type Queue struct {
	Client   *sqs.Client
	log      *zap.Logger
	mutex    sync.Mutex
	name     string
	url      *string
	waitTime time.Duration
}

type NewQueueOptions struct {
	Config   aws.Config
	Log      *zap.Logger
	Name     string
	WaitTime time.Duration
}

func NewQueue(opts NewQueueOptions) *Queue {
	if opts.Log == nil {
		opts.Log = zap.NewNop()
	}

	return &Queue{
		Client:   sqs.NewFromConfig(opts.Config),
		log:      opts.Log,
		name:     opts.Name,
		waitTime: opts.WaitTime,
	}
}

// Send a message to the queue as JSON.
func (q *Queue) Send(ctx context.Context, m model.Message) error {
	if q.url == nil {
		if err := q.getQueueURL(ctx); err != nil {
			return err
		}
	}

	messageAsBytes, err := json.Marshal(m)
	if err != nil {
		return err
	}
	messageAsString := string(messageAsBytes)

	_, err = q.Client.SendMessage(ctx, &sqs.SendMessageInput{
		MessageBody: &messageAsString,
		QueueUrl:    q.url,
	})
	return err
}

// Receive a message and its receipt ID from the queue. Returns nil if no message is available.
func (q *Queue) Receive(ctx context.Context) (*model.Message, string, error) {
	if q.url == nil {
		if err := q.getQueueURL(ctx); err != nil {
			return nil, "", err
		}
	}

	output, err := q.Client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:        q.url,
		WaitTimeSeconds: int32(q.waitTime.Seconds()),
	})
	if err != nil {
		return nil, "", err
	}

	if len(output.Messages) == 0 {
		return nil, "", nil
	}

	var m model.Message
	if err := json.Unmarshal([]byte(*output.Messages[0].Body), &m); err != nil {
		return nil, "", err
	}

	return &m, *output.Messages[0].ReceiptHandle, nil
}

// Delete a message by receipt ID.
func (q *Queue) Delete(ctx context.Context, receiptID string) error {
	if q.url == nil {
		if err := q.getQueueURL(ctx); err != nil {
			return err
		}
	}

	_, err := q.Client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      q.url,
		ReceiptHandle: &receiptID,
	})
	return err
}

// getQueueURL under a lock.
func (q *Queue) getQueueURL(ctx context.Context) error {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Check again after the lock, we might have the URL already.
	if q.url != nil {
		return nil
	}

	output, err := q.Client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
		QueueName: &q.name,
	})
	if err != nil {
		return err
	}
	q.url = output.QueueUrl

	return nil
}

In many ways, this code looks familiar. We have a NewQueue function that can create a Queue struct for us with some default options. The most important parameter here is the name, which is the queue name used to identify the queue throughout the component's lifetime.

Note the new imports, github.com/aws/aws-sdk-go-v2/aws and github.com/aws/aws-sdk-go-v2/service/sqs, which are Amazon's libraries for talking to AWS SQS. Get them, along with two more dependencies we'll need later, using:

$ go get -u github.com/aws/aws-sdk-go-v2 github.com/aws/aws-sdk-go-v2/config github.com/aws/aws-sdk-go-v2/service/sqs github.com/aws/smithy-go

The Queue has three methods: Send, Receive, and Delete. Let's go through each of them and see what they do.

Sending a message

The Send function signature is familiar. It's the one we've used in the sender interface previously.

First up is getting the queue URL. This is an implementation detail of how AWS SQS works. Essentially, it translates the queue name into a URL that the library can contact. The URL is different in development, testing, and production, so it gets constructed here the first time any of the three queue methods are used. We're using a mutex to ensure that the queue can be used concurrently, and that we only fetch the URL once.

After that, the message (which is really a string map) gets converted to JSON and sent to the queue. That's it.

Receiving a message

Getting a message from the queue is slightly more complex, because of the error handling I mentioned earlier. After checking the URL like in Send, we contact the queue and ask for a message, waiting a maximum of waitTime seconds for it. If there is no message, we return nil.

If there is a message, we turn the JSON back into a Message and return it, along with the receipt ID, which is used when deleting the message.

This deletion of the message is important. AWS SQS delivers messages through Receive, but they will be redelivered, perhaps to another process, if they are not deleted. This is to ensure that all messages are processed properly, so the deletion is basically the component telling the queue that the message was processed successfully.

They won't be redelivered immediately; otherwise, you would end up with a lot of redundant processing. Instead, a configuration parameter called the visibility timeout controls how long the queue waits before considering a delivery failed and redelivering the message. If you delete the message before this timeout has elapsed, it is not redelivered.
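If you ever need a different timeout for a particular receive, the AWS SDK lets you override the queue's default per request. This is just an illustration, not something we use in the course code; our Receive method relies on the queue's default (which the elasticmq configuration later in this section sets to 60 seconds):

// Illustration only: overriding the visibility timeout for a single receive call.
// Our Queue.Receive relies on the queue's default timeout instead.
output, err := q.Client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
	QueueUrl:          q.url,
	WaitTimeSeconds:   int32(q.waitTime.Seconds()),
	VisibilityTimeout: 30, // this message stays invisible to other consumers for 30 seconds
})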

Deleting a message

Now that you know why we delete the message, the how is easy in comparison. We check the queue URL once again, and then tell the queue to delete the message identified by the receipt ID.
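We'll write the real message consumer in an upcoming section, but to see how Receive and Delete are meant to work together, here's a rough sketch of a worker loop. Note that handleJob is a made-up placeholder for whatever work a job needs, not a function from the course code:

// Rough sketch of a consumer loop; the real implementation comes later in the course.
func processMessages(ctx context.Context, q *messaging.Queue) error {
	for {
		m, receiptID, err := q.Receive(ctx)
		if err != nil {
			return err
		}
		if m == nil {
			// No message right now, poll again.
			continue
		}

		// handleJob is a placeholder for the actual work, such as sending the confirmation email.
		// If it fails, we deliberately don't delete the message, so the queue redelivers it
		// after the visibility timeout and the job gets retried.
		if err := handleJob(ctx, *m); err != nil {
			continue
		}

		if err := q.Delete(ctx, receiptID); err != nil {
			return err
		}
	}
}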

Bonus: More SQS features

Testing the queue

Because the queue is a core component that we'll use a lot, let's write a test for it that sends, receives, and deletes a message. Open messaging/queue_test.go:

messaging/queue_test.go
package messaging_test

import (
	"context"
	"testing"

	"github.com/matryer/is"

	"canvas/integrationtest"
	"canvas/model"
)

func TestQueue(t *testing.T) {
	integrationtest.SkipIfShort(t)

	t.Run("sends a message to the queue, receives it, and deletes it", func(t *testing.T) {
		is := is.New(t)

		queue, cleanup := integrationtest.CreateQueue()
		defer cleanup()

		err := queue.Send(context.Background(), model.Message{
			"foo": "bar",
		})
		is.NoErr(err)

		m, receiptID, err := queue.Receive(context.Background())
		is.NoErr(err)
		is.Equal(model.Message{"foo": "bar"}, *m)
		is.True(len(receiptID) > 0)

		err = queue.Delete(context.Background(), receiptID)
		is.NoErr(err)

		m, _, err = queue.Receive(context.Background())
		is.NoErr(err)
		is.Equal(nil, m)
	})
}

We send a test message to the queue, receive it right after, check that the content is the same, and delete it from the queue. Notice that the test also runs a final receive, and checks that there is no message.

This integration test can't run yet, because we're missing the integrationtest.CreateQueue helper. This helper is very similar to the integrationtest.CreateDatabase helper we created earlier. It basically makes sure that a queue exists before the test, and deletes it again after. It looks like this:

integrationtest/queue.go
package integrationtest

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/maragudk/env"

	"canvas/messaging"
)

// CreateQueue for testing.
// Usage:
//   queue, cleanup := CreateQueue()
//   defer cleanup()
//   …
func CreateQueue() (*messaging.Queue, func()) {
	env.MustLoad("../.env-test")

	name := env.GetStringOrDefault("QUEUE_NAME", "jobs")

	queue := messaging.NewQueue(messaging.NewQueueOptions{
		Config: getAWSConfig(),
		Name:   name,
	})

	createQueueOutput, err := queue.Client.CreateQueue(context.Background(), &sqs.CreateQueueInput{
		QueueName: &name,
	})
	if err != nil {
		panic(err)
	}

	return queue, func() {
		_, err := queue.Client.DeleteQueue(context.Background(), &sqs.DeleteQueueInput{
			QueueUrl: createQueueOutput.QueueUrl,
		})
		if err != nil {
			panic(err)
		}
	}
}

Because there is some AWS-specific configuration code we want to use elsewhere later, put it in a separate file:

integrationtest/aws.go
package integrationtest

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/maragudk/env"
)

func getAWSConfig() aws.Config {
	awsConfig, err := config.LoadDefaultConfig(context.Background(),
		config.WithEndpointResolver(createAWSEndpointResolver()),
	)
	if err != nil {
		panic(err)
	}
	return awsConfig
}

func createAWSEndpointResolver() aws.EndpointResolverFunc {
	sqsEndpointURL := env.GetStringOrDefault("SQS_ENDPOINT_URL", "")
	if sqsEndpointURL == "" {
		panic("sqs endpoint URL must be set in testing with env var SQS_ENDPOINT_URL")
	}

	return func(service, region string) (aws.Endpoint, error) {
		if sqsEndpointURL != "" && service == sqs.ServiceID {
			return aws.Endpoint{
				URL: sqsEndpointURL,
			}, nil
		}

		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	}
}

This code creates a configuration struct for the AWS library to use, including a so-called endpoint resolver that points the library to the right endpoint for the queue, so we don't use a production system in testing.

This code won't work yet. Why? Because there's no queue system to talk to in integration testing! Let's add an SQS-compatible queue called elasticmq to our dependencies. As with Postgres, we pull it in through Docker. Add both a development and a testing container to your docker-compose.yaml file:

docker-compose.yaml
version: '3.8'

services:
  postgres:
    image: postgres:12
    environment:
      POSTGRES_USER: canvas
      POSTGRES_PASSWORD: 123
    ports:
      - 5432:5432
    volumes:
      - postgres:/var/lib/postgresql/data

  elasticmq:
    image: softwaremill/elasticmq-native
    ports:
      - 9324:9324
      - 9325:9325
    volumes:
      - ./elasticmq.conf:/opt/elasticmq.conf

  postgres-test:
    image: postgres:12
    environment:
      POSTGRES_USER: test
      POSTGRES_PASSWORD: 123
      POSTGRES_DB: template1
    ports:
      - 5433:5432

  elasticmq-test:
    image: softwaremill/elasticmq-native
    ports:
      - 9326:9324

volumes:
  postgres:

The final part of getting elasticmq up and running for development is to add a small configuration file in the root of your project:

elasticmq.conf
# https://github.com/softwaremill/elasticmq#automatically-creating-queues-on-startup
include classpath("application.conf")

queues {
  jobs {
    defaultVisibilityTimeout = 60 seconds
    receiveMessageWait = 20 seconds
  }
}

And then point your development and test code to the local containers, and add some fake API credentials that elasticmq needs:

.env
DB_USER=canvas
DB_PASSWORD=123
DB_NAME=canvas
SQS_ENDPOINT_URL=http://localhost:9324
AWS_ACCESS_KEY_ID=access
AWS_SECRET_ACCESS_KEY=secretsecret
.env-test
DB_PASSWORD=123
DB_PORT=5433
SQS_ENDPOINT_URL=http://localhost:9326
AWS_ACCESS_KEY_ID=access
AWS_SECRET_ACCESS_KEY=secretsecret

Whew, what a lot of setup code. 😮‍💨 It's always like that the first time a big new component, such as a database or a queue, is added. Luckily, we only have to do it once, and everything will be easier and faster later. I promise.

Remember, you can get your new containers up and running using the docker compose command:

$ docker compose up -d
[+] Running 5/5
 ⠿ Network canvas_default             Created
 ⠿ Container canvas_postgres_1        Started
 ⠿ Container canvas_postgres-test_1   Started
 ⠿ Container canvas_elasticmq-test_1  Started
 ⠿ Container canvas_elasticmq_1       Started

Excellent, that looks good. You should now be able to run your integration tests and see that the new queue works as expected, as long as you temporarily pass nil to your handler in server/routes.go where the queue argument should go.
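To run just the new queue test from the terminal, something like the following should do it; adjust it to however you normally run your tests, for example through a Makefile target:

$ go test ./messaging/...

With that passing, only one piece is missing: hooking the queue up to the web server for real.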

Add the queue to Server

Now that we know the queue is working, it's time to add it to our server and routes so that the handler can use it:

server/server.go
// Package server contains everything for setting up and running the HTTP server.
package server

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"strconv"
	"time"

	"github.com/go-chi/chi/v5"
	"go.uber.org/zap"

	"canvas/messaging"
	"canvas/storage"
)

type Server struct {
	address  string
	database *storage.Database
	log      *zap.Logger
	mux      chi.Router
	queue    *messaging.Queue
	server   *http.Server
}

type Options struct {
	Database *storage.Database
	Host     string
	Log      *zap.Logger
	Port     int
	Queue    *messaging.Queue
}

func New(opts Options) *Server {
	if opts.Log == nil {
		opts.Log = zap.NewNop()
	}

	address := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
	mux := chi.NewMux()

	return &Server{
		address:  address,
		database: opts.Database,
		log:      opts.Log,
		mux:      mux,
		queue:    opts.Queue,
		server: &http.Server{
			// …
server/routes.go
package server

import (
	"canvas/handlers"
)

func (s *Server) setupRoutes() {
	handlers.Health(s.mux, s.database)
	handlers.FrontPage(s.mux)
	handlers.NewsletterSignup(s.mux, s.database, s.queue)
	handlers.NewsletterThanks(s.mux)
}

Also, we might use the queue in integration tests for the server later, so let's add it here as well:

integrationtest/server.go
package integrationtest

import (
	"net/http"
	"testing"
	"time"

	"canvas/server"
)

// CreateServer for testing on port 8081, returning a cleanup function that stops the server.
// Usage:
//   cleanup := CreateServer()
//   defer cleanup()
func CreateServer() func() {
	db, cleanupDB := CreateDatabase()
	queue, cleanupQueue := CreateQueue()

	s := server.New(server.Options{
		Host:     "localhost",
		Port:     8081,
		Database: db,
		Queue:    queue,
	})

	// …

	return func() {
		if err := s.Stop(); err != nil {
			panic(err)
		}
		cleanupDB()
		cleanupQueue()
	}
}

// SkipIfShort skips t if the "-short" flag is passed to "go test".
func SkipIfShort(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
}

The very last file we need to add some glue code to is cmd/server/main.go. In here, we load the configuration for the queue (similar to loading it in integration testing), construct it, and add it to the server.

cmd/server/main.go
// Package main is the entry point to the server. It reads configuration, sets up logging and error handling,
// handles signals from the OS, and starts and stops the server.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/smithy-go/logging"
	"github.com/maragudk/env"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"canvas/messaging"
	"canvas/server"
	"canvas/storage"
)

// …

func start() int {
	// …

	awsConfig, err := config.LoadDefaultConfig(context.Background(),
		config.WithLogger(createAWSLogAdapter(log)),
		config.WithEndpointResolver(createAWSEndpointResolver()),
	)
	if err != nil {
		log.Info("Error creating AWS config", zap.Error(err))
		return 1
	}

	s := server.New(server.Options{
		Database: createDatabase(log),
		Host:     host,
		Log:      log,
		Port:     port,
		Queue:    createQueue(log, awsConfig),
	})

	// …
}

// …

func createAWSLogAdapter(log *zap.Logger) logging.LoggerFunc {
	return func(classification logging.Classification, format string, v ...interface{}) {
		switch classification {
		case logging.Debug:
			log.Sugar().Debugf(format, v...)
		case logging.Warn:
			log.Sugar().Warnf(format, v...)
		}
	}
}

// createAWSEndpointResolver used for local development endpoints.
// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/endpoints/
func createAWSEndpointResolver() aws.EndpointResolverFunc {
	sqsEndpointURL := env.GetStringOrDefault("SQS_ENDPOINT_URL", "")

	return func(service, region string) (aws.Endpoint, error) {
		if sqsEndpointURL != "" && service == sqs.ServiceID {
			return aws.Endpoint{
				URL: sqsEndpointURL,
			}, nil
		}

		// Fallback to default endpoint
		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	}
}

// …

func createQueue(log *zap.Logger, awsConfig aws.Config) *messaging.Queue {
	return messaging.NewQueue(messaging.NewQueueOptions{
		Config:   awsConfig,
		Log:      log,
		Name:     env.GetStringOrDefault("QUEUE_NAME", "jobs"),
		WaitTime: env.GetDurationOrDefault("QUEUE_WAIT_TIME", 20*time.Second),
	})
}

We create the aws.Config struct with a custom log adapter, to glue the zap logger together with the AWS logger, and a custom endpoint resolver, to be able to point the AWS library to our local development queue. And that's it!

You've seen the createAWSEndpointResolver function before, in our integrationtest package. Shouldn't we reuse that function somehow, even though it's not exactly the same? You could argue that, but sometimes, it's simpler to copy a little code than to pull a function out into yet another package and refactor it to work in both places. So now we have it twice with only a small difference, and that's fine.

Seeing messages in action

You should now be able to start your server, sign up with an email address on localhost:8080, and see a message coming in on elasticmq's dashboard at localhost:9325. Congratulations, you've now added a message queue to your web app! 🎉
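If you prefer the terminal over the browser, a quick curl request against the signup handler should have the same effect; the email form field matches what the handler reads, and this assumes your server is still running on port 8080:

$ curl -i -d "email=me@example.com" http://localhost:8080/newsletter/signup

You should get a redirect to /newsletter/thanks back, and a new message showing up in the dashboard.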

Up next: reading the messages from the queue and actually sending some emails out.
