// fancy-army-59540
06/21/2023, 12:46 AMfunc (t *Transcriber) start() error {
defer func() {
close(t.closeCh)
}()
for {
// stream, err := t.newStream()
stream, err := t.newDeepgramStream()
if err != nil {
if status, ok := status.FromError(err); ok && status.Code() == codes.Canceled {
return nil
}
logger.Errorw("failed to create a new speech stream", err)
t.results <- RecognizeResult{
Error: err,
}
return err
}
endStreamCh := make(chan struct{})
nextCh := make(chan struct{})
// Forward oggreader to the speech stream
go func() {
defer close(nextCh)
buf := make([]byte, 1024)
for {
select {
case <-endStreamCh:
return
default:
n, err := t.oggReader.Read(buf)
if err != nil {
if err != io.EOF {
logger.Errorw("failed to read from ogg reader", err)
}
return
}
if n <= 0 {
continue // No data
}
if err := stream.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
if err != io.EOF {
logger.Errorw("failed to forward audio data to speech stream", err)
t.results <- RecognizeResult{
Error: err,
}
}
return
}
}
}
}()
// Read transcription results
for {
_, msg, err := stream.ReadMessage() // Read from Deepgram
if err != nil {
log.Println("failed to read transcription message", err)
break
} else {
log.Println("Success awesome !!")
}
jsonParsed, _ := gabs.ParseJSON(msg)
log.Printf("recv 0--------------------------00000 : %s", jsonParsed.Path("channel.alternatives.0.transcript").String())
var txtMsg = jsonParsed.Path("channel.alternatives.0.transcript").String()
t.results <- RecognizeResult{
Text: txtMsg,
IsFinal: true,
}
}
close(endStreamCh)
// When nothing is written on the transcriber (The track is muted), this will block because the oggReader
// is waiting for data. It avoids to create useless speech streams. (Also we end up here because Google automatically close the
// previous stream when there's no "activity")
//
// Otherwise (When we have data) it is used to wait for the end of the current stream,
// so we can create the next one and reset the oggSerializer
<-nextCh
// Create a new oggSerializer each time we open a new SpeechStream
// This is required because the stream requires ogg headers to be sent again
t.lock.Lock()
t.oggSerializer = nil
t.lock.Unlock()
}
}
// newDeepgramStream opens a Deepgram live-transcription websocket configured
// for US English with punctuation enabled.
//
// The read and write deadlines on the returned connection are cleared (the
// zero time.Time means "no deadline"), so neither direction times out locally.
// The caller owns the returned connection and must Close it. On error the
// returned connection is nil.
//
// TODO(review): the API key is hard-coded; load it from configuration or the
// environment instead of keeping it in source.
func (t *Transcriber) newDeepgramStream() (*websocket.Conn, error) {
	dg := deepgram.NewClient("<MY API KEY....>")
	options := deepgram.LiveTranscriptionOptions{
		Language:  "en-US",
		Punctuate: true,
	}

	// The handshake *http.Response is not needed. Assuming the SDK dials with
	// gorilla/websocket, its docs say the body need not be closed by callers.
	dgConn, _, err := dg.LiveTranscription(options)
	if err != nil {
		// dgConn is nil here; the deadline calls below would panic on it.
		return nil, err
	}

	if err := dgConn.SetWriteDeadline(time.Time{}); err != nil {
		_ = dgConn.Close()
		return nil, err
	}
	if err := dgConn.SetReadDeadline(time.Time{}); err != nil {
		_ = dgConn.Close()
		return nil, err
	}
	return dgConn, nil
}