Writing a Custom Theseus Operator

Contents

  1. Shouldn't I just write an Apply function instead?
  2. Overview of the process
  3. An example: the MyUnion operator
  4. Operator lifecycle
  5. Creating the basic class structure
  6. State management
  7. Filling in the details
  8. Access to plan and transaction state
  9. Compiling, deploying, and testing

1. Shouldn't I just write an Apply function instead?

When writing a plan, it is sometimes desirable to integrate custom functionality that is beyond the scope of what the existing Theseus operators provide. For example, suppose that you want to calculate the distance between two points given in polar coordinates. Such a function is not included in the standard Theseus library. However, it is relatively easy to extend Theseus by writing an Apply function that accomplishes this task.
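
The computation such an Apply function would wrap is small. The sketch below shows only the arithmetic; it does not show how Theseus registers or invokes Apply functions, and the PolarDistance class and its distance() method are hypothetical names. For points (r1, theta1) and (r2, theta2), the law of cosines gives d = sqrt(r1^2 + r2^2 - 2*r1*r2*cos(theta1 - theta2)).

```java
// Hypothetical helper: the arithmetic an Apply function for polar distance might wrap.
// How Theseus registers and calls Apply functions is not shown here.
final class PolarDistance {
    private PolarDistance() {}

    /**
     * Distance between the points (r1, theta1) and (r2, theta2), with angles
     * in radians, computed with the law of cosines.
     */
    static double distance(double r1, double theta1, double r2, double theta2) {
        double d2 = r1 * r1 + r2 * r2 - 2.0 * r1 * r2 * Math.cos(theta1 - theta2);
        // Guard against tiny negative values caused by floating-point rounding.
        return Math.sqrt(Math.max(0.0, d2));
    }
}
```

For instance, (3, 0) and (4, pi/2) are the Cartesian points (3, 0) and (0, 4), so distance(3.0, 0.0, 4.0, Math.PI / 2) is 5 up to rounding.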

For most purposes, using the Apply operator to extend Theseus is the way to go. Still, if there is a pressing need for performance, or if the nature of the operator is such that inputs must be buffered before answer tuples can be output, you may need to write a custom Theseus operator instead.
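
To make the buffering case concrete, here is a plain-Java sketch, independent of the Theseus API, of an operator whose answer depends on its whole input: a sort cannot emit anything until it has seen end-of-stream. BufferingSort and its onTuple()/onEos() methods are hypothetical stand-ins for an operator's input handlers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Theseus-independent sketch of a blocking (buffering) operator.
// Every input value is buffered; nothing is emitted until end-of-stream.
final class BufferingSort {
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> emitted = new ArrayList<>();

    /** Called once per input tuple: buffer it, emit nothing yet. */
    void onTuple(int value) {
        buffer.add(value);
    }

    /** Called at end-of-stream: only now is the full sorted answer known. */
    void onEos() {
        Collections.sort(buffer);
        emitted.addAll(buffer);
        buffer.clear();
    }

    /** Read-only view of everything emitted so far. */
    List<Integer> emitted() {
        return Collections.unmodifiableList(emitted);
    }
}
```

By contrast, the MyUnion operator developed below can emit each tuple as soon as it is first seen, so its output needs no such buffering.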

2. Overview of the process

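Writing a custom operator in Theseus starts with a few design decisions: what inputs the operator takes, what outputs it produces, what its execution semantics are (for example, whether it can emit answers as tuples arrive or must buffer its input first), and what state it needs to keep while it runs.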

Once you have done all this, you're ready to begin coding.

3. An example: the MyUnion operator

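As a running example, we will consider how one writes the Union operator. To keep our example separate from the existing operator, we will call our operator "MyUnion". Working through these decisions for union, we find that it involves two streaming inputs, lhs and rhs, and one streaming output, out. Each incoming tuple is passed to the output immediately unless a tuple with the same string form has already been output, and end-of-stream is sent on out once both inputs have ended. To do this, the operator keeps two pieces of state: a java.util.HashSet of the tuples seen so far and a handle to the output stream.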

4. Operator lifecycle

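All operators in Theseus share common semantics for how they are created, initialized, and cleaned up. More specifically, for each transaction an operator's on_init() method is called first; then, for each input, a handler named after that input (for MyUnion, on_lhs() and on_rhs()) is called once per arriving tuple, and a matching EOS handler (on_lhs_EOS() and on_rhs_EOS()) is called when that input ends; finally, on_final() is called.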

It is important to emphasize that on_init() is called BEFORE the first input tuple is processed and on_final() is called AFTER the last EOS has been processed. Each of these methods is called exactly once per transaction for a particular operator.
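
These ordering guarantees can be modeled with a small, self-contained driver. This is not Theseus code: the LifecycleOperator interface, LifecycleDriver.runTransaction(), and RecordingOperator are hypothetical stand-ins that only mirror the call order described above. A real executor may interleave tuples from different inputs; this driver delivers each input in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the per-transaction callbacks of a Theseus operator.
interface LifecycleOperator {
    void onInit(String tx);
    void onTuple(String tx, String input, String value);
    void onEos(String tx, String input);
    void onFinal(String tx);
}

final class LifecycleDriver {
    private LifecycleDriver() {}

    /**
     * Delivers one transaction's inputs in the guaranteed order: onInit once,
     * then the tuples and EOS of each input, then onFinal once after the last EOS.
     */
    static void runTransaction(String tx, LifecycleOperator op, Map<String, List<String>> inputs) {
        op.onInit(tx);
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            for (String value : e.getValue()) {
                op.onTuple(tx, e.getKey(), value);
            }
            op.onEos(tx, e.getKey());
        }
        op.onFinal(tx);
    }
}

/** Records every callback so the call order can be inspected. */
final class RecordingOperator implements LifecycleOperator {
    final List<String> calls = new ArrayList<>();

    public void onInit(String tx) { calls.add("init:" + tx); }
    public void onTuple(String tx, String input, String value) { calls.add(input + ":" + value); }
    public void onEos(String tx, String input) { calls.add(input + "_EOS"); }
    public void onFinal(String tx) { calls.add("final:" + tx); }
}
```

Passing a LinkedHashMap with "lhs" -> [a, b] and "rhs" -> [c] records init:t1, lhs:a, lhs:b, lhs_EOS, rhs:c, rhs_EOS, final:t1; the LinkedHashMap only keeps the input order deterministic.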

5. Creating the basic class structure

Once you have decided on your inputs, outputs, and execution semantics, you're ready to start coding. The first thing you should do is to create the basic class structure. All Theseus operators have a similar class structure, both because of the way operators are called and because of the parent Operator class that all operators must inherit from.

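Creating the basic class structure involves declaring a public class that extends Operator, giving it a public no-argument constructor, implementing getInputs() and getOutputs() to declare its input and output queues, and declaring the lifecycle methods: on_init(), a tuple handler and an EOS handler for each input, and on_final().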

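As an example, here are the methods we need to implement to start off our MyUnion operator: the OpMyunion() constructor, getInputs(), getOutputs(), on_init(), on_lhs() and on_lhs_EOS(), on_rhs() and on_rhs_EOS(), and on_final().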

Notice that our list includes an OpMyunion() constructor. For consistency, all operators should be named OpName, where Name is the operator name with its first letter capitalized and the rest lowercased (so MyUnion becomes OpMyunion). This convention is required in order to distinguish user operators from other classes and system operators.
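
The naming rule, as the OpMyunion example shows it, can be expressed as a small function. OperatorNames.className() is a hypothetical helper, not part of Theseus; it assumes the convention is "Op" followed by the operator name with only its first letter capitalized.

```java
import java.util.Locale;

// Hypothetical helper illustrating the naming convention seen in OpMyunion:
// "Op" + operator name with the first letter upper-cased and the rest lower-cased.
final class OperatorNames {
    private OperatorNames() {}

    static String className(String operatorName) {
        if (operatorName == null || operatorName.isEmpty()) {
            throw new IllegalArgumentException("operator name must be non-empty");
        }
        String lower = operatorName.toLowerCase(Locale.ROOT);
        return "Op" + Character.toUpperCase(lower.charAt(0)) + lower.substring(1);
    }
}
```

Under this assumption, className("MyUnion") yields "OpMyunion" and className("union") yields "OpUnion".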

At this point, it is also good to create the State data structure. For consistency, all operators that require state (and nearly all will) should define an inner, private class that implements theseus.api.OperatorState. State classes that do not do this cannot be indexed by the executor at runtime.

Recall that in our MyUnion example, we will need to use the java.util.HashSet class and that we also need to keep the handle to the output stream. So, these two members will be part of our inner OperatorState class.

To illustrate the details, consider the MyUnion class skeleton below:

import theseus.api.*;

import java.util.HashSet;

public class OpMyunion
  extends Operator
{
  private class State
    implements OperatorState
  {
    OutputArg out;
    HashSet set;
  }

  public OpMyunion()
  {
  }

  public QueueDesc[] getInputs()
  {
    return new QueueDesc[] {
      new QueueDesc("lhs", QueueType.STREAMING),
      new QueueDesc("rhs", QueueType.STREAMING)
    };
  }

  public QueueDesc[] getOutputs ()
  {
    return new QueueDesc[] {
      new QueueDesc("out", QueueType.STREAMING)
    };
  }


  public void on_init(Transaction a_tx, InputArgSet a_in, OutputArgSet a_out)
    throws OperatorException
  {
  }


  /* LHS */

  public void on_lhs(Transaction a_tx, InputArg a_in)
    throws OperatorException
  {
  }

  public void on_lhs_EOS(Transaction a_tx)
    throws OperatorException
  {
  }


  /* RHS */

  public void on_rhs(Transaction a_tx, InputArg a_in)
    throws OperatorException
  {
  }

  public void on_rhs_EOS(Transaction a_tx)
    throws OperatorException
  {
  }


  /* FINAL */

  public void on_final(Transaction a_tx)
    throws OperatorException
  {
  }
}

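Make sure that you notice the following: the class extends Operator; State is a private inner class that implements OperatorState; each input queue declared in getInputs() ("lhs" and "rhs") has a matching pair of handlers, on_<name>() for tuples and on_<name>_EOS() for end-of-stream; and every handler receives the current Transaction as its first argument.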

6. State management

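Your operator's state must be created and accessed correctly. Since multiple threads may be executing your operator code concurrently, it is of utmost importance that you take care when managing state yourself. This generally means the following: create the state object in on_init() and register it with setState() while holding the lock returned by lockState(); in every other handler, acquire the lock with lockState(), retrieve the state with getLockedState(), and release the lock with unlockState() when you are done; and keep per-transaction data in this state object rather than in ordinary fields of the operator class.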

Examples of how to manage state are shown in the next section.
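
The locking discipline can be modeled without Theseus. In the sketch below, TxStateTable is a hypothetical stand-in built on java.util.concurrent: lock(), get(), set(), and unlock() play the roles of lockState(), getLockedState(), setState(), and unlockState(), with one lock and one state object per transaction (identified here by a plain String). It is not how Theseus implements these methods; it only illustrates the access pattern. CounterHandler is likewise hypothetical and shows the shape of a handler.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, Theseus-independent model of the lockState/getLockedState/
// setState/unlockState discipline: one lock and one state object per transaction.
final class TxStateTable<S> {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, S> states = new ConcurrentHashMap<>();

    /** Blocks until this thread holds the lock for transaction tx. */
    ReentrantLock lock(String tx) {
        ReentrantLock l = locks.computeIfAbsent(tx, k -> new ReentrantLock());
        l.lock();
        return l;
    }

    /** Reads the state; the caller must hold the lock returned by lock(tx). */
    S get(String tx, ReentrantLock held) {
        requireHeld(tx, held);
        return states.get(tx);
    }

    /** Registers the state; the caller must hold the lock returned by lock(tx). */
    void set(String tx, ReentrantLock held, S state) {
        requireHeld(tx, held);
        states.put(tx, state);
    }

    void unlock(String tx, ReentrantLock held) {
        requireHeld(tx, held);
        held.unlock();
    }

    private void requireHeld(String tx, ReentrantLock held) {
        if (locks.get(tx) != held || !held.isHeldByCurrentThread()) {
            throw new IllegalStateException("state accessed without holding its lock");
        }
    }
}

/** Example handler shape: lock, read or modify state, unlock in finally. */
final class CounterHandler {
    private CounterHandler() {}

    static void onTuple(TxStateTable<int[]> table, String tx) {
        ReentrantLock l = table.lock(tx);
        try {
            table.get(tx, l)[0]++;
        } finally {
            table.unlock(tx, l);  // always released, even if the update throws
        }
    }
}
```

Releasing the lock in a finally block keeps one failing handler from leaving the transaction's state locked for every other thread.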

7. Filling in the details

The details will largely depend on the semantics of your operator. There is a case to be made for "operator patterns" (physical semantic patterns that seem to crop up for various types of operators), but that is another topic for another time.

Still, here are a few words of advice about operator writing that are semantic-independent:

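The complete MyUnion operator is shown below. Compared with the skeleton above, the additions worth noting are the body of on_init(), which creates and registers the State; the new private doUnion() helper, which on_lhs() and on_rhs() now call; and the body of on_final(), which emits EOS on the output.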


import theseus.api.*;

import java.util.HashSet;

public class OpMyunion 
  extends Operator
{
  private class State
    implements OperatorState
  {
    OutputArg out;
    HashSet set;
  }

  public OpMyunion() 
  { 
  }

  public QueueDesc[] getInputs()
  {
    return new QueueDesc[] {
      new QueueDesc("lhs", QueueType.STREAMING),
      new QueueDesc("rhs", QueueType.STREAMING)
    };
  }

  public QueueDesc[] getOutputs ()
  {
    return new QueueDesc[]
    {
      new QueueDesc("out", QueueType.STREAMING)
    };
  }


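  public void on_init(Transaction a_tx, InputArgSet a_in, OutputArgSet a_out)
    throws OperatorException
  {
    // Build this transaction's state: an empty set of seen tuples and the
    // handle to the "out" queue declared in getOutputs().
    State s = new State();
    s.set = new HashSet();
    s.out = a_out.getArg("out");

    // Register the state while holding the state lock.
    Lock lock = lockState(a_tx);
    try {
      setState(a_tx, lock, s);
    } finally {
      unlockState(a_tx, lock);
    }
  }

  // Shared by on_lhs() and on_rhs(): output a tuple only the first time
  // its string form is seen.
  private void doUnion(Transaction a_tx, InputArg a_in)
  {
    Tuple t = (Tuple)a_in.getObject();
    String valStr = t.toString();   // string form serves as the duplicate key
    Lock lock = lockState(a_tx);
    try {
      State s = (State)getLockedState(a_tx, lock);
      if (!s.set.contains(valStr)) {
        s.set.add(valStr);
        s.out.putObject(a_tx, t);
      }
    } finally {
      unlockState(a_tx, lock);   // release the lock even if output fails
    }
  }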

  /* LHS */

  public void on_lhs(Transaction a_tx, InputArg a_in)
    throws OperatorException
  {
    doUnion(a_tx, a_in);
  }

  public void on_lhs_EOS(Transaction a_tx)
    throws OperatorException
  {
  }

  /* RHS */

  public void on_rhs(Transaction a_tx, InputArg a_in)
    throws OperatorException
  {
    doUnion(a_tx, a_in);
  }

  public void on_rhs_EOS(Transaction a_tx)
    throws OperatorException
  {
  }


  /* FINAL */

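  public void on_final(Transaction a_tx)
    throws OperatorException
  {
    // Both inputs have reached EOS, so no more tuples can arrive:
    // close the output stream.
    Lock lock = lockState(a_tx);
    try {
      State s = (State)getLockedState(a_tx, lock);
      s.out.putEOS(a_tx);
    } finally {
      unlockState(a_tx, lock);
    }
  }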

}
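
The core logic of MyUnion does not depend on the Theseus API, so it can be checked in isolation. The sketch below is a plain-Java model, not Theseus code: UnionModel, its onTuple()/onEos() methods, the EOS marker string, and the list standing in for the output queue are all hypothetical. Like OpMyunion, it keys on each tuple's string form, outputs a tuple the first time that key is seen on either input, and outputs EOS only after both inputs have ended (the job on_final() does in the real operator).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Theseus-independent model of OpMyunion's behavior.
final class UnionModel {
    static final String EOS = "<EOS>";  // stand-in for the output's end-of-stream

    private final Set<String> seen = new HashSet<>();
    private final Set<String> endedInputs = new HashSet<>();
    private final List<String> out = new ArrayList<>();
    private final int inputCount;

    UnionModel(int inputCount) {
        this.inputCount = inputCount;
    }

    /** Mirrors doUnion(): output the tuple only the first time its key is seen. */
    synchronized void onTuple(Object tuple) {
        String key = tuple.toString();
        if (seen.add(key)) {  // add() returns false if the key was already present
            out.add(key);
        }
    }

    /** Mirrors the EOS handlers plus on_final(): EOS once every input has ended. */
    synchronized void onEos(String input) {
        endedInputs.add(input);
        if (endedInputs.size() == inputCount) {
            out.add(EOS);
        }
    }

    synchronized List<String> output() {
        return new ArrayList<>(out);
    }
}
```

Feeding a, b then EOS on lhs, and b, c then EOS on rhs, produces a, b, c, EOS. One design note: because HashSet.add() returns false when the element is already present, the contains()/add() pair in doUnion() could be collapsed into a single add() call.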

8. Access to plan and transaction state

On occasion, you may need to coordinate two or more operators, either across all transactions or within a particular transaction. The Theseus API provides mechanisms that let you access global state as well as the state of a specific transaction. Global state, shared across all transactions, is henceforth referred to as "plan state".

To understand how to do this, let's consider an example. Suppose you want to pool connections to a remote database, as a means to improve performance. That is, you want to eliminate the need for various plan operators to keep creating and tearing down expensive database connections every time they process a single tuple. More specifically, you would like to:

The Theseus API gives operators access to plan state through the inherited method getPlanState(), which returns a GenericStateManager object. On it you can call methods such as getItem(), putItem(), and removeItem(), which let you associate a value with a key and later retrieve or remove the value by its key. Synchronization is up to you: the API gives you access to the shared state, but coordinating concurrent use of it is your responsibility.

Similarly, getTransactionState() gives access to a separate GenericStateManager, one per transaction.
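
To make the two scopes concrete, here is a self-contained model. Only the method names getItem(), putItem(), and removeItem() come from the text; StateManagerModel and StateScopes are hypothetical, and the key and value types and the return values are assumptions, not the real GenericStateManager signatures. Each individual call here is thread-safe because it delegates to a ConcurrentHashMap, but a sequence such as "get, then put if absent" is not atomic, which is one reason coordinating access remains your job.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for GenericStateManager: the method names come from the
// text, the signatures are assumed. ConcurrentHashMap rejects null keys and values.
final class StateManagerModel {
    private final ConcurrentMap<Object, Object> items = new ConcurrentHashMap<>();

    Object getItem(Object key) { return items.get(key); }
    void putItem(Object key, Object value) { items.put(key, value); }
    Object removeItem(Object key) { return items.remove(key); }
}

// Hypothetical model of the two scopes: one manager shared by the whole plan
// and one manager per transaction.
final class StateScopes {
    private final StateManagerModel plan = new StateManagerModel();
    private final ConcurrentMap<String, StateManagerModel> perTx = new ConcurrentHashMap<>();

    StateManagerModel planState() { return plan; }

    StateManagerModel transactionState(String tx) {
        return perTx.computeIfAbsent(tx, k -> new StateManagerModel());
    }
}
```

An item put into transactionState("t1") is invisible from transactionState("t2"), while anything put into planState() is visible to every transaction.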

To return to the example: one way to address the task at hand is to create at least 3 operators:

Note: the ConnPool object is responsible for synchronizing access to the pool.
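
One possible shape for such a pool is sketched below. The text names ConnPool but not its methods, so acquire(), release(), createdCount(), and the generic connection type C are assumptions; a real pool would hold java.sql.Connection objects obtained from a JDBC driver. Every method that touches the idle list is synchronized, which is how this sketch meets the requirement that ConnPool itself synchronize access to the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical sketch of a ConnPool: the text names the class but not its API.
// All access to the idle list goes through synchronized methods, so operators
// running on different threads can safely share one instance.
final class ConnPool<C> {
    private final Deque<C> idle = new ArrayDeque<>();
    private final Supplier<C> factory;
    private int created;

    ConnPool(Supplier<C> factory) {
        this.factory = factory;
    }

    /** Reuses an idle connection if one exists, otherwise creates a new one. */
    synchronized C acquire() {
        C c = idle.pollFirst();
        if (c == null) {
            c = factory.get();
            created++;
        }
        return c;
    }

    /** Returns a connection to the pool for later reuse. */
    synchronized void release(C c) {
        idle.addFirst(c);
    }

    /** Number of connections the factory has created so far. */
    synchronized int createdCount() {
        return created;
    }
}
```

A single instance could be stored in plan state under a well-known key so that all operators share it. This sketch neither bounds the pool's size nor closes connections at shutdown.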

9. Compiling, deploying, and testing

To compile your operator, you should:

To deploy your operator, you should:

To test your operator: