Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python_Versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
1 change: 1 addition & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def sickbayTests = [
def createPrismValidatesRunnerTask = { name, environmentType ->
Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
description "PrismRunner Java $environmentType ValidatesRunner suite"
outputs.upToDateWhen { false }
classpath = configurations.validatesRunner

var prismBuildTask = dependsOn(':runners:prism:build')
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,13 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
24 changes: 17 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ type ElementManager struct {
}

func (em *ElementManager) addPending(v int) {
prev := em.livePending.Load()
em.livePending.Add(int64(v))
em.pendingElements.Add(v)
slog.Info("em.addPending", "delta", v, "prev", prev, "current", em.livePending.Load())
}

// LinkID represents a fully qualified input or output.
Expand Down Expand Up @@ -530,7 +532,7 @@ func (em *ElementManager) DumpStages() string {
stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n",
em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
} else {
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v livePending: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles, em.livePending.Load()))
}
sort.Strings(ids)
for _, id := range ids {
Expand Down Expand Up @@ -1091,18 +1093,25 @@ func (em *ElementManager) FailBundle(rb RunBundle) {
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}

// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals) {
stage := em.stages[rb.StageID]

slog.Info("ElementManager.ReturnResiduals start", "bundle", rb, "firstRsIndex", firstRsIndex)

stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
originalRemainingCount := len(completed.es) - firstRsIndex
stage.mu.Unlock()

stage.splitBundle(rb, firstRsIndex, em)
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
if len(unprocessedElements) > 0 {
slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements))
count := stage.AddPending(em, unprocessedElements)
if len(unprocessedElements) > originalRemainingCount {
newResiduals := unprocessedElements[originalRemainingCount:]
slog.Info("ReturnResiduals: new residuals added back", "bundle", rb, "count", len(newResiduals))
count := stage.AddPending(em, newResiduals)
em.addPending(count)
}
slog.Info("ElementManager.ReturnResiduals end", "bundle", rb, "unprocessedCount", len(unprocessedElements), "livePending", em.livePending.Load())
em.markStagesAsChanged(singleSet(rb.StageID))
}

Expand Down Expand Up @@ -2187,7 +2196,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
defer ss.mu.Unlock()

es := ss.inprogress[rb.BundleID]
slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)
slog.Info("splitBundle start", "bundle", rb, "elem count", len(es.es), "firstResidual", firstResidual, "livePending", em.livePending.Load())

prim := es.es[:firstResidual]
res := es.es[firstResidual:]
Expand All @@ -2207,6 +2216,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa
// we don't need to increment pending count in em, since it is already pending
ss.kind.addPending(ss, em, res)
ss.inprogress[rb.BundleID] = es
slog.Info("splitBundle completed", "bundle", rb, "primaryCount", len(prim), "residualCount", len(res), "livePending", em.livePending.Load())
}

// minimumPendingTimestamp returns the minimum pending timestamp from all pending elements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,3 +684,73 @@ func TestElementManager_OnWindowExpiration(t *testing.T) {
validateSideBundles(t, singleSet("\u0004key5")) // still exist..
})
}

func TestElementManager_ReturnResidualsPendingCount(t *testing.T) {
tests := []struct {
name string
firstRsIndex int
wantFinalPending int64
}{
{
name: "ChannelSplit",
firstRsIndex: 0,
wantFinalPending: 1,
},
{
name: "SDFCheckpoint",
firstRsIndex: 1,
wantFinalPending: 2, // Incremented by 1 because the active portion (index 0) is still in progress and will be completed/decremented in PersistBundle.
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")

stage := em.stages["dofn"]
info := PColInfo{
GlobalID: "generic_info",
WDec: exec.MakeWindowDecoder(coder.NewGlobalWindow()),
WEnc: exec.MakeWindowEncoder(coder.NewGlobalWindow()),
EDec: func(r io.Reader) []byte {
b, _ := io.ReadAll(r)
return b
},
}

// Initial state should have 1 pending element from impulse
if got, want := em.livePending.Load(), int64(1); got != want {
t.Fatalf("initial livePending = %v, want %v", got, want)
}

// Start a bundle
bundID, ok, _, _ := stage.startEventTimeBundle(mtime.MaxTimestamp, func() string { return "inst0" })
if !ok {
t.Fatalf("failed to start bundle")
}

// Waitgroup/livePending shouldn't change on starting a bundle (it's still pending)
if got, want := em.livePending.Load(), int64(1); got != want {
t.Fatalf("livePending after startEventTimeBundle = %v, want %v", got, want)
}

// Prepare residuals
residBytes := []byte{127, 223, 59, 100, 90, 28, 172, 9, 0, 0, 0, 1, 15, 3, 65, 66, 67} // windowed value header + ABC
residuals := Residuals{
Data: []Residual{{Element: residBytes}},
}

rb := RunBundle{StageID: "dofn", BundleID: bundID}

// Return residuals (Simulates splitting)
em.ReturnResiduals(rb, test.firstRsIndex, info, residuals)

if got, want := em.livePending.Load(), test.wantFinalPending; got != want {
t.Errorf("livePending after ReturnResiduals = %v, want %v", got, want)
}
})
}
}
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
_, err = pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
if err != nil {
slog.Warn("StopWorker failed", "worker", wk, "error", err)
}
wk.Stop()
}

