(Re)Implement QueryJSON as LogMsg emitter
diff --git a/tfexec/cmd.go b/tfexec/cmd.go
index cd5a7e2..c8d8d9a 100644
--- a/tfexec/cmd.go
+++ b/tfexec/cmd.go
@@ -18,6 +18,7 @@
 	"strings"
 
 	"github.com/hashicorp/terraform-exec/internal/version"
+	tfjson "github.com/hashicorp/terraform-json"
 )
 
 const (
@@ -216,6 +217,55 @@
 	return dec.Decode(v)
 }
 
+func (tf *Terraform) runTerraformCmdJSONLog(ctx context.Context, cmd *exec.Cmd) *LogMsgEmitter {
+	pr, pw := io.Pipe()
+	tf.SetStdout(pw)
+
+	go func() {
+		_ = tf.runTerraformCmd(ctx, cmd)
+		// TODO: handle error
+		_ = pr.Close()
+		// TODO: handle error
+		_ = pw.Close()
+		// TODO: handle error
+	}()
+
+	return newLogMsgEmitter(pr)
+}
+
+func newLogMsgEmitter(stdout io.ReadCloser) *LogMsgEmitter {
+	return &LogMsgEmitter{
+		scanner:            bufio.NewScanner(stdout),
+		stdoutReaderCloser: stdout,
+	}
+}
+
+type LogMsgEmitter struct {
+	scanner            *bufio.Scanner
+	stdoutReaderCloser io.Closer
+}
+
+// NextMessage returns next decoded message along with true until the last one.
+// Stdout reader is closed when the last message is received.
+//
+// Error returned can be related to decoding of the message (nil, true, error)
+// or closing of stdout reader (nil, false, error).
+//
+// Any error coming from Terraform (such as wrong configuration syntax) is
+// represented as LogMsg of Level [tfjson.Error].
+func (e *LogMsgEmitter) NextMessage() (tfjson.LogMsg, bool, error) {
+	ok := e.scanner.Scan()
+	if !ok {
+		return nil, ok, nil
+	}
+	msg, err := tfjson.UnmarshalLogMessage(e.scanner.Bytes())
+	return msg, true, err
+}
+
+func (e *LogMsgEmitter) Err() error {
+	return e.scanner.Err()
+}
+
 // mergeUserAgent does some minor deduplication to ensure we aren't
 // just using the same append string over and over.
 func mergeUserAgent(uas ...string) string {
diff --git a/tfexec/internal/e2etest/query_test.go b/tfexec/internal/e2etest/query_test.go
index 4d7f16b..4b9ab41 100644
--- a/tfexec/internal/e2etest/query_test.go
+++ b/tfexec/internal/e2etest/query_test.go
@@ -4,16 +4,15 @@
 package e2etest
 
 import (
-	"bytes"
 	"context"
-	"io"
 	"regexp"
-	"strings"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/hashicorp/go-version"
 	"github.com/hashicorp/terraform-exec/tfexec"
 	"github.com/hashicorp/terraform-exec/tfexec/internal/testutil"
+	tfjson "github.com/hashicorp/terraform-json"
 )
 
 func TestQueryJSON_TF112(t *testing.T) {
@@ -27,7 +26,7 @@
 
 		re := regexp.MustCompile("terraform query -json was added in 1.14.0")
 
-		err = tf.QueryJSON(context.Background(), io.Discard)
+		_, err = tf.QueryJSON(context.Background())
 		if err != nil && !re.MatchString(err.Error()) {
 			t.Fatalf("error running Query: %s", err)
 		}
@@ -43,15 +42,39 @@
 			t.Fatalf("error running Init in test directory: %s", err)
 		}
 
-		var output bytes.Buffer
-		err = tf.QueryJSON(context.Background(), &output)
+		le, err := tf.QueryJSON(context.Background())
 		if err != nil {
 			t.Fatalf("error running Query: %s", err)
 		}
 
-		results := strings.Count(output.String(), "list.concept_pet.pets: Result found")
+		results := 0
+		var completeData tfjson.ListCompleteData
+		for {
+			msg, ok, err := le.NextMessage()
+			if !ok {
+				break
+			}
+			if err != nil {
+				t.Fatalf("error getting next message: %s", err)
+			}
+			switch m := msg.(type) {
+			case tfjson.ListResourceFoundMessage:
+				results++
+			case tfjson.ListCompleteMessage:
+				completeData = m.ListComplete
+			}
+		}
+
 		if results != 5 {
 			t.Fatalf("expected 5 query results, but got %d", results)
 		}
+		expectedData := tfjson.ListCompleteData{
+			Address:      "list.concept_pet.pets",
+			ResourceType: "concept_pet",
+			Total:        5,
+		}
+		if diff := cmp.Diff(expectedData, completeData); diff != "" {
+			t.Fatalf("unexpected complete message data: %s", diff)
+		}
 	})
 }
diff --git a/tfexec/query.go b/tfexec/query.go
index 04021d1..674339b 100644
--- a/tfexec/query.go
+++ b/tfexec/query.go
@@ -6,7 +6,6 @@
 import (
 	"context"
 	"fmt"
-	"io"
 	"os/exec"
 )
 
@@ -57,25 +56,18 @@
 //
 // QueryJSON is likely to be removed in a future major version in favour of
 // query returning JSON by default.
-func (tf *Terraform) QueryJSON(ctx context.Context, w io.Writer, opts ...QueryOption) error {
+func (tf *Terraform) QueryJSON(ctx context.Context, opts ...QueryOption) (*LogMsgEmitter, error) {
 	err := tf.compatible(ctx, tf1_14_0, nil)
 	if err != nil {
-		return fmt.Errorf("terraform query -json was added in 1.14.0: %w", err)
+		return nil, fmt.Errorf("terraform query -json was added in 1.14.0: %w", err)
 	}
 
-	tf.SetStdout(w)
-
-	cmd, err := tf.queryJSONCmd(ctx, opts...)
+	queryCmd, err := tf.queryJSONCmd(ctx, opts...)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	err = tf.runTerraformCmd(ctx, cmd)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return tf.runTerraformCmdJSONLog(ctx, queryCmd), nil
 }
 
 func (tf *Terraform) queryJSONCmd(ctx context.Context, opts ...QueryOption) (*exec.Cmd, error) {