Fix graceful shutdown "broken pipe" error (#527)
diff --git a/tfexec/cmd.go b/tfexec/cmd.go
index c8b18ff..cd5a7e2 100644
--- a/tfexec/cmd.go
+++ b/tfexec/cmd.go
@@ -252,20 +252,30 @@
return io.MultiWriter(compact...)
}
-func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
- // ReadBytes will block until bytes are read, which can cause a delay in
- // returning even if the command's context has been canceled. Use a separate
- // goroutine to prompt ReadBytes to return on cancel
- closeCtx, closeCancel := context.WithCancel(ctx)
- defer closeCancel()
- go func() {
- select {
- case <-ctx.Done():
- r.Close()
- case <-closeCtx.Done():
- return
- }
- }()
+func (tf *Terraform) writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
+ // ReadBytes will block until all bytes are read, which can cause a delay in
+ // returning even if the command's context has been canceled. When the
+ // context is canceled, Terraform receives an interrupt signal and will exit
+ // after a short while. Once the process has exited, the stdio pipes will
+ // close, allowing this function to return.
+
+ if tf.enableLegacyPipeClosing {
+ // Rather than wait for the stdio pipes to close naturally, we can close
+ // them ourselves when the command's context is canceled, causing the
+ // process to exit immediately. This works around a bug in Terraform
+ // < v1.1 that would otherwise leave the process (and this function)
+ // hanging after the context is canceled.
+ closeCtx, closeCancel := context.WithCancel(ctx)
+ defer closeCancel()
+ go func() {
+ select {
+ case <-ctx.Done():
+ r.Close()
+ case <-closeCtx.Done():
+ return
+ }
+ }()
+ }
buf := bufio.NewReader(r)
for {
diff --git a/tfexec/cmd_default.go b/tfexec/cmd_default.go
index 3af11c8..7a5aa81 100644
--- a/tfexec/cmd_default.go
+++ b/tfexec/cmd_default.go
@@ -59,13 +59,13 @@
wg.Add(1)
go func() {
defer wg.Done()
- errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
+ errStdout = tf.writeOutput(ctx, stdoutPipe, stdoutWriter)
}()
wg.Add(1)
go func() {
defer wg.Done()
- errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
+ errStderr = tf.writeOutput(ctx, stderrPipe, stderrWriter)
}()
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
diff --git a/tfexec/cmd_linux.go b/tfexec/cmd_linux.go
index 0565372..43e44a5 100644
--- a/tfexec/cmd_linux.go
+++ b/tfexec/cmd_linux.go
@@ -64,13 +64,13 @@
wg.Add(1)
go func() {
defer wg.Done()
- errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
+ errStdout = tf.writeOutput(ctx, stdoutPipe, stdoutWriter)
}()
wg.Add(1)
go func() {
defer wg.Done()
- errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
+ errStderr = tf.writeOutput(ctx, stderrPipe, stderrWriter)
}()
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
diff --git a/tfexec/internal/e2etest/errors_test.go b/tfexec/internal/e2etest/errors_test.go
index c99aad9..74b695f 100644
--- a/tfexec/internal/e2etest/errors_test.go
+++ b/tfexec/internal/e2etest/errors_test.go
@@ -23,6 +23,8 @@
var (
protocol5MinVersion = version.Must(version.NewVersion("0.12.0"))
+
+ gracefulShutdownMinVersion = version.Must(version.NewVersion("1.1.0"))
)
func TestUnparsedError(t *testing.T) {
@@ -168,9 +170,12 @@
t.Skip("the ability to interrupt an apply was added in protocol 5.0 in Terraform 0.12, so test is not valid")
}
- // sleep will not react to SIGINT
- // This ensures that process is killed within the expected time limit.
- tf.SetWaitDelay(500 * time.Millisecond)
+ if !tfv.GreaterThanOrEqual(gracefulShutdownMinVersion) {
+ // Versions < 1.1 will not react to SIGINT.
+ // This ensures the process is killed within the expected time limit.
+ tf.SetEnableLegacyPipeClosing(true)
+ tf.SetWaitDelay(500 * time.Millisecond)
+ }
err := tf.Init(context.Background())
if err != nil {
@@ -200,6 +205,51 @@
})
}
+func TestContext_sleepGracefulShutdown(t *testing.T) {
+ runTest(t, "sleep", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) {
+ // only testing versions that can shut down gracefully
+ if !tfv.GreaterThanOrEqual(gracefulShutdownMinVersion) {
+ t.Skip("graceful shutdown was added in Terraform 1.1, so test is not valid")
+ }
+
+ err := tf.Init(context.Background())
+ if err != nil {
+ t.Fatalf("err during init: %s", err)
+ }
+
+ ctx := context.Background()
+ ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+
+ errCh := make(chan error)
+ go func() {
+ err = tf.Apply(ctx)
+ if err != nil {
+ errCh <- err
+ }
+ }()
+
+ select {
+ case err := <-errCh:
+ if !errors.Is(err, context.DeadlineExceeded) {
+ t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
+ }
+ var ee *exec.ExitError
+ if !errors.As(err, &ee) {
+ t.Fatalf("expected exec.ExitError, got %T, %s", err, err)
+ }
+ if !ee.Exited() {
+ t.Fatalf("expected process to have exited, but it did not (%s)", ee.ProcessState.String())
+ }
+ if ee.ExitCode() != 1 {
+ t.Fatalf("expected exit code 1, got %d", ee.ExitCode())
+ }
+ case <-time.After(time.Second * 10):
+ t.Fatal("terraform apply should have canceled and returned in ~5s")
+ }
+ })
+}
+
func TestContext_alreadyCancelled(t *testing.T) {
runTest(t, "", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) {
ctx, cancel := context.WithCancel(context.Background())
diff --git a/tfexec/terraform.go b/tfexec/terraform.go
index 9088616..9ea21c5 100644
--- a/tfexec/terraform.go
+++ b/tfexec/terraform.go
@@ -73,6 +73,9 @@
// waitDelay represents the WaitDelay field of the [exec.Cmd] of Terraform
waitDelay time.Duration
+ // enableLegacyPipeClosing closes the stdout/stderr pipes before calling [exec.Cmd.Wait]
+ enableLegacyPipeClosing bool
+
versionLock sync.Mutex
execVersion *version.Version
provVersions map[string]*version.Version
@@ -232,6 +235,16 @@
return nil
}
+// SetEnableLegacyPipeClosing causes the library to "force-close" stdio pipes.
+// This works around a bug in Terraform < v1.1 that would otherwise leave
+// the process (and caller) hanging after graceful shutdown.
+//
+// This option can be safely ignored (set to false) with Terraform 1.1+.
+func (tf *Terraform) SetEnableLegacyPipeClosing(enabled bool) error {
+ tf.enableLegacyPipeClosing = enabled
+ return nil
+}
+
// WorkingDir returns the working directory for Terraform.
func (tf *Terraform) WorkingDir() string {
return tf.workingDir