Understanding the pipeline pattern

Published 5th October 2018, 6:36 pm

In a previous post, I used the pipeline pattern to demonstrate processing letters using optical recognition and machine learning. The pipeline pattern is something I've found very useful in recent months. For a sequential series of tasks, this approach can make your code easier to follow by allowing you to break it up into simple, logical steps that are easy to test in isolation. If you're familiar with pipes and redirection in Unix, you'll be aware of how you can chain together multiple, relatively simple commands to carry out some very complex transformations on data.
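Before getting into a real-world example, here's a minimal sketch of the idea in plain PHP, with no libraries: each stage is just a callable that receives the previous stage's output, and the pipeline is a fold over the stages. (The stages here are made up purely for illustration.)

```php
// Each stage is a callable that takes the output of the previous stage
$stages = [
    fn(array $words) => array_map('strtoupper', $words),              // format
    fn(array $words) => array_filter($words, fn($w) => strlen($w) > 3), // filter
    fn(array $words) => implode(' ', $words),                         // join
];

// Pass the initial data through each stage in turn
$result = array_reduce(
    $stages,
    fn($carry, $stage) => $stage($carry),
    ['the', 'pipeline', 'pattern']
);

// $result is now 'PIPELINE PATTERN'
```

Each stage knows nothing about the others; it just transforms its input and hands the result on, which is what makes the stages individually testable and reusable.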

A few months back, I was asked to build a webhook for a Facebook lead form at work. One of my colleagues was having to manually export CSV data from Facebook, and then import it into a MySQL database and a Campaign Monitor mailing list, which was an onerous task, so they asked me to look at more automated solutions. I wound up building a webhook with Lumen that would go through the following steps:

  • Get the lead IDs from the webhook
  • Pull the leads from the Facebook API using those IDs
  • Process the raw data into a more suitable format
  • Save the data to the database
  • Push the data to Campaign Monitor

Since this involved a number of discrete steps, I chose to implement each step as a separate stage. That way, each step was easy to test in isolation, and it was easily reusable. As it turned out, this approach saved us because Facebook needed to approve this app (and ended up rejecting it - their documentation at the time wasn't clear on implementing server-to-server apps, making it hard to meet their guidelines), so we needed an interim solution. I instead wrote an Artisan task for importing the file from a CSV, which involved the following steps:

  • Read the rows from the CSV file
  • Format the CSV data into the desired format
  • Save the data to the database
  • Push the data to Campaign Monitor

This meant that two of the existing steps could be reused as-is, without touching the code or tests. I just added two new classes to read and format the CSV data, plus the Artisan command itself, which simply called the various pipeline stages, and that was all. In this post, I'll demonstrate how I implemented this.

While there is more than one implementation of this available, and it wouldn't be hard to roll your own, I generally use the PHP League's Pipeline package, since it's simple, solid and well-tested. Let's say our application has three steps:

  • Format the request data
  • Save the data
  • Push it to a third party service

We therefore need to write a stage for each step in the process. Each one must be a callable, such as a closure, a callback, or a class that implements the __invoke() magic method. I usually go for the latter as it allows you to more easily inject dependencies into the stage via its constructor, making it easier to use and test. Here's what our first stage might look like:

namespace App\Stages;

use Illuminate\Support\Collection;

class FormatData
{
    public function __invoke(Collection $data): Collection
    {
        return $data->map(function ($item) {
            return [
                'name' => $item->fullname,
                'email' => $item->email
            ];
        });
    }
}

This class does nothing more than receive a collection and format the data as expected. We could have it accept a request object instead, but I opted not to because I felt it made more sense to pass the data in as a collection so it's not tied to an HTTP request. That way, it can also handle data passed through from a CSV file using an Artisan task, and the details of how the data is obtained are deferred to whatever class calls the pipeline. Note this stage also returns a collection, for handling by the next step:

namespace App\Stages;

use App\Lead;
use Illuminate\Support\Collection;

class SaveData
{
    public function __invoke(Collection $data): Collection
    {
        return $data->map(function ($item) {
            $lead = new Lead;
            $lead->name = $item->name;
            $lead->email = $item->email;
            $lead->save();
            return $lead;
        });
    }
}

This step saves each lead as an Eloquent model, and returns a collection of the saved models, which are passed to the final step:

namespace App\Stages;

use App\Contracts\Services\MailingList;
use Illuminate\Support\Collection;

class AddDataToList
{
    protected $list;

    public function __construct(MailingList $list)
    {
        $this->list = $list;
    }

    public function __invoke(Collection $data)
    {
        return $data->each(function ($item) {
            $this->list->add([
                'name' => $item->name,
                'email' => $item->email
            ]);
        });
    }
}

This step uses a wrapper class for a mailing service, which is passed through as a dependency in the constructor. The __invoke() method then loops through the Eloquent models, pulling the name and email out of each one and adding them to the list. With our stages complete, we can now put them together in our controller:

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Stages\FormatData;
use App\Stages\SaveData;
use App\Stages\AddDataToList;
use League\Pipeline\Pipeline;
use Illuminate\Support\Collection;

class WebhookController extends Controller
{
    public function store(Request $request, Pipeline $pipeline, FormatData $formatData, SaveData $saveData, AddDataToList $addData)
    {
        try {
            $data = Collection::make($request->get('data'));
            $pipe = $pipeline->pipe($formatData)
                ->pipe($saveData)
                ->pipe($addData);
            $pipe->process($data);
        } catch (\Exception $e) {
            // Handle exception
        }
    }
}

As mentioned above, we extract the request data (assumed to be an array of data for a webhook), and convert it into a collection. Then, we put together our pipeline. Note that we use dependency injection to fetch the steps - feel free to use method or constructor injection as appropriate. We instantiate our pipeline, and call the pipe() method multiple times to add new stages.

Finally we pass the data through to our pipe for processing by calling the process() method, passing in the initial data. Note that we can wrap the whole thing in a try...catch statement to handle exceptions, so if something happens that would mean we would want to cease processing at that point, we can throw an exception in the stage and handle it outside the pipeline.
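For instance, a validation stage could halt the pipeline like this. This is a hypothetical sketch, not part of the original app: the ValidateData class, the exception choice and the email check are all assumptions, but it shows how a thrown exception propagates out of process() to the controller's catch block.

```php
namespace App\Stages;

use Illuminate\Support\Collection;

class ValidateData
{
    public function __invoke(Collection $data): Collection
    {
        return $data->each(function ($item) {
            // Throwing here stops the pipeline; no later stage runs,
            // and the controller's catch block decides how to respond
            if (empty($item->email)) {
                throw new \InvalidArgumentException('Lead is missing an email address');
            }
        });
    }
}
```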

This means that our controller is kept very simple. It just gets the data as a collection, then puts the pipeline together and passes the data through. If we subsequently had to write an Artisan task to do something similar from the command line, we could fetch the data via a CSV reader class, and then pass it to the same pipeline. If we needed to change the format of the initial data, we could swap the FormatData class out for a different one with very little trouble.
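As a rough sketch of what such an Artisan command might look like (the command name, signature and the readCsv() helper are hypothetical, but FormatData, SaveData and AddDataToList are the same stage classes the webhook uses):

```php
namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Stages\FormatData;
use App\Stages\SaveData;
use App\Stages\AddDataToList;
use League\Pipeline\Pipeline;
use Illuminate\Support\Collection;

class ImportLeads extends Command
{
    protected $signature = 'leads:import {file}';
    protected $description = 'Import leads from a CSV file';

    public function handle(Pipeline $pipeline, FormatData $formatData, SaveData $saveData, AddDataToList $addData)
    {
        // Read the CSV into a collection - details elided; any reader
        // that yields one row per lead would do here
        $data = Collection::make($this->readCsv($this->argument('file')));

        // The same pipeline stages as the webhook, minus the HTTP layer
        $pipeline->pipe($formatData)
            ->pipe($saveData)
            ->pipe($addData)
            ->process($data);
    }
}
```

Only the data source changes; everything downstream of it is shared between the webhook and the command.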

Another thing you can do with the League pipeline package, though I haven't yet had occasion to try it, is use League\Pipeline\PipelineBuilder to build pipelines in a more dynamic fashion. You can make steps conditional, as in this example:

use League\Pipeline\PipelineBuilder;

$builder = (new PipelineBuilder)
    ->add(new FormatData);
if ($data['type'] === 'foo') {
    $builder->add(new HandleFooType);
}
$builder->add(new SaveData);
$pipeline = $builder->build();

The pipeline pattern isn't appropriate for every situation, but for anything that involves a set of operations on the same data, it makes a lot of sense, and can make it easy to break larger operations into smaller steps that are easier to understand, test, and re-use.