Understanding Futures in Rust -- Part 2

Joe Jackson, Former Senior Developer

Futures make async programming in Rust easy and readable. Learn how to use futures by building them from scratch. Part 2 focuses on combinators.

Background

If you missed it, go check out Part 1.

In part one we covered the Future trait, saw how futures are created and run, and began to see how they can be chained together.

The final product from last time can be found here at this playground link, and the code found there will be the starting point for all examples covered in this post.

Note: All code examples have a playground link. Examples that don't compile but are used to illustrate a point are marked as such.

Objective

If you're familiar with promises in JavaScript and followed the last blog post, you may have wondered where the familiar combinators (then, catch, and finally) were.

You will find their equivalents in this post, and, by the end, the following code will compile. You will also gain an understanding of the types, traits, and underlying concepts that make futures work.

// This does not compile, yet

fn main() {
    let my_future = future::ready(1)
        .map(|x| x + 3)
        .map(Ok)
        .map_err(|e: ()| format!("Error: {:?}", e))
        .and_then(|x| future::ready(Ok(x - 3)))
        .then(|res| {
            future::ready(match res {
                Ok(val) => Ok(val + 3),
                err => err,
            })
        });

    let val = block_on(my_future);
    assert_eq!(val, Ok(4));
}

Utility Functions

First we need a couple of utility functions, future::ready and block_on. These functions will allow us to easily create and run futures to completion and, while useful, are not very common in production code.

Before we do anything else we are going to move our Future trait and Context struct into modules that mirror those found in the standard library.

mod task {
    use crate::NOTIFY;

    pub struct Context<'a> {
        waker: &'a Waker,
    }

    impl<'a> Context<'a> {
        pub fn from_waker(waker: &'a Waker) -> Self {
            Context { waker }
        }

        pub fn waker(&self) -> &'a Waker {
            &self.waker
        }
    }

    pub struct Waker;

    impl Waker {
        pub fn wake(&self) {
            NOTIFY.with(|f| *f.borrow_mut() = true)
        }
    }

}
use crate::task::*;

mod future {
    use crate::task::*;

    pub enum Poll<T> {
        Ready(T),
        Pending,
    }

    pub trait Future {
        type Output;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output>;
    }
}
use crate::future::*;

Playground Link

The main detail to note here is that the modules, types, and functions must be made public with the pub keyword so that the rest of the code can use them.

Helper Function Implementations

future::ready

future::ready creates a future that is immediately ready with the value passed in. This function is useful for starting a future chain when you have a value that is not a future already, as seen in previous examples.

mod future {
    // ...

    pub struct Ready<T>(Option<T>);

    impl<T> Future for Ready<T> {
        type Output = T;

        fn poll(&mut self, _: &Context) -> Poll<Self::Output> {
            Poll::Ready(self.0.take().unwrap())
        }
    }

    pub fn ready<T>(val: T) -> Ready<T> {
        Ready(Some(val))
    }
}

fn main() {
    let my_future = future::ready(1);
    println!("Output: {}", run(my_future));
}

Playground Link

We create a generic struct, Ready<T>, that wraps an Option. We use the Option enum here so that poll can move the value out through &mut self, and to ensure that the value is only handed out once: a second call to poll would panic on unwrap. It would be an error in the executor implementation if it polled again after receiving a Poll::Ready response.
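The Option trick can be seen in isolation with a small sketch. Slot, take_once and demo_take are hypothetical names used only for illustration; they mirror what Ready<T> does inside poll without involving the Future trait.

```rust
// Hypothetical stand-in for `Ready<T>`: same Option trick, no Future trait.
struct Slot<T>(Option<T>);

impl<T> Slot<T> {
    // Moves the value out through `&mut self`, leaving `None` behind.
    // A plain `T` field could not be moved out of a `&mut` borrow like this.
    fn take_once(&mut self) -> T {
        self.0.take().expect("value already taken")
    }
}

fn demo_take() -> (String, bool) {
    let mut slot = Slot(Some(String::from("done")));
    let first = slot.take_once();
    // After one take, the slot is empty.
    (first, slot.0.is_none())
}
```

Calling take_once a second time would hit the None left behind and panic, which is the same failure mode as polling Ready<T> twice.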

block_on

For our purposes we'll just rename our run function to block_on. In the futures-preview crate this function uses the LocalPool under the hood to run a future to completion while blocking the current thread. We are doing something very similar with our function.

fn block_on<F>(mut f: F) -> F::Output
where
    F: Future,
{
    NOTIFY.with(|n| loop {
        if *n.borrow() {
            *n.borrow_mut() = false;
            let ctx = Context::from_waker(&Waker);
            if let Poll::Ready(val) = f.poll(&ctx) {
                return val;
            }
        }
    })
}

fn main() {
    let my_future = future::ready(1);
    println!("Output: {}", block_on(my_future));
}

Playground Link
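The snippets above depend on NOTIFY from Part 1, which is not repeated in this post. The following is a self-contained sketch of how the pieces fit together. It assumes NOTIFY is a thread-local RefCell<bool> that starts out true, and it adds a hypothetical CountDown future (not part of the article's code) that returns Poll::Pending a few times so the loop in block_on has something to do.

```rust
use std::cell::RefCell;

// Assumption: Part 1 defines NOTIFY roughly like this. It starts as `true`
// so that the first loop iteration in `block_on` polls the future.
thread_local!(static NOTIFY: RefCell<bool> = RefCell::new(true));

pub struct Waker;

impl Waker {
    pub fn wake(&self) {
        NOTIFY.with(|f| *f.borrow_mut() = true)
    }
}

pub struct Context<'a> {
    waker: &'a Waker,
}

impl<'a> Context<'a> {
    pub fn from_waker(waker: &'a Waker) -> Self {
        Context { waker }
    }

    pub fn waker(&self) -> &'a Waker {
        self.waker
    }
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub trait Future {
    type Output;
    fn poll(&mut self, cx: &Context) -> Poll<Self::Output>;
}

// Hypothetical future: Pending `remaining` times, then Ready with the
// total number of times it was polled.
pub struct CountDown {
    pub remaining: u32,
    pub polls: u32,
}

impl Future for CountDown {
    type Output = u32;

    fn poll(&mut self, cx: &Context) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            Poll::Ready(self.polls)
        } else {
            self.remaining -= 1;
            cx.waker().wake(); // ask the executor to poll us again
            Poll::Pending
        }
    }
}

pub fn block_on<F: Future>(mut f: F) -> F::Output {
    NOTIFY.with(|n| loop {
        if *n.borrow() {
            *n.borrow_mut() = false;
            let ctx = Context::from_waker(&Waker);
            if let Poll::Ready(val) = f.poll(&ctx) {
                return val;
            }
        }
    })
}
```

With these definitions, block_on(CountDown { remaining: 2, polls: 0 }) polls the future three times and returns 3. Note that in this sketch NOTIFY is left false once block_on returns, so a second block_on call on the same thread would spin without ever polling.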

Combinators

First we'll start with some combinators that allow you to act directly on the Output value of another future. In this article we are using the informal but popular definition of combinators: functions that allow you to operate on some type, combining it with other types. For example, a nested future could be created with a combinator function and could have a complex type like Future<Output = Future<Output = i32>>. This can be read as a future whose output is another future whose output is of type i32. One such combinator, and the simplest, is map.

Map

If you are familiar with the map function for the Result or Option types, this should be very familiar. The map combinator takes a function and applies it to the future's Output value, returning a new future with the result of that function as its Output. In the case of futures the mapping is even simpler than in Result or Option since there is no failure case to take into account. The mapping is simply Future -> Future.

Here's the function signature:

// does not compile
fn map<U, F>(self, f: F) -> Map<Self, F>
where
    F: FnOnce(Self::Output) -> U,
    Self: Sized,

map is a generic function that takes a closure and returns a Map struct which itself implements Future. Instead of implementing the Future trait every time we want to chain on a value, like we did in the last part of this series, we can use these functions to do that for us.

Let's break this down:

  • Map<Self, F> declares that the return type of the map function will include the type of the current future and the type of the passed-in function.
  • where is a keyword that allows us to add bounds to our types. For the F type parameter, we could define the bound inline as map<U, F: FnOnce(Self::Output) -> U>, but it is much more readable in the where clause.
  • FnOnce(Self::Output) -> U is the type definition of a function that takes the Output of the current future and returns any type U. FnOnce is one of several related function traits, including FnMut and Fn. FnOnce is the easiest of these to work with because the compiler guarantees that the function is called at most once. Calling it consumes the closure, so it can take ownership of the environment values that it uses. Fn and FnMut closures borrow from their environment immutably or mutably, respectively. All closures implement the FnOnce trait; ones that don't move captured values out of their environment also implement FnMut, and ones that also don't mutate their captures implement Fn. This is one of the coolest things that Rust does, allowing really expressive use of closures and first-class function arguments. The section in the Rust book on it is worth a read.
  • Self: Sized is a bound that only allows map to be called when the trait implementer is Sized. You won't really need to worry about this, but some types aren't sized. For example, [i32] is an unsized slice type since we don't know how long it is at compile time. If we tried to implement our Future trait for it we would not be able to call map on it.
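The difference between the three closure traits is easier to see with plain functions than with futures. The sketch below uses only the standard library; the function names (call_once, call_twice_mut, call_twice, demo_closures) are made up for this illustration.

```rust
// Accepts any closure, but may call it at most once.
fn call_once<F: FnOnce() -> String>(f: F) -> String {
    f()
}

// May call the closure repeatedly and lets it mutate captured state.
fn call_twice_mut<F: FnMut() -> i32>(mut f: F) -> i32 {
    f();
    f()
}

// May call the closure repeatedly; the closure only reads its captures.
fn call_twice<F: Fn() -> usize>(f: F) -> usize {
    f() + f()
}

fn demo_closures() -> (String, i32, usize) {
    let name = String::from("future");
    // Moves `name` out of the closure when called, so it is only FnOnce.
    let consume = move || name;

    let mut count = 0;
    // Mutates `count`, so it is FnMut (and FnOnce) but not Fn.
    let bump = || {
        count += 1;
        count
    };

    let label = String::from("abc");
    // Only reads `label`, so it is Fn, FnMut and FnOnce.
    let read = || label.len();

    (call_once(consume), call_twice_mut(bump), call_twice(read))
}
```

Passing consume to call_twice_mut or call_twice is rejected by the compiler, which is why a combinator that only needs to run its closure once asks for FnOnce: it accepts the widest range of closures.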

Most of our combinators will follow this pattern, so we won't need to break them down in this much detail for the rest of the post.

Following is a full implementation of map, its Map type and the Future implementation for it.

mod future {
    pub trait Future {
        // ...

        fn map<U, F>(self, f: F) -> Map<Self, F>
        where
            F: FnOnce(Self::Output) -> U,
            Self: Sized,
        {
            Map {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...

    pub struct Map<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, F, T> Future for Map<Fut, F>
    where
        Fut: Future,
        F: FnOnce(Fut::Output) -> T,
    {
        type Output = T;

        fn poll(&mut self, cx: &Context) -> Poll<T> {
            match self.future.poll(cx) {
                Poll::Ready(val) => {
                    let f = self.f.take().unwrap();
                    Poll::Ready(f(val))
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1).map(|val| val + 1);
    println!("Output: {}", block_on(my_future));
}

Playground Link

At a high level, what's happening here is that when we call map on a future, we construct a Map value that takes ownership of the current future and the closure we passed in. The Map object itself is also a future. When it is polled, it polls the underlying future in turn. When the underlying future is ready, it takes that future's Output value and passes it to the closure, wrapping the value returned by that closure in Poll::Ready and passing the new value up the chain.

What you see here should be familiar if you read the last blog post, but I'll break it down as a quick refresher before we move on.

  • pub struct Map<Fut, F> is a generic type over a future, Fut, and a function, F.
  • f: Option<F> is that closure wrapped in an Option type. This is a bit of a trick to ensure that the closure is only called once. When you take the value of an Option, it replaces the value with None and returns the contained value. This means poll panics if it is called again after returning Poll::Ready. In practice, executors of futures never allow this to happen.
  • type Output = T; defines the output of the Map future to be the return type of our closure.
  • Poll::Ready(f(val)) returns ready with the result of the closure.
  • Poll::Pending => Poll::Pending passes pending through if the underlying future returns it.
  • future::ready(1).map(|val| val + 1); maps over the output of the ready future and adds one to it. This returns a Map future that owns the original future. When it is run, the Map future polls the ready future. This does exactly the same thing as our AddOneFuture.

This is really cool for a couple of reasons. First of all, you don't have to implement a new future for every computation you want to do; computations can be wrapped in combinators instead. In fact, unless you are implementing your own asynchronous operations, you'll likely never have to directly implement the Future trait yourself.

The second reason is that this abstraction doesn't cost anything at runtime. The details of this are a little complicated, and I'm far from an expert. But, since the closure doesn't rely on its environment, the struct representing it will be empty. When this is compiled, there will be no allocation. There are other optimizations that happen, too. The upshot, and one of the best features of Rust, is that these abstractions are just as fast as if you were to implement the lower-level, imperative version yourself.
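The claim about the empty struct can be checked with std::mem::size_of_val. This is only a small sketch of that one point, not a benchmark; closure_sizes is a hypothetical helper, and the exact size of a capturing closure is a compiler detail rather than something to rely on.

```rust
use std::mem::size_of_val;

fn closure_sizes() -> (usize, usize) {
    // Captures nothing, so the compiler-generated struct has no fields.
    let add_one = |x: i32| x + 1;

    let offset: i64 = 10;
    // Captures `offset` by value, so the closure has to store it.
    let add_offset = move |x: i64| x + offset;

    // Both are still called like ordinary functions.
    assert_eq!(add_one(1), 2);
    assert_eq!(add_offset(1), 11);

    (size_of_val(&add_one), size_of_val(&add_offset))
}
```

The first number is 0 for the non-capturing closure, while the second is non-zero because the captured offset lives inside the closure value itself.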

Then

Now that we have map we can chain any computation we want, right? The answer is yes, but there is a pretty big caveat.

Imagine what happens when you have functions that return futures that you want to chain together. For this example, we can imagine they are subsequent API calls that return results wrapped in futures, get_user and get_files_for_user.

// does not compile
fn main() {
    let files_future = get_user(1).map(|user| get_files_for_user(user));
    println!("User Files: {}", block_on(files_future));
}

This won't compile, but you can imagine that the type you are constructing here would look something like Future<Output = Future<Output = FileList>>. This is also a common problem when using the Result and Option types. Using the map function will often result in nested output and some pretty nasty handling for that nesting. In this case you'd have to keep track of how many levels deep you were nested and call block_on for each nested future.

Fortunately, Result and Option have a solution for that called and_then. and_then for Option maps Option<T> -> Option<U> by applying a function to the T inside Some and returning the Option that the closure returns. For futures, this is done by a function called then that looks a lot like map, but the closure should return its own future. In some languages, this is called flatMap. It's important to note that the value returned by the closure passed to then must implement Future or you will get a compiler error.
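The nesting problem and its fix are easy to reproduce with Option alone. In this sketch, find_user and find_file_count are hypothetical lookups standing in for get_user and get_files_for_user.

```rust
// Hypothetical lookups standing in for `get_user` / `get_files_for_user`.
fn find_user(id: u32) -> Option<&'static str> {
    if id == 1 {
        Some("joe")
    } else {
        None
    }
}

fn find_file_count(user: &str) -> Option<usize> {
    if user == "joe" {
        Some(3)
    } else {
        None
    }
}

fn demo_flatten() -> (Option<Option<usize>>, Option<usize>, Option<usize>) {
    // `map` wraps the closure's Option in another Option.
    let nested = find_user(1).map(find_file_count);
    // `and_then` returns the closure's Option directly.
    let flat = find_user(1).and_then(find_file_count);
    // A `None` at any step short-circuits the rest of the chain.
    let missing = find_user(2).and_then(find_file_count);
    (nested, flat, missing)
}
```

The then combinator plays the same role for futures: the closure returns a future, and the result is a single future rather than a future of a future.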

Here's our implementation of then, the Then struct and its implementation of the Future trait. Most of this mirrors what we've done with mapping.

mod future {
    pub trait Future {
        // ...
        fn then<Fut, F>(self, f: F) -> Then<Self, F>
        where
            F: FnOnce(Self::Output) -> Fut,
            Fut: Future,
            Self: Sized,
        {
            Then {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...

    pub struct Then<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, NextFut, F> Future for Then<Fut, F>
    where
        Fut: Future,
        NextFut: Future,
        F: FnOnce(Fut::Output) -> NextFut,
    {
        type Output = NextFut::Output;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.poll(cx) {
                Poll::Ready(val) => {
                    let f = self.f.take().unwrap();
                    f(val).poll(cx)
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1));
    println!("Output: {}", block_on(my_future));
}

Playground Link

The only piece that's really new here is f(val).poll(cx). This calls the closure with the output of the previous future and directly returns the result of polling the future that the closure returns.

The astute among you will realize that our Then::poll function can panic. If the first future returns ready but the second future returns Poll::Pending, the Then future will be polled again. That next poll will panic and exit the program: the first future is polled again after it has completed, and even if it tolerates that, the line let f = self.f.take().unwrap(); fails because the closure has already been taken. In the futures-preview library this is handled using a type called Chain. Chain is implemented using unsafe code blocks and a new type, Pin. This is beyond the scope of this article. For now we will assume that any future returned by a then closure will never return Poll::Pending. This is not a safe assumption in general!
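For the simplified Future trait used in this series, one way to avoid the panic is to remember which stage the combinator has reached. The sketch below is not how futures-preview implements Chain (it uses neither Pin nor unsafe); it only works because our poll takes a plain &mut self. ThenState, then, ReadyLater and demo_then are hypothetical names, and Context is reduced to an empty stand-in to keep the example self-contained.

```rust
pub enum Poll<T> {
    Ready(T),
    Pending,
}

// Stand-in for the article's Context; the waker is left out of this sketch.
pub struct Context;

pub trait Future {
    type Output;
    fn poll(&mut self, cx: &Context) -> Poll<Self::Output>;
}

// Hypothetical test future: Pending until `polled` is true, then Ready.
pub struct ReadyLater<T> {
    val: Option<T>,
    polled: bool,
}

impl<T> Future for ReadyLater<T> {
    type Output = T;

    fn poll(&mut self, _: &Context) -> Poll<T> {
        if self.polled {
            Poll::Ready(self.val.take().unwrap())
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

// Either still waiting on the first future, or holding the second one.
pub enum ThenState<Fut, NextFut, F> {
    First(Fut, Option<F>),
    Second(NextFut),
}

pub fn then<Fut, NextFut, F>(fut: Fut, f: F) -> ThenState<Fut, NextFut, F>
where
    Fut: Future,
    NextFut: Future,
    F: FnOnce(Fut::Output) -> NextFut,
{
    ThenState::First(fut, Some(f))
}

impl<Fut, NextFut, F> Future for ThenState<Fut, NextFut, F>
where
    Fut: Future,
    NextFut: Future,
    F: FnOnce(Fut::Output) -> NextFut,
{
    type Output = NextFut::Output;

    fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
        loop {
            let next = match self {
                ThenState::First(fut, f) => match fut.poll(cx) {
                    Poll::Ready(val) => (f.take().unwrap())(val),
                    Poll::Pending => return Poll::Pending,
                },
                // From here on the first future is never polled again.
                ThenState::Second(next) => return next.poll(cx),
            };
            // Store the second future so a later poll can resume it.
            *self = ThenState::Second(next);
        }
    }
}

pub fn demo_then() -> (bool, i32) {
    let cx = Context;
    // First future is ready immediately; the second needs two polls.
    let first = ReadyLater { val: Some(1), polled: true };
    let mut fut = then(first, |v: i32| ReadyLater { val: Some(v + 1), polled: false });
    let was_pending = matches!(fut.poll(&cx), Poll::Pending);
    let value = match fut.poll(&cx) {
        Poll::Ready(v) => v,
        Poll::Pending => -1,
    };
    (was_pending, value)
}
```

In demo_then the first poll returns Poll::Pending from the second future, and the second poll resumes that stored future and yields 2 instead of panicking.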

Result Combinators

In version 0.1 of the futures-rs library the Future trait and the Result type were closely linked. The Future trait was defined like this:

// does not compile
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

The Poll type had a concept of a success state, a failure state and a not ready state. This meant that functions like map would only execute their closures when Poll was ready and not an error. While this could be a bit confusing, it led to some really nice ergonomics when chaining combinators and making decisions based on the success or failure state.

This is accomplished differently with std::future. Now futures are either ready or not, and are agnostic to any success or failure semantics. They can contain any value, including a Result. To get handy combinators like map_err, which lets you change just the error type of a nested result, or and_then, which lets you change just the value type of a nested result, we'll need to implement a new trait. Here's a definition for TryFuture:

mod future {
    //...
    pub trait TryFuture {
        type Ok;
        type Error;

        fn try_poll(&mut self, cx: &Context) -> Poll<Result<Self::Ok, Self::Error>>;
    }

    impl<F, T, E> TryFuture for F
    where
        F: Future<Output = Result<T, E>>,
    {
        type Ok = T;
        type Error = E;

        fn try_poll(&mut self, cx: &Context) -> Poll<F::Output> {
            self.poll(cx)
        }
    }
}

Playground Link

TryFuture is a trait that we implement for every type F that implements the Future trait with an Output type of Result<T, E>. This blanket implementation is its only implementer. It defines a try_poll function whose signature matches poll on the Future trait, and it simply calls the poll method.

This means any future that has an Output type of Result also has access to its success/error state. This allows us to define some really handy combinators that work on those inner Result types without having to match on Ok and Err explicitly in the body of a map or then combinator. Following are a couple of implementations to demonstrate this concept.
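Before looking at the future versions, it may help to see the same trade-off on a plain Result. This is a sketch with hypothetical functions (load_user, load_file_count and the two file_count_* wrappers), not code from the article's playground.

```rust
// Hypothetical fallible steps that share one error type (String).
fn load_user(id: u32) -> Result<String, String> {
    if id == 1 {
        Ok(String::from("joe"))
    } else {
        Err(format!("no user with id {}", id))
    }
}

fn load_file_count(user: String) -> Result<usize, String> {
    Ok(user.len())
}

// Explicit matching: every extra step needs another Ok/Err match.
fn file_count_with_match(id: u32) -> Result<usize, String> {
    match load_user(id) {
        Ok(user) => load_file_count(user),
        Err(err) => Err(err),
    }
}

// Same behaviour: `and_then` runs the next step only on Ok and passes
// an Err through untouched.
fn file_count_with_and_then(id: u32) -> Result<usize, String> {
    load_user(id).and_then(load_file_count)
}
```

Both functions return the same value for every input; the combinators defined on TryFuture aim to give futures that wrap a Result the same convenience.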

AndThen

Let's revisit our imaginary API functions from before. Now let's imagine that, instead of always returning a value, they exist in the real world of network partitions and server outages. Those API methods will actually return a future with an embedded Result indicating that it has completed, and that it has either completed successfully or completed with an error. We need to handle those results, and here's how we might do it with the tools we already have.

// does not compile
fn main() {
    let files_future = get_user(1).then(|result| {
        match result {
            Ok(user) => get_files_for_user(user),
            Err(err) => future::ready(Err(err)),
        }
    });

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),
    };
}

This isn't too bad, but imagine that you want to chain more futures. It would get messy quickly. Fortunately we can define a combinator, and_then, that maps the type Future<Output = Result<T, E>> to Future<Output = Result<U, E>>, changing the T to a U.

Here's how we define that:

mod future {
    pub trait TryFuture {
        // ...

        fn and_then<Fut, F>(self, f: F) -> AndThen<Self, F>
        where
            F: FnOnce(Self::Ok) -> Fut,
            Fut: Future,
            Self: Sized,
        {
            AndThen {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...
    pub struct AndThen<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, NextFut, F> Future for AndThen<Fut, F>
    where
        Fut: TryFuture,
        NextFut: TryFuture<Error = Fut::Error>,
        F: FnOnce(Fut::Ok) -> NextFut,
    {
        type Output = Result<NextFut::Ok, Fut::Error>;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.try_poll(cx) {
                Poll::Ready(Ok(val)) => {
                    let f = self.f.take().unwrap();
                    f(val).try_poll(cx)
                }
                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1))
        .map(Ok::<i32, ()>)
        .and_then(|val| future::ready(Ok(val + 1)));

    println!("Output: {:?}", block_on(my_future));
}

Playground Link

This should look very familiar to you. In fact this is nearly identical to the implementation of the then combinator. There are just a couple of important differences to note:

  • The function is defined in the TryFuture trait.
  • type Output = Result<NextFut::Ok, Fut::Error>; means that the output of the AndThen future has the value type of the new future and the error type of the future that came before it. If the previous future's output is an Err, the closure is not executed and the error is passed through unchanged.
  • We are calling try_poll rather than poll.

It's important to note that when chaining combinators like this, their type signatures can get really long and hard to read in compiler error messages. The and_then function requires that the error type of the future it's being called on and the error type of the future returned by the closure be the same!
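The same-error-type rule also applies to and_then on a plain Result, where it is easier to experiment with. The sketch below uses hypothetical functions (fetch_user_id, parse_id, user_id) and shows one way to reconcile two different error types by converting one of them to a String.

```rust
use std::num::ParseIntError;

// Hypothetical API call whose error type is a plain String.
fn fetch_user_id(name: &str) -> Result<String, String> {
    if name == "joe" {
        Ok(String::from("42"))
    } else {
        Err(format!("no user named {}", name))
    }
}

// Parsing fails with a different error type.
fn parse_id(raw: String) -> Result<u32, ParseIntError> {
    raw.parse::<u32>()
}

fn user_id(name: &str) -> Result<u32, String> {
    fetch_user_id(name)
        // `.and_then(parse_id)` on its own is rejected by the compiler:
        // ParseIntError is not String. Converting the error first makes
        // both steps agree on one error type.
        .and_then(|raw| parse_id(raw).map_err(|e| format!("Parse Error: {}", e)))
}
```

Here user_id("joe") evaluates to Ok(42), and an unknown name returns the String error from the first step without ever reaching the parser.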

MapErr

Let's go back again to our fantasy API calls. Imagine that both API calls return a future with the same kind of error, but you need to do another step in between. Let's say you have to parse the first API result to pass into the second.

// does not compile
fn main() {
    let files_future = get_user(1)
        .and_then(|user_string| parse::<User>(user_string))
        .and_then(|user| get_files_for_user(user));

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),
    };
}

This looks fine but will fail to compile with a rather hard-to-decipher error message saying that it expected something like ApiError and found a ParseError. You can easily use the map_err combinator on the Result returned from parse, but what do you do with the futures? If we implement a map_err for a TryFuture, then we can rewrite that as:

// does not compile
fn main() {
    let files_future = get_user(1)
        .map_err(|e| format!("Api Error: {}", e))
        .and_then(|user_string| parse::<User>(user_string))
        .map_err(|e| format!("Parse Error: {}", e))
        .and_then(|user| get_files_for_user(user))
        .map_err(|e| format!("Api Error: {}", e));

    match block_on(files_future) {
        Ok(files) => println!("User Files: {}", files),
        Err(err) => println!("There was an error: {}", err),
    };
}

If this looks a little messy to you, stay tuned for part three of this series, where I will talk about ways to handle this and other problems you may come across when using futures.

Here's how we implement map_err:

mod future {
    pub trait TryFuture {
        // ...

        fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
        where
            F: FnOnce(Self::Error) -> E,
            Self: Sized,
        {
            MapErr {
                future: self,
                f: Some(f),
            }
        }
    }

    // ...
    pub struct MapErr<Fut, F> {
        future: Fut,
        f: Option<F>,
    }

    impl<Fut, F, E> Future for MapErr<Fut, F>
    where
        Fut: TryFuture,
        F: FnOnce(Fut::Error) -> E,
    {
        type Output = Result<Fut::Ok, E>;

        fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
            match self.future.try_poll(cx) {
                Poll::Ready(result) => {
                    let f = self.f.take().unwrap();
                    Poll::Ready(result.map_err(f))
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

fn main() {
    let my_future = future::ready(1)
        .map(|val| val + 1)
        .then(|val| future::ready(val + 1))
        .map(Ok)
        .and_then(|val| future::ready(Ok(val + 1)))
        .map_err(|_: ()| 5);

    println!("Output: {:?}", block_on(my_future));
}

Playground Link

The only thing new to see here is Poll::Ready(result.map_err(f)). There we are passing our closure into the Result type's map_err function.

Wrap Up

Now, the code from the beginning of the post will run! The cool thing is that we implemented all of it ourselves.  There are a ton of other useful combinators, but they are all constructed nearly the same way.  As an exercise for the reader, try to implement a map_ok combinator that acts like map_err on TryFuture but for a successful result.

Playground Link

Recap

  • What makes futures in Rust so powerful is the suite of useful combinators available to chain computations and asynchronous calls.
  • We've learned about Rust's powerful closure traits, FnOnce, FnMut, and Fn.
  • We've learned how to work with the Result type when it's embedded in a future.

Up Next

In part 3, we'll cover ways to make error handling less painful, how to deal with returning futures when you have many branching paths, and we'll dive into the exciting, feature-flagged world of async/await.