Expand Down
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,16 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
// Log a heartbeat every 60 seconds
case <-ticker.C:
j.Logger.Info("pipeline is running", slog.String("job", j.String()))
j.Logger.Info("pipeline stages state", slog.String("stages", em.DumpStages()))
for envID, wk := range wks {
if wk != nil && wk.Connected() && !wk.Stopped() {
j.Logger.Info("worker status",
slog.String("workerID", wk.ID),
slog.String("envID", envID),
slog.Duration("uptime", wk.Uptime()),
slog.Any("active_bundles", wk.ActiveBundles()))
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *runner) handleReshuffle(tid string, t *pipepb.PTransform, comps *pipepb
}

// And all the sub transforms.
toRemove = append(toRemove, t.GetSubtransforms()...)
toRemove = append(toRemove, removeSubTransforms(comps, t.GetSubtransforms())...)

// Return the new components which is the transforms consumer
return prepareResult{
Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
panic(err)
}

bundleStart := time.Now()

// Progress + split loop.
previousIndex := int64(-2)
previousTotalCount := int64(-2) // Total count of all pcollection elements.
Expand Down Expand Up @@ -232,7 +234,11 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
runningFor := time.Since(bundleStart)
slog.Debug("progress report", "bundle", rb, "runningFor", runningFor, "index", index, "prevIndex", previousIndex)
if runningFor > 5*time.Minute {
slog.Warn("Bundle has been running for a long time", "bundle", rb, "runningFor", runningFor, "worker", wk.ID)
}

// Check if there has been any measurable progress by the input, or all output pcollections since last report.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
Expand Down
27 changes: 27 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type W struct {
// These are the ID sources
inst uint64
connected, stopped atomic.Bool
StartTime time.Time
StoppedChan chan struct{} // Channel to Broadcast stopped state.

InstReqs chan *fnpb.InstructionRequest
Expand Down Expand Up @@ -292,11 +293,37 @@ func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Uptime returns how long the worker has been connected.
func (wk *W) Uptime() time.Duration {
wk.mu.Lock()
defer wk.mu.Unlock()
if wk.StartTime.IsZero() {
return 0
}
return time.Since(wk.StartTime)
}

// ActiveBundles returns a list of active bundles currently processing on this worker.
func (wk *W) ActiveBundles() []string {
wk.mu.Lock()
defer wk.mu.Unlock()
var bundles []string
for id, responder := range wk.activeInstructions {
if b, ok := responder.(*B); ok {
bundles = append(bundles, fmt.Sprintf("%s (%s)", id, b.PBDID))
}
}
return bundles
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
wk.mu.Lock()
wk.StartTime = time.Now()
wk.mu.Unlock()
done := make(chan error, 1)
go func() {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):

# Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
# connections on dual-stack (IPv4/IPv6) systems.
address = 'localhost:{}'.format(known_args.port)
address = '0.0.0.0:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
Expand All @@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
# Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
# add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
# allowing the parent process to detect it and fail fast rather than hanging.
bound_port = server.add_insecure_port(address)
if not bound_port:
raise RuntimeError(
"Failed to bind expansion service to {}".format(address))
server.add_insecure_port(address)
server.start()
_LOGGER.info('Listening for expansion requests at %d', bound_port)
_LOGGER.info('Listening for expansion requests at %d', known_args.port)

def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
Expand Down
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,11 @@ def _get_platform_for_default_sdk_container():
# addressed, download wheel based on glibc version in Beam's Python
# Base image
pip_version = distribution('pip').version
if version.parse(pip_version) >= version.parse('19.3'):
# pip can only recognize manylinux2014_x86_64 wheels
# from version 19.3.
# See more information about manylinux at
# https://github.com/pypa/manylinux
if version.parse(pip_version) >= version.parse('20.3'):
return 'manylinux_2_28_x86_64'
elif version.parse(pip_version) >= version.parse('19.3'):
return 'manylinux2014_x86_64'
else:
return 'manylinux2010_x86_64'
Expand Down Expand Up @@ -795,7 +797,7 @@ def _populate_requirements_cache(
platform_tag
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
processes.check_call(cmd_args)

# Get list of downloaded packages and copy them to the cache
downloaded_packages = set()
Expand Down
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,13 +985,13 @@ def test_populate_requirements_cache_uses_find_links(self):

captured_cmd_args = []

def mock_check_output(cmd_args, **kwargs):
def mock_check_call(cmd_args, **kwargs):
captured_cmd_args.extend(cmd_args)
return b''
return 0

with mock.patch(
'apache_beam.runners.portability.stager.processes.check_output',
side_effect=mock_check_output):
'apache_beam.runners.portability.stager.processes.check_call',
side_effect=mock_check_call):
stager.Stager._populate_requirements_cache(
requirements_file, requirements_cache_dir)

Expand Down
Loading
Loading