Python Data Processing Extensions
A Python import extension is required to implement a DataProcessingPlugin class:
class DataProcessingPlugin:
def __init__(self, system, user, password):
def SetOptions(self, options):
def GetFunctions(self):
def GetFunctionParameters(self, function):
def GetFunctionMetadata(self, function):
def ExecuteFunction(self, function, arguments):
def Shutdown(self):
Constructor
def __init__(self, system, user, password):
public class MyDotNetImportExtensions : ACustomImportExtension
{
public MyDotNetImportExtensions(string System, string UserId, string Password)
: base(System, UserId, Password)
{
}
}
The constructor is provided with the system, user and password. The user and password are credentials mapped in the OmniFi Administration application.
This method is called once for each service header. Therefor it is recommended to do any slow startup tasks like connection to another system here
SetOptions
If the extension has any custom options, they are read here. In the example below the extension has the option “ExtensionOption”.
def SetOptions(self, options):
def SetOptions(self, options):
argReader = ArgumentReader(options)
extensionOption = argReader.TryGetArgumentValue("ExtensionOption")
public override void SetOptionsImpl(List<Plugin_Argument> Options)
{
var unsupportedOptions = Options
.Where((x) => x.ParameterName != "OutputFile")
.Select((x) => x.ParameterName)
.ToArray();
if (unsupportedOptions.Length > 0)
throw new ApplicationException(
"Options not supported: "
+ String.Join(", ", unsupportedOptions));
string outputFile = Options
.FirstOrDefault((x) => x.ParameterName == "OutputFile")
?.StringValue;
if (!String.IsNullOrEmpty(outputFile))
this._file = outputFile;
var directory = Path.GetDirectoryName(this._file);
if (!Directory.Exists(directory))
Directory.CreateDirectory(directory);
}
Reading extension option “ExtensionOption”
GetFunctions
def GetFunctions(self):
def GetFunctions(self):
self.logger.log_debug("GetFunctions(): Enter")
functions = list()
functions.append("FunctionOne")
functions.append("FunctionTwo")
functions.append("FunctionThree")
#Return the supported functions
return (functions)
public override List<string> GetFunctionsImpl()
{
return (new List<string> { "DumpToFile" });
}
GetFunctionParameters
def GetFunctionParameters(self, function):
Defines function parameters for the specified function. Returns a list of Parameter.
def GetFunctionParameters(self, function):
self.logger.log_debug("GetFunctionParameters(): Enter")
parameters = list()
parameters.append(Parameter("Id", "Id", DataType.string, None ,None, True, "Id"))
self.logger.log_debug("GetFunctionParameters(): Returning", len(parameters), "parameters")
return parameters;
public override List<Plugin_Parameter> GetFunctionParametersImpl(string Function)
{
if (Function == null)
{
throw new ArgumentNullException(nameof(Function));
}
if (Function != "DumpToFile")
throw new ApplicationException($"Function '{Function}' not supported");
List<Plugin_Parameter> parameters = new List<Plugin_Parameter>
{
new Plugin_Parameter {
Type = Plugin_DataType.INTEGER,
Name = "intParam_0",
Description = "Integer Parameter 0",
Mandatory = true,
Comment = "Provide an integer number"
},
new Plugin_Parameter {
Type = Plugin_DataType.STRING,
Name = "strParam_0",
Description = "String Parameter 0",
Mandatory = false,
Comment = "Provide string, or don't"
},
new Plugin_Parameter {
Type = Plugin_DataType.DOUBLE,
Name = "dblParam_0",
Description = "Double Parameter 0",
Mandatory = true,
Comment = "Provide a double-precision number"
},
};
return (parameters);
}
GetFunctionMetadata
def GetFunctionMetadata(self, function):
.
def GetFunctionMetadata(self, function):
self.logger.log_debug("GetFunctionMetadata(): Enter")
metadata = list()
metadata.append(Metadata("ResultOne", "ResultOne", DataType.string))
return (metadata);
ExecuteFunction
def ExecuteFunction(self, function, arguments):
Executes the specified function with the supplied arguments. Arguments are supplied as a list of Argument. The type of arguments is defined by the “GetFunctionParameters” method.
The method should return a list of ArrayArguments signaling a successful execution.
Any exception that is not handled by the method will fail the row and the error message is presented to the user.
def ExecuteFunction(self, function, arguments):
self.logger.log_debug("ExecuteFunction(): Enter")
#Read arguments with the help of ArgumentReader
self.logger.log_trace("ExecuteFunction(): reading arguments ...")
argReader = ArgumentReader(arguments)
result = list()
#Handled the supported function
if (function == 'FunctionOne'):
#Handle FunctionOne
result.append(ArrayArgument("ResultOne", DataType.string, ["1", "One"]))
elif (function == 'FunctionTwo'):
#Handle FunctionTwo
result.append(ArrayArgument("ResultTwo", DataType.string, ["2", "Two"]))
elif (function == 'FunctionThree'):
#Handle FunctionThee
result.append(ArrayArgument("ResultThree", DataType.string, ["3", "Three"]))
return result
public override List<Plugin_ArrayArgument> ExecuteFunctionImpl(
string Function,
List<Plugin_Argument> Parameters)
{
Logger.Debug("ExecuteFunctionImpl(): Enter");
if (Function == null)
{
throw new ArgumentNullException(nameof(Function));
}
if (Function != "DumpToFile")
{
throw new ApplicationException($"Function '{Function}' not supported");
}
var paramLookup = Parameters.ToDictionary(
(x) => x.ParameterName
);
Plugin_Argument intValue, dblValue, strValue;
Logger.Debug("ExecuteFunctionImpl(): Checking Integer Parameter 0");
if (!paramLookup.TryGetValue("intParam_0", out intValue) || intValue.IntegerValue == null)
throw new ApplicationException("Mandatory argument 'Integer Parameter 0' not supplied");
int iParam = (int)intValue.IntegerValue;
Logger.Debug("ExecuteFunctionImpl(): Checking Double Parameter 0");
if (!paramLookup.TryGetValue("dblParam_0", out dblValue) || dblValue.DoubleValue == null)
throw new ApplicationException("Mandatory argument 'Double Parameter 0' not supplied");
double dParam = (double)dblValue.DoubleValue;
Logger.Debug("ExecuteFunctionImpl(): Obtaining String Parameter 0");
string sParam;
if (!paramLookup.TryGetValue("strParam_0", out strValue) || strValue.StringValue == null)
{
Logger.Warning(
"ExecuteFunctionImpl(): String Parameter 0 not supplied\nreverting to default.");
sParam = String.Empty;
}
else
{
sParam = strValue.StringValue;
}
string text = $"L{this._line};{iParam};{dParam};{sParam}";
File.AppendAllLines(this._file, new string[] { text });
List<Plugin_ArrayArgument> result = new List<Plugin_ArrayArgument>
{
new Plugin_ArrayArgument ("number", new int?[] { this._line }),
new Plugin_ArrayArgument ("formatted_text", new string[] { text }),
};
this._line++;
Logger.Debug("ExecuteFunctionImpl(): Return");
return (result);
ExecuteFunction for extension that supports three functions and example implementation of ExecuteFunctionImpl.
Shutdown
def Shutdown(self):
The shutdown method is called just before the extension is closed, allowing you to clean up any cached state.
Note that the plugin may be shut down not only because the import is completed, but also because the user has configured a batch size, and the current batch has been filled.
Extension life-time
Data Processing extensions are created by the service header and disposed with the next service header or when the batch size is reached. Long running operations, like connecting to a database, are best performed once in the constructor and reused throughout the execution. For optimal performance you can accumulate execution instructions throughout the lifetime and batch-commit in the
Shutdown()
method.However, since the lifetime of the plugin is not guaranteed you cannot assume e.g. a running average is correct, since a header may be split up into several batches with restarts of the extension in between.
Updated about 2 years ago