Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/flucoma/algorithms/public/DataSetQuery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ class DataSetQuery
for (index i = from; i < from + count; i++) { mColumns.emplace(i); }
}

index numColumns() { return asSigned(mColumns.size()); }
index numColumns() const { return asSigned(mColumns.size()); }

bool hasAndConditions() { return (mAndConditions.size() > 0); }
bool hasAndConditions() const { return (mAndConditions.size() > 0); }

index maxColumn() { return mColumns.empty() ? 0 : *mColumns.rbegin(); }
index maxColumn() const { return mColumns.empty() ? 0 : *mColumns.rbegin(); }

bool addCondition(
index column, string comparison, double value, bool conjunction)
Expand Down
108 changes: 100 additions & 8 deletions include/flucoma/clients/nrt/DataSetQueryClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ namespace fluid {
namespace client {
namespace datasetquery {

enum { kName };
constexpr auto DataSetQueryParams =
defineParameters(StringParam<Fixed<true>>("name", "Name"));

constexpr auto DataSetQueryParams = defineParameters();
class DataSetQueryClient : public FluidBaseClient,
OfflineIn,
OfflineOut,
ModelObject,
public DataClient<algorithm::DataSetQuery>

class DataSetQueryClient : public FluidBaseClient, OfflineIn, OfflineOut
{
enum { kName };

public:
using string = std::string;
using DataSet = FluidDataSet<string, double, 1>;
Expand Down Expand Up @@ -50,7 +56,7 @@ class DataSetQueryClient : public FluidBaseClient, OfflineIn, OfflineOut
return DataSetQueryParams;
}

DataSetQueryClient(ParamSetViewType& p, FluidContext&) : mParams(p) {}
DataSetQueryClient(ParamSetViewType& p, FluidContext&) : mParams(p) {}

MessageResult<void> addColumn(index column)
{
Expand Down Expand Up @@ -101,7 +107,7 @@ class DataSetQueryClient : public FluidBaseClient, OfflineIn, OfflineOut


MessageResult<void> transform(InputDataSetClientRef sourceClient,
DataSetClientRef destClient)
DataSetClientRef destClient)
{
if (mAlgorithm.numColumns() <= 0) return Error("No columns");
auto srcPtr = sourceClient.get().lock();
Expand All @@ -120,7 +126,7 @@ class DataSetQueryClient : public FluidBaseClient, OfflineIn, OfflineOut

MessageResult<void> transformJoin(InputDataSetClientRef source1Client,
InputDataSetClientRef source2Client,
DataSetClientRef destClient)
DataSetClientRef destClient)
{
auto src1Ptr = source1Client.get().lock();
auto src2Ptr = source2Client.get().lock();
Expand Down Expand Up @@ -165,12 +171,98 @@ class DataSetQueryClient : public FluidBaseClient, OfflineIn, OfflineOut
makeMessage("limit", &DataSetQueryClient::limit));
}

algorithm::DataSetQuery& algorithm() { return mAlgorithm; }
};

using DSQueryRef = SharedClientRef<DataSetQueryClient>;

constexpr auto DataSetRTQueryParams = defineParameters(
DSQueryRef::makeParam("dataSetQuery", "DataSetQuery"),
InputDataSetClientRef::makeParam("sourceClient", "Source DataSet Name"),
DataSetClientRef::makeParam("destClient", "Destination DataSet Name"));

class DataSetRTQuery : public FluidBaseClient, ControlIn, ControlOut
{
enum { kDSQ, kSourceDataSet, kDestDataSet };

public:
using ParamDescType = decltype(DataSetRTQueryParams);
using ParamSetViewType = ParameterSetView<ParamDescType>;

std::reference_wrapper<ParamSetViewType> mParams;

void setParams(ParamSetViewType& p) { mParams = p; }

template <size_t N>
auto& get() const
{
return mParams.get().template get<N>();
}

static constexpr auto& getParameterDescriptors()
{
return DataSetRTQueryParams;
}

DataSetRTQuery(ParamSetViewType& p, FluidContext& c) : mParams(p)
{
controlChannelsIn(1);
controlChannelsOut({1, 1});
}

template <typename T>
void process(std::vector<FluidTensorView<T, 1>>& input,
std::vector<FluidTensorView<T, 1>>& output, FluidContext& c)
{
output[0](0) = 0; // start with error output as default

if (input[0](0) > 0)
{
auto DSQptr = get<kDSQ>().get().lock();
if (!DSQptr)
return; // c.reportError("FluidDataSetQuery RT Query: No
// FluidDataSetQuery found");
if (DSQptr->algorithm().numColumns() <= 0)
return; // c.reportError("FluidDataSetQuery RT Query: No colums

auto sourceDSptr = get<kSourceDataSet>().get().lock();
if (!sourceDSptr)
return; // c.reportError("FluidDataSetQuery RT Query: invalid Source
// FluidDataSet");

auto src = sourceDSptr->getDataSet();
if (src.size() == 0)
return; // c.reportError("FluidDataSetQuery RT Query: empty Source
// FluidDataSet");
if (src.pointSize() <= DSQptr->algorithm().maxColumn())
return; // c.reportError("FluidDataSetQuery RT Query: wrong point
// size");


auto destDSptr = get<kDestDataSet>().get().lock();
if (!destDSptr)
return; // c.reportError("FluidDataSetQuery RT Query: invalid
// Destination FluidDataSet");

index resultSize = DSQptr->algorithm().numColumns();
DataSetQueryClient::DataSet result(resultSize);
DSQptr->algorithm().process(src, mEmpty, result);
destDSptr->setDataSet(result);
output[0](0) = 1; // reaching here means success as a trigger output
}
}

index latency() const { return 0; }

private:
algorithm::DataSetQuery mAlgorithm;
DataSetQueryClient::DataSet mEmpty;
};

} // namespace datasetquery

using NRTThreadedDataSetQueryClient =
NRTThreadingAdaptor<ClientWrapper<datasetquery::DataSetQueryClient>>;
NRTThreadingAdaptor<typename datasetquery::DSQueryRef::SharedType>;
using RTDataSetQueryClient = ClientWrapper<datasetquery::DataSetRTQuery>;

} // namespace client
} // namespace fluid