Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.filters.StreamDestinationFilterService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
Expand All @@ -122,6 +123,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -163,6 +165,7 @@ public class StreamResource extends RestResource {
private final MessageFactory messageFactory;
private final StreamService streamService;
private final StreamRuleService streamRuleService;
private final StreamDestinationFilterService streamDestinationFilterService;
private final StreamRouterEngine.Factory streamRouterEngineFactory;
private final IndexSetRegistry indexSetRegistry;
private final RecentActivityService recentActivityService;
Expand All @@ -179,6 +182,7 @@ public class StreamResource extends RestResource {
public StreamResource(StreamService streamService,
PaginatedStreamService paginatedStreamService,
StreamRuleService streamRuleService,
StreamDestinationFilterService streamDestinationFilterService,
StreamRouterEngine.Factory streamRouterEngineFactory,
IndexSetRegistry indexSetRegistry,
RecentActivityService recentActivityService,
Expand All @@ -189,6 +193,7 @@ public StreamResource(StreamService streamService,
EntitySharesService entitySharesService) {
this.streamService = streamService;
this.streamRuleService = streamRuleService;
this.streamDestinationFilterService = streamDestinationFilterService;
this.streamRouterEngineFactory = streamRouterEngineFactory;
this.indexSetRegistry = indexSetRegistry;
this.paginatedStreamService = paginatedStreamService;
Expand Down Expand Up @@ -661,6 +666,8 @@ public List<PipelineCompactSource> getConnectedPipelines(@Parameter(name = "stre

public record GetConnectedPipelinesRequest(List<String> streamIds) {}

public record GetDestinationFilterRuleCountsRequest(List<String> streamIds) {}

@POST
@Path("/pipelines")
@Operation(summary = "Get pipelines associated with specified streams")
Expand Down Expand Up @@ -694,6 +701,31 @@ public Map<String, List<PipelineCompactSource>> getConnectedPipelinesForStreams(
}));
}

@POST
@Path("/destinations/filters/count")
@Operation(summary = "Get destination filter rule counts associated with specified streams")
@NoAuditEvent("No data is changed.")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Long> getDestinationFilterRuleCountsForStreams(@Parameter(name = "streamIds", required = true) GetDestinationFilterRuleCountsRequest request) {
final var streamIds = request.streamIds.stream()
.filter(streamId -> {
if (!isPermitted(RestPermissions.STREAMS_READ, streamId)) {
throw new ForbiddenException("Not allowed to read configuration for stream with id: " + streamId);
}
return true;
})
.collect(Collectors.toSet());
final var countsByStreamId = streamDestinationFilterService.countByStreamIds(
streamIds,
dtoId -> isPermitted(RestPermissions.STREAM_DESTINATION_FILTERS_READ, dtoId)
);

final var response = new LinkedHashMap<String, Long>();
request.streamIds.forEach(streamId -> response.put(streamId, countsByStreamId.getOrDefault(streamId, 0L)));

return response;
}

@PUT
@Path("/indexSet/{indexSetId}")
@Timed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.Projections;
import jakarta.inject.Inject;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.PaginatedList;
Expand All @@ -36,14 +38,18 @@
import org.graylog2.streams.events.StreamDeletedEvent;
import org.mongojack.Id;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.graylog2.database.utils.MongoUtils.idEq;
import static org.graylog2.database.utils.MongoUtils.insertedId;
Expand All @@ -57,6 +63,7 @@

public class StreamDestinationFilterService {
public static final String COLLECTION = "stream_destination_filters";
private static final String FIELD_ID = "_id";

private static final ImmutableMap<String, SearchQueryField> SEARCH_FIELD_MAPPING = ImmutableMap.<String, SearchQueryField>builder()
.put(FIELD_TITLE, SearchQueryField.create(FIELD_TITLE))
Expand Down Expand Up @@ -132,6 +139,26 @@ public Optional<StreamDestinationFilterRuleDTO> findByIdForStream(String streamI
return utils.getById(id);
}

public Map<String, Long> countByStreamIds(Collection<String> streamIds, Predicate<String> permissionSelector) {
if (streamIds.isEmpty()) {
return Map.of();
}

final Map<String, Long> countsByStreamId = new HashMap<>();
collection.find(in(FIELD_STREAM_ID, streamIds), Document.class)
.projection(Projections.include(FIELD_ID, FIELD_STREAM_ID))
.forEach(document -> {
final var id = document.getObjectId(FIELD_ID);
final var streamId = document.getString(FIELD_STREAM_ID);

if (id != null && streamId != null && permissionSelector.test(id.toHexString())) {
countsByStreamId.merge(streamId, 1L, Long::sum);
}
});

return countsByStreamId;
}

public StreamDestinationFilterRuleDTO createForStream(String streamId, StreamDestinationFilterRuleDTO dto) {
if (!isBlank(dto.id())) {
throw new IllegalArgumentException("id must be blank");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ void findPaginatedForStreamAndTargetWithQuery() {
assertThat(result.delegate().get(0).status()).isEqualTo(StreamDestinationFilterRuleDTO.Status.DISABLED);
}

@Test
@MongoDBFixtures("StreamDestinationFilterServiceTest-2024-07-01-1.json")
void countByStreamIds() {
final var countByStreamIds = service.countByStreamIds(List.of("54e3deadbeefdeadbeef1000", "54e3deadbeefdeadbeef2000"), id -> !"54e3deadbeefdeadbeef0001".equals(id));

assertThat(countByStreamIds)
.containsEntry("54e3deadbeefdeadbeef1000", 2L)
.containsEntry("54e3deadbeefdeadbeef2000", 1L);
}

@Test
@MongoDBFixtures("StreamDestinationFilterServiceTest-2024-07-01-1.json")
void findByIdForStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import StreamRulesCell from './cells/StreamRulesCell';
import PipelinesCell from './cells/PipelinesCell';
import OutputsCell from './cells/OutputsCell';
import ArchivingsCell from './cells/ArchivingsCell';
import DestinationFilterRulesCell from './cells/DestinationFilterRulesCell';

const getStreamDataLakeTableElements = PluginStore.exports('dataLake')?.[0]?.getStreamDataLakeTableElements;
const pipelineRenderer = {
Expand Down Expand Up @@ -72,6 +73,10 @@ const customColumnRenderers = (
renderCell: (_outputs: Output[], stream) => <OutputsCell stream={stream} />,
staticWidth: 'matchHeader' as const,
},
destination_filters: {
renderCell: (_destinationFilters: string, stream) => <DestinationFilterRulesCell stream={stream} />,
staticWidth: 'matchHeader' as const,
},
archiving: {
renderCell: (_archiving: boolean, stream) => <ArchivingsCell stream={stream} indexSets={indexSets} />,
staticWidth: 'matchHeader' as const,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,39 @@ const getStreamTableElements = (
defaultDisplayedAttributes: [
'title',
'index_set_title',
'archiving',
...(streamDataLakeTableElements?.attributeName ? [streamDataLakeTableElements.attributeName] : []),
'rules',
...(isPipelineColumnPermitted ? ['pipelines'] : []),
...(pluggableAttributes?.attributeNames || []),
'outputs',
'throughput',
'archiving',
...(streamDataLakeTableElements?.attributeName ? [streamDataLakeTableElements.attributeName] : []),
'destination_filters',
...(pluggableAttributes?.attributeNames || []),
'disabled',
'throughput',
],
defaultColumnOrder: [
'title',
'index_set_title',
'archiving',
...(streamDataLakeTableElements?.attributeName ? [streamDataLakeTableElements.attributeName] : []),
'rules',
...(isPipelineColumnPermitted ? ['pipelines'] : []),
...(pluggableAttributes?.attributeNames || []),
'outputs',
'throughput',
'archiving',
...(streamDataLakeTableElements?.attributeName ? [streamDataLakeTableElements.attributeName] : []),
'destination_filters',
...(pluggableAttributes?.attributeNames || []),
'disabled',
'throughput',
'created_at',
],
};

const additionalAttributes: Array<Attribute> = [
{ id: 'index_set_title', title: 'Index Set', sortable: true, permissions: ['indexsets:read'] },
{ id: 'throughput', title: 'Throughput' },
{ id: 'rules', title: 'Rules' },
{ id: 'rules', title: 'Stream Rules' },
...(isPipelineColumnPermitted ? [{ id: 'pipelines', title: 'Pipelines' }] : []),
{ id: 'outputs', title: 'Outputs' },
{ id: 'destination_filters', title: 'Filter Rules' },
{ id: 'archiving', title: 'Archiving' },
...(streamDataLakeTableElements?.attributes || []),
...(pluggableAttributes?.attributes || []),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';

import type { Stream } from 'stores/streams/StreamsStore';
import { Button } from 'components/bootstrap';
import { LinkContainer } from 'components/common/router';
import Routes from 'routing/Routes';
import { IfPermitted } from 'components/common';

type Props = {
stream: Stream;
};

const ExpandedDestinationFilterRulesActions = ({ stream }: Props) => (
<IfPermitted permissions={[`streams:edit:${stream.id}`]}>
<LinkContainer to={`${Routes.stream_view(stream.id)}?segment=destinations`}>
<Button bsStyle="link" bsSize="xsmall" disabled={stream.is_default || !stream.is_editable}>
Manage Filter Rules
</Button>
</LinkContainer>
</IfPermitted>
);

export default ExpandedDestinationFilterRulesActions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';
import { useCallback, useState } from 'react';
import styled, { css } from 'styled-components';
import * as Immutable from 'immutable';

import type { Stream } from 'stores/streams/StreamsStore';
import { DEFAULT_PAGINATION } from 'stores/PaginationTypes';
import type { StreamOutputFilterRule } from 'components/streams/StreamDetails/output-filter/Types';
import { DataTable, NoSearchResult, PaginatedList, Spinner, Text, Pluralize } from 'components/common';
import { DEFAULT_PAGE_SIZES } from 'hooks/usePaginationQueryParameter';
import useStreamOutputFilters from 'components/streams/hooks/useStreamOutputFilters';
import FilterStatusCell from 'components/streams/StreamDetails/output-filter/FilterStatusCell';

const TABLE_HEADERS = ['Title', 'Destination', 'Status'];

const StyledText = styled(Text)(
({ theme }) => css`
color: ${theme.colors.gray[50]};
`,
);

const destinationTitle = (destinationType: string) => {
if (destinationType === 'indexer') {
return 'Index Set';
}

if (destinationType === 'data-lake') {
return 'Data Lake';
}

return 'Output';
};

const _headerCellFormatter = (header: string) => <th>{header}</th>;

const filterRuleItem = (filter: StreamOutputFilterRule) => (
<tr key={filter.id}>
<td>
{filter.title}
<StyledText>{filter.description}</StyledText>
</td>
<td>{destinationTitle(filter.destination_type)}</td>
<td>
<FilterStatusCell filterOutputRule={filter} />
</td>
</tr>
);

type Props = {
stream: Stream;
};

const ExpandedDestinationFilterRulesSection = ({ stream }: Props) => {
const [pagination, setPagination] = useState(DEFAULT_PAGINATION);
const { data: paginatedFilters, isLoading } = useStreamOutputFilters(stream.id, undefined, pagination);

const onPaginationChange = useCallback(
(newPage: number, newPerPage: number) =>
setPagination((currentPagination) => ({
...currentPagination,
page: newPage,
perPage: newPerPage,
})),
[],
);

if (isLoading && !paginatedFilters) {
return <Spinner />;
}

const filters = paginatedFilters?.list ?? Immutable.List<StreamOutputFilterRule>();
const total = paginatedFilters?.pagination?.total ?? 0;

return (
<>
<p>
Showing {total} configured filter <Pluralize value={total} singular="rule" plural="rules" /> across all
destinations.
</p>
<PaginatedList
totalItems={total}
pageSize={DEFAULT_PAGE_SIZES[0]}
onChange={onPaginationChange}
useQueryParameter={false}
showPageSizeSelect={false}>
<DataTable
id="stream-filter-rule-list"
className="striped"
rowClassName="no-bm"
headers={TABLE_HEADERS}
headerCellFormatter={_headerCellFormatter}
sortByKey="title"
noDataText={<NoSearchResult>No filter rules have been found.</NoSearchResult>}
rows={filters.toJS()}
dataRowFormatter={filterRuleItem}
/>
</PaginatedList>
</>
);
};

export default ExpandedDestinationFilterRulesSection;
Loading
Loading