When writing a plan, it is sometimes desirable to integrate custom functionality that is beyond the scope of what the existing Theseus operators provide. For example, suppose that you want to calculate the distance between two points given in polar coordinates. Such a function is not included in the standard Theseus library. However, it is relatively easy to extend Theseus by writing an Apply function that accomplishes this task.
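The computation behind such a function is small. As a rough sketch, the distance can be obtained from the law of cosines. The class and method names here are hypothetical, angles are assumed to be in radians, and the way the method would be hooked up to Apply is not shown because it depends on the Theseus API.

```java
// Hypothetical helper class; not part of the Theseus library.
class PolarDistance {
    /**
     * Distance between the points (r1, theta1) and (r2, theta2),
     * with angles in radians, computed with the law of cosines.
     */
    static double distance(double r1, double theta1, double r2, double theta2) {
        double squared = r1 * r1 + r2 * r2
                - 2.0 * r1 * r2 * Math.cos(theta1 - theta2);
        // Rounding can push the value slightly below zero for coincident points.
        return Math.sqrt(Math.max(0.0, squared));
    }

    public static void main(String[] args) {
        // Two points on the unit circle, half a turn apart.
        System.out.println(distance(1.0, 0.0, 1.0, Math.PI));
    }
}
```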
For most purposes, using the Apply operator to extend Theseus is the way to go. Still, if there is a pressing need for performance, or if the nature of the operator is such that various inputs require buffering before answer tuples can be output, you may need to write a custom Theseus operator to handle the task.
Writing a custom operator in Theseus consists of the following key steps:
Once you have done all this, you're ready to begin coding.
As a running example, we will consider how one writes the Union operator. To keep our example separate from the existing operator, we will call our operator "MyUnion". In answering the points above, we find that union involves:
All operators in Theseus share common semantics for how they are created, how they are initialized, and how they are cleaned up. More specifically:
Before the first input tuple is processed, the on_init() method is called. This is when you will
want to initialize stateful data structures (since they are
indexed by transaction and you will first get the handle to
the current transaction at this time). At this time, it is
also useful to obtain handles to the output streams. Since
the executor cannot know the detailed semantics of your
particular operator, it lets you decide when to generate output -
to do that, you will need to grab the operator output stream
handles during the initialization method. You should
then keep the object references stored in a stateful data structure.
This will become clearer as we move on.
After the last EOS has been processed, the on_final() method will
be called. This is a good time to do any cleanup, closing of
file, database, or network streams/connections. It is also
a convenient time to emit the EOS for your output stream.
If your operator has many inputs, there can be a variety of
cases when you might need to output the EOS. But doing
it in this method is the easiest, since you are guaranteed
that it will be called last.
It is important to emphasize that the on_init()
method is called BEFORE the first input tuple is processed
and the on_final() method is called AFTER the last EOS
has been processed. Each of these methods is called exactly once
per transaction for a particular operator.
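The ordering can be illustrated with a toy driver. This is only a sketch: ToyOperator, runTransaction(), and the single input named "in" are made up for illustration and are not Theseus types. The driver stands in for the executor and handles one transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for the executor; none of these types are the Theseus API.
class LifecycleDemo {
    /** Minimal operator that records the order in which its callbacks run. */
    static class ToyOperator {
        final List<String> calls = new ArrayList<>();

        void on_init(String tx)             { calls.add("on_init(" + tx + ")"); }
        void on_in(String tx, String tuple) { calls.add("on_in(" + tx + "," + tuple + ")"); }
        void on_in_EOS(String tx)           { calls.add("on_in_EOS(" + tx + ")"); }
        void on_final(String tx)            { calls.add("on_final(" + tx + ")"); }
    }

    /** Drives one transaction: init once, then tuples, then EOS, then final once. */
    static List<String> runTransaction(ToyOperator op, String tx, List<String> tuples) {
        op.on_init(tx);
        for (String t : tuples) {
            op.on_in(tx, t);
        }
        op.on_in_EOS(tx);
        op.on_final(tx);
        return op.calls;
    }

    public static void main(String[] args) {
        ToyOperator op = new ToyOperator();
        for (String call : runTransaction(op, "tx1", Arrays.asList("a", "b"))) {
            System.out.println(call);
        }
    }
}
```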
Once you have decided on your inputs, outputs, and execution semantics, you're ready to start coding. The first thing you should do is create the basic class structure. All Theseus operators have a similar class structure because of the way operators need to be called and because of the parent Operator class from which all operators must inherit.
Creating the basic class structure involves the following:
Defining the on_init() and on_final() methods.
It also means defining two metadata methods:
The getInputs() method: this method should return
an array of theseus.api.QueueDesc objects that
describe the inputs and their types (either STREAMING or VA_STREAMING).
The getOutputs() method, which returns
an array of output descriptors.
Defining the on_input and
on_input_EOS methods, where input stands for the name
of each operator input. During execution, these methods are
called like event listener methods: when an input (or its EOS) arrives,
the corresponding method is called.
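To make the event-listener analogy concrete, here is one way name-based dispatch could work, using Java reflection. This is an assumption made for illustration: the text does not say how the Theseus executor locates these methods, and ToyUnion and deliver() are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Illustration only: the real executor may dispatch in a different way.
class DispatchDemo {
    /** Toy operator with handlers named after its inputs, "lhs" and "rhs". */
    static class ToyUnion {
        final List<String> log = new ArrayList<>();

        public void on_lhs(String tuple) { log.add("lhs:" + tuple); }
        public void on_lhs_EOS()         { log.add("lhs:EOS"); }
        public void on_rhs(String tuple) { log.add("rhs:" + tuple); }
        public void on_rhs_EOS()         { log.add("rhs:EOS"); }
    }

    /** Calls on_<input>(tuple), or on_<input>_EOS() when tuple is null. */
    static void deliver(Object op, String input, String tuple) {
        try {
            if (tuple == null) {
                Method m = op.getClass().getMethod("on_" + input + "_EOS");
                m.invoke(op);
            } else {
                Method m = op.getClass().getMethod("on_" + input, String.class);
                m.invoke(op, tuple);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("no handler for input '" + input + "'", e);
        }
    }

    public static void main(String[] args) {
        ToyUnion op = new ToyUnion();
        deliver(op, "lhs", "t1");
        deliver(op, "rhs", null);
        System.out.println(op.log);
    }
}
```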
As an example, here are the methods we need to implement to start off our MyUnion operator:
OpMyunion()
getInputs()
getOutputs()
on_init()
on_final()
on_lhs()
on_lhs_EOS()
on_rhs()
on_rhs_EOS()
Notice that our list includes an OpMyunion() constructor.
For consistency, all operators should be named Opname, where
name is the all-lowercase form of the operator name. This consistency
is required in order to distinguish user operators from other classes
and system operators.
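A small helper makes the convention explicit. Note one assumption: the capital letter after "Op" is inferred from the example names in this document (OpMyunion, OpCustom_db_init) and is not stated as a rule. OperatorNames and className() are hypothetical names.

```java
import java.util.Locale;

// Hypothetical helper; the capitalized first letter is inferred from the
// example names in this document, not from a stated rule.
class OperatorNames {
    /** Maps an operator name such as "MyUnion" to the class name "OpMyunion". */
    static String className(String operatorName) {
        if (operatorName == null || operatorName.isEmpty()) {
            throw new IllegalArgumentException("operator name must not be empty");
        }
        String lower = operatorName.toLowerCase(Locale.ROOT);
        return "Op" + Character.toUpperCase(lower.charAt(0)) + lower.substring(1);
    }

    public static void main(String[] args) {
        System.out.println(className("MyUnion"));
    }
}
```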
At this point, it is also good to create the State data structure. For consistency, all operators that require state (and nearly all will) should define an inner, private class that inherits from theseus.api.OperatorState. Structures that do not do this cannot be indexed by the executor at runtime.
Recall that in our MyUnion example, we will need to use the java.util.HashSet class and that we also need to keep the handle to the output stream. So, these two members will be part of our inner OperatorState class.
To illustrate the details, consider the MyUnion class skeleton below:
import theseus.api.*;
import java.util.HashSet;

public class OpMyunion
    extends Operator
{
    /* Per-transaction state: the output handle and the tuples seen so far. */
    private class State
        implements OperatorState
    {
        OutputArg out;
        HashSet<String> set;
    }

    public OpMyunion()
    {
    }

    public QueueDesc[] getInputs()
    {
        return new QueueDesc[] {
            new QueueDesc("lhs", QueueType.STREAMING),
            new QueueDesc("rhs", QueueType.STREAMING)
        };
    }

    public QueueDesc[] getOutputs()
    {
        return new QueueDesc[] {
            new QueueDesc("out", QueueType.STREAMING)
        };
    }

    public void on_init(Transaction a_tx, InputArgSet a_in, OutputArgSet a_out)
        throws OperatorException
    {
    }

    /* LHS */
    public void on_lhs(Transaction a_tx, InputArg a_in)
        throws OperatorException
    {
    }

    public void on_lhs_EOS(Transaction a_tx)
        throws OperatorException
    {
    }

    /* RHS */
    public void on_rhs(Transaction a_tx, InputArg a_in)
        throws OperatorException
    {
    }

    public void on_rhs_EOS(Transaction a_tx)
        throws OperatorException
    {
    }

    /* FINAL */
    public void on_final(Transaction a_tx)
        throws OperatorException
    {
    }
}
Make sure that you notice the following:
The import theseus.api.*; declaration. Without this,
the class cannot be compiled.
The State object and how it
inherits from theseus.api.OperatorState.
The on_input() method signatures: each
takes a transaction handle and an input argument, and each method may
throw a theseus.api.OperatorException.
The on_init() method takes input and output
argument descriptors; the output descriptor will eventually be
queried to obtain the output handle that will be stored in the
State object.
To keep proper state for your operator, it is important to create it and access it properly. Since multiple threads may be executing your operator code concurrently, it is of utmost importance that you take care when managing state yourself. This generally means the following:
Create the state object in the on_init() method. Notice that a single state
class can be used as a parent class for all state objects.
Examples of how to manage state are shown in the next section.
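Independently of the Theseus helper methods, the underlying idea (one state object per transaction, touched only while its lock is held) can be sketched in plain Java. Everything here is a hypothetical stand-in: the map keyed by a transaction id string, CounterState, and withState() are not part of the Theseus API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for executor-managed state; not the Theseus API.
class TxStateDemo {
    static class CounterState { int seen; }

    /** One state object per transaction id. */
    static final Map<String, CounterState> STATES = new ConcurrentHashMap<>();

    /** Creates the state once, before any tuple of the transaction arrives. */
    static void onInit(String tx) { STATES.put(tx, new CounterState()); }

    /** Runs the action while holding the lock of that transaction's state. */
    static void withState(String tx, Consumer<CounterState> action) {
        CounterState s = STATES.get(tx);
        synchronized (s) {
            action.accept(s);
        }
    }

    /** Removes the state at the end of the transaction and returns the count. */
    static int onFinal(String tx) {
        CounterState s = STATES.remove(tx);
        synchronized (s) {
            return s.seen;
        }
    }

    /** Two threads feed the same transaction; returns the final count. */
    static int runConcurrently(String tx, int perThread) {
        onInit(tx);
        Runnable feeder = () -> {
            for (int i = 0; i < perThread; i++) {
                withState(tx, s -> s.seen++);
            }
        };
        Thread a = new Thread(feeder);
        Thread b = new Thread(feeder);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return onFinal(tx);
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently("tx1", 10000));
    }
}
```

Without the synchronized block, the two threads could interleave their increments and lose updates, which is the kind of problem careful state access is meant to prevent.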
The details will largely depend on the semantics of your operator. There is a case to be made for "operator patterns" (physical semantic patterns that seem to crop up for various types of operators), but that is another topic for another time.
Still, here are a few words of advice about operator writing that are independent of the operator's semantics:
Use the on_input methods to check for
sufficient input (i.e., whether you have received all of the input you
need to begin processing) and then call business logic methods
as appropriate.
Use the on_final() method to solve the
sometimes-complicated problem of knowing when to emit the EOS.
The complete MyUnion operator is shown below. The parts worth noting that are not shown in the skeleton (above) are the bodies of on_init() and on_final(), and the doUnion() helper that on_lhs() and on_rhs() both call.
import theseus.api.*;
import java.util.HashSet;

public class OpMyunion
    extends Operator
{
    /* Per-transaction state: the output handle and the tuples seen so far. */
    private class State
        implements OperatorState
    {
        OutputArg out;
        HashSet<String> set;
    }

    public OpMyunion()
    {
    }

    public QueueDesc[] getInputs()
    {
        return new QueueDesc[] {
            new QueueDesc("lhs", QueueType.STREAMING),
            new QueueDesc("rhs", QueueType.STREAMING)
        };
    }

    public QueueDesc[] getOutputs()
    {
        return new QueueDesc[] {
            new QueueDesc("out", QueueType.STREAMING)
        };
    }

    public void on_init(Transaction a_tx, InputArgSet a_in, OutputArgSet a_out)
        throws OperatorException
    {
        /* Build the state for this transaction and keep the output handle. */
        State s = new State();
        s.set = new HashSet<String>();
        s.out = a_out.getArg("out");

        Lock lock = lockState(a_tx);
        try {
            setState(a_tx, lock, s);
        } finally {
            /* Always release the lock, even if setState() fails. */
            unlockState(a_tx, lock);
        }
    }

    /* Shared by both inputs: emit a tuple only the first time it is seen. */
    private void doUnion(Transaction a_tx, InputArg a_in)
    {
        Tuple t = (Tuple)a_in.getObject();
        String valStr = t.toString();

        Lock lock = lockState(a_tx);
        try {
            State s = (State)getLockedState(a_tx, lock);
            /* add() returns false when the value is already in the set. */
            if (s.set.add(valStr)) {
                s.out.putObject(a_tx, t);
            }
        } finally {
            unlockState(a_tx, lock);
        }
    }

    /* LHS */
    public void on_lhs(Transaction a_tx, InputArg a_in)
        throws OperatorException
    {
        doUnion(a_tx, a_in);
    }

    public void on_lhs_EOS(Transaction a_tx)
        throws OperatorException
    {
        /* Nothing to do: the output EOS is emitted in on_final(). */
    }

    /* RHS */
    public void on_rhs(Transaction a_tx, InputArg a_in)
        throws OperatorException
    {
        doUnion(a_tx, a_in);
    }

    public void on_rhs_EOS(Transaction a_tx)
        throws OperatorException
    {
        /* Nothing to do: the output EOS is emitted in on_final(). */
    }

    /* FINAL */
    public void on_final(Transaction a_tx)
        throws OperatorException
    {
        /* Called after the last EOS, so this is the place to emit ours. */
        Lock lock = lockState(a_tx);
        try {
            State s = (State)getLockedState(a_tx, lock);
            s.out.putEOS(a_tx);
        } finally {
            unlockState(a_tx, lock);
        }
    }
}
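Because the operator above needs the Theseus runtime to execute, it can help to try the deduplication logic on its own first. The following is a minimal sketch with plain lists standing in for the two input streams and for the output stream. UnionDemo, union(), and the "EOS" marker string are made up for illustration, and the sketch processes all of lhs before rhs, which is only one of the possible arrival orders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-alone illustration of the MyUnion logic; no Theseus types involved.
class UnionDemo {
    /** Emits each distinct tuple once, in arrival order, then an EOS marker. */
    static List<String> union(List<String> lhs, List<String> rhs) {
        Set<String> seen = new HashSet<>();   // plays the role of State.set
        List<String> out = new ArrayList<>(); // plays the role of State.out
        for (String t : lhs) {
            if (seen.add(t)) out.add(t);      // add() is false for duplicates
        }
        for (String t : rhs) {
            if (seen.add(t)) out.add(t);
        }
        out.add("EOS");                       // what on_final() would emit
        return out;
    }

    public static void main(String[] args) {
        System.out.println(union(Arrays.asList("a", "b", "a"), Arrays.asList("b", "c")));
    }
}
```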
On occasion, you may need to coordinate two or more operators, either across all transactions or within a particular transaction. The Theseus API supports mechanisms that allow you to safely access global or transaction-specific state. State that spans all transactions is henceforth referred to as "plan state".
To understand how to do this, let's consider an example. Suppose you want to pool connections to a remote database, as a means to improve performance. That is, you want to eliminate the need for various plan operators to keep creating and tearing down expensive database connections every time they process a single tuple. More specifically, you would like to:
The Theseus API allows operators to get access to plan state through
the inherited method getPlanState(). Any synchronization you need is
up to you to handle, but once that is in place, getPlanState() gives
you access to a GenericStateManager object, on which you can call
methods like getItem(), putItem(), and removeItem(). These methods
allow you to associate two objects (a key and a value) and then operate
on the value given the key.
In a similar manner, getTransactionState() allows
you to access a different GenericStateManager, one
that is separate for each transaction.
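The difference between the two scopes can be sketched with a toy stand-in. The method names getItem(), putItem(), and removeItem() follow the text, but their signatures here are guesses, and ToyStateManager and the transaction id strings are hypothetical. This is not the real GenericStateManager.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in: method names follow the text, signatures are assumptions.
class StateManagerDemo {
    /** Key/value store; ConcurrentHashMap rejects null keys and values. */
    static class ToyStateManager {
        private final Map<Object, Object> items = new ConcurrentHashMap<>();

        void putItem(Object key, Object value) { items.put(key, value); }
        Object getItem(Object key)             { return items.get(key); }
        Object removeItem(Object key)          { return items.remove(key); }
    }

    /** One manager shared by every transaction of the plan. */
    static final ToyStateManager PLAN_STATE = new ToyStateManager();

    /** One manager per transaction id, created on first use. */
    static final Map<String, ToyStateManager> TX_STATE = new ConcurrentHashMap<>();

    static ToyStateManager getPlanState() { return PLAN_STATE; }

    static ToyStateManager getTransactionState(String tx) {
        return TX_STATE.computeIfAbsent(tx, k -> new ToyStateManager());
    }

    public static void main(String[] args) {
        getPlanState().putItem("greeting", "visible to all transactions");
        getTransactionState("tx1").putItem("count", 1);
        System.out.println(getPlanState().getItem("greeting"));
        System.out.println(getTransactionState("tx2").getItem("count"));
    }
}
```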
To return to the example: one way to address the task at hand is to create at least 3 operators:
OpCustom_db_init: creates the connection pool object (call
it ConnPool) and makes it a plan-wide state object (via
getPlanState().putItem()).
OpCustom_db_update: uses ConnPool
(obtained via getPlanState().getItem()) to write data to
the database.
OpCustom_db_close: uses ConnPool
(obtained via getPlanState().getItem()) to close all
pool connections to the database.
Note: the ConnPool object is responsible for
synchronizing access to the pool.
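A minimal sketch of such a pool follows. It is only one possible design: ToyConnection stands in for a real database connection, a plain map stands in for plan state, and the dbInit(), dbUpdate(), and dbClose() methods are hypothetical placeholders for the three operators. The blocking queue is what synchronizes access to the idle connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; ToyConnection stands in for a real database connection.
class ConnPoolDemo {
    static class ToyConnection {
        final int id;
        volatile boolean open = true;
        ToyConnection(int id) { this.id = id; }
    }

    /** Fixed-size pool; the queue serializes access to idle connections. */
    static class ConnPool {
        private final List<ToyConnection> all = new ArrayList<>();
        private final BlockingQueue<ToyConnection> idle;

        ConnPool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                ToyConnection c = new ToyConnection(i);
                all.add(c);
                idle.add(c);
            }
        }

        /** Blocks until a connection is free. */
        ToyConnection acquire() {
            try {
                return idle.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }

        void release(ToyConnection c) { idle.offer(c); }

        int idleCount() { return idle.size(); }

        void closeAll() {
            idle.clear();
            for (ToyConnection c : all) c.open = false;
        }
    }

    /** Plain map standing in for plan state. */
    static final Map<String, Object> PLAN_STATE = new ConcurrentHashMap<>();

    /** Role of OpCustom_db_init: create the pool and publish it. */
    static void dbInit(int size) { PLAN_STATE.put("connPool", new ConnPool(size)); }

    /** Role of OpCustom_db_update: borrow a connection and give it back. */
    static int dbUpdate(String tuple) {
        ConnPool pool = (ConnPool) PLAN_STATE.get("connPool");
        ToyConnection c = pool.acquire();
        try {
            return c.id; // a real operator would write the tuple here
        } finally {
            pool.release(c);
        }
    }

    /** Role of OpCustom_db_close: unpublish the pool and close everything. */
    static void dbClose() {
        ConnPool pool = (ConnPool) PLAN_STATE.remove("connPool");
        pool.closeAll();
    }
}
```

Returning the connection in a finally block keeps the pool from draining when an update fails partway through.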
To compile your operator, you should:
To deploy your operator, you should:
To test your operator: