schmohl.io

Go Testing: Mocks for Google Cloud Pub/Sub

01 Nov 2016

I recently noticed a github issue which pleads for publicly exposed interfaces to aid in testing and dep injection of Google Cloud Go libraries.

Heeding the advice of the Google team, I use namespaces in order to run isolated tests on services that are backed by Cloud Datastore. In my CircleCI test, this is implemented with linked docker containers (1 for my application and 1 for the Gcloud emulator). This has worked well for me, especially given the complexity of a caching layer over some transactions.

Nevertheless, it’s a little simpler to write tests for Cloud Pub/sub if we create our own Go interface.

Reading Messages

One can find great examples of using the Pub/Sub API sometimes shrouded within GoDocs.

To read messages off of Pubsub, we might do something like this:

import (
	"context"
	"log"

	ps "cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

const N = 10

func pull(ctx context.Context, sub *ps.Subscription) <-chan *ps.Message {

	out := make(chan *ps.Message)

	go func() {
		it, err := sub.Pull(ctx)
		if err != nil {
			log.Printf("error pulling: %v", err)
			return
		}
		defer it.Stop()

		for i := 0; i < N; i++ {
			msg, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				// handle error somehow
				log.Printf("error pulling: %v", err)
				break
			}

			out <- msg
		}
	}()

	return out
}

The function pulls up to N messages asynchronously and returns a channel that is closed after no more messages are available, or an error occurs. The messages aren’t ACKed, and that is left to the consumer of the channel by calling msg.Done(true).

Refactoring with Interfaces

The ubiquity of such an operation lends itself nicely to writing an interface:

type puller interface {
	Pull() <-chan envelope
	Size() int
}

type envelope interface {
	Id() string
	Data() []byte
	Ack()
}

We’ve defined an envelope interface so that we can acknowledge messages that have been successfully processed, and avoid the RPC call in Done(bool) for tests. Below are both “real” and “fake” implementations to use in our application, and tests, respectively:

type Puller struct {
	sub *ps.Subscription
	n   int
	ctx context.Context
}

func New(ctx context.Context, sub *ps.Subscription, size int) *Puller {
	return &Puller{
		sub: sub,
		n:   size,
		ctx: ctx,
	}
}

type Envelope struct {
	*ps.Message
}

func (e Envelope) Id() string   { return e.ID }
func (e Envelope) Data() []byte { return e.Message.Data }
func (e Envelope) Ack()         { e.Done(true) }

func (p *Puller) Size() int { return p.n }

func (p *Puller) Pull() <-chan envelope {
	out := make(chan envelope)

	go func() {
		it, err := p.sub.Pull(p.ctx)
		if err != nil {
			log.Printf("error pulling: %v", err)
			return
		}
		defer it.Stop()

		for i := 0; i < p.Size(); i++ {
			msg, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				// handle error somehow
				log.Printf("error pulling: %v", err)
				break
			}

			out <- Envelope{msg}
		}
	}()

	return out
}

type fakePuller struct {
	results []*fakeEnvelope
}

type fakeEnvelope struct {
	data []byte
	ID   string
}

func (e *fakeEnvelope) Id() string   { return e.ID }
func (e *fakeEnvelope) Data() []byte { return e.data }
func (e *fakeEnvelope) Ack()         {}

func (p *fakePuller) Size() int { return len(p.results) }

func (p *fakePuller) Pull() <-chan envelope {
	out := make(chan envelope)
	for i := 0; i < p.Size(); i++ {
		out <- p.results[i]
	}

	return out
}

Usage

In our application:

func CountWords(p puller) (num int) {
	for sentence := range p.Pull() {
		num += len(strings.Split(string(sentence.Data()), " "))
	}
	return
}

and in our tests:

func TestCountWords(t *testing.T) {
	expected := 5

	pulled := []*fakeEnvelope{
		&fakeEnvelope{data: []byte(`hello world`)},
		&fakeEnvelope{data: []byte(`foo bar`)},
		&fakeEnvelope{data: []byte(`bar`)},
	}
	mock := &fakePuller{pulled}

	got := CountWords(mock)
	if expected != got {
		t.Errorf("TestProcess; expected %d; got %d;", expected, got)
	}
}